我用 BroadcastProcessFunction 实现了一个 flink 流。我从 processBroadcastElement 获取模型,并将其应用于 processElement 中的事件。
我找不到对我的流进行单元测试的方法,因为我找不到确保在第一个事件之前调度模型的解决方案。我想说有两种方法可以实现这一点:
1. 找到一种解决方案,首先将模型推送到流中
2. 在执行流之前先用模型填充广播状态,以便将其恢复
我可能错过了一些东西,但我还没有找到一个简单的方法来做到这一点。
这是针对我的问题的简单单元测试:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.mutable
class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {
import BroadCastProcessor._
override def processElement(value: Int,
ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
out: Collector[String]): Unit = {
val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)
if (broadcastState.contains(value)) {
out.collect(broadcastState.get(value))
}
}
override def processBroadcastElement(value: (Int, String),
ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
out: Collector[String]): Unit …Run Code Online (Sandbox Code Playgroud) 我在 flink 中有一个数据流,并使用 ProcessFunction 中的量规生成自己的指标。
由于这些指标对我的活动很重要,因此我想在流程执行后对它们进行单元测试。
不幸的是,我没有找到一种方法来实现适当的测试报告器。这是解释我的问题的简单代码。
这段代码有两个问题:
这是样本
import java.util.concurrent.atomic.AtomicInteger
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.AbstractReporter
import org.apache.flink.metrics.{Gauge, Metric, MetricConfig}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._
import scala.collection.JavaConverters._
import scala.collection.mutable
/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */
class MultiplyByTwo extends ProcessFunction[Long, Long] {
override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
collector.collect(data * 2L)
}
val nbrCalls …Run Code Online (Sandbox Code Playgroud)