小编Mat*_*ieu的帖子

当 processElement 依赖于广播数据时,如何在 flink 中对 BroadcastProcessFunction 进行单元测试

我用 BroadcastProcessFunction 实现了一个 flink 流。我从 processBroadcastElement 获取模型,并将其应用于 processElement 中的事件。

我找不到对我的流进行单元测试的方法,因为我找不到确保在第一个事件之前调度模型的解决方案。我想说有两种方法可以实现这一点:
1. 找到一种解决方案,首先将模型推送到流中
2. 在执行流之前先用模型填充广播状态,以便将其恢复

我可能错过了一些东西,但我还没有找到一个简单的方法来做到这一点。

这是针对我的问题的简单单元测试:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}

import scala.collection.mutable


class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BroadCastProcessor._

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)

    if (broadcastState.contains(value)) {
      out.collect(broadcastState.get(value))
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

5
推荐指数
1
解决办法
1096
查看次数

如何在 flink 中统一测试指标

我在 flink 中有一个数据流,并使用 ProcessFunction 中的量规生成自己的指标。
由于这些指标对我的活动很重要,因此我想在流程执行后对它们进行单元测试。
不幸的是,我没有找到一种方法来实现适当的测试报告器。这是解释我的问题的简单代码。
这段代码有两个问题:

  1. 我如何触发仪表
  2. 如何通过 env.execute 实例化记者

这是样本

import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.AbstractReporter
import org.apache.flink.metrics.{Gauge, Metric, MetricConfig}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._

import scala.collection.JavaConverters._
import scala.collection.mutable

/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */

class MultiplyByTwo extends ProcessFunction[Long, Long] {
  override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
    collector.collect(data * 2L)
  }

  val nbrCalls …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

3
推荐指数
1
解决办法
1808
查看次数

标签 统计

apache-flink ×2

scala ×2