Flink KeyedCoProcessFunction working with state

Ser*_*ent 1 apache-flink flink-streaming

I am using a KeyedCoProcessFunction to enrich the main data stream with data from another stream.

Code:

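import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector
// Assumption: LazyLogging comes from the scala-logging library.
import com.typesafe.scalalogging.LazyLogging

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {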

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"Found state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      // No further case is needed: an Option is either Some or None.
    }
  }

  override def processElement2(
                                value: AssetCommandState,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}


processElement2() works well: it accepts data and updates the state.
In processElement1(), however, I always hit case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"),

although I expect to find the value that was set in processElement2.

As an example, I used this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/

Dav*_*son 5

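processElement1 and processElement2 do share state, but keep in mind that this is key-partitioned state. This means that a value set in processElement2 while processing a given value v2 will only be seen in processElement1 when it is later called with a value v1 that has the same key as v2.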
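
To make the key scoping concrete, the following is a minimal plain-Scala model of key-partitioned state. It is not Flink code: `KeyedStateModel`, `onCommand` and `onPacket` are hypothetical names, and the `mutable.Map` only stands in for the per-key state slot that Flink manages behind a keyed `ValueState`.

```scala
import scala.collection.mutable

// Hypothetical model: one state slot per key, as with Flink's keyed ValueState.
final class KeyedStateModel {
  private val assetIdByKey = mutable.Map.empty[String, String]

  // Models processElement2: stores the asset id under the command's key.
  def onCommand(key: String, assetId: String): Unit =
    assetIdByKey.update(key, assetId)

  // Models processElement1: reads only the slot that belongs to the packet's own key.
  def onPacket(key: String): Option[String] =
    assetIdByKey.get(key)
}
```

In this model, after `onCommand("tag-1", "asset-A")`, a call to `onPacket("tag-1")` sees the asset id, while `onPacket("tag-2")` still sees nothing. So if the two streams are keyed by fields whose values never match, processElement1 would keep hitting the `None` branch even though processElement2 updates state correctly. It may be worth logging `ctx.getCurrentKey` in both methods to compare the keys.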

Also keep in mind that you have no control over the race condition between the two streams coming into processElement1 and processElement2.
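
One common way to cope with that race is to buffer packets that arrive before their command and emit them once the command shows up. The sketch below models the idea in plain Scala so that it runs without a Flink cluster. `Packet`, `Enriched` and `BufferingEnrichmentModel` are hypothetical simplifications of the question's types, and the two maps stand in for what would be a keyed `ValueState` and `ListState` in a real job.

```scala
import scala.collection.mutable

// Hypothetical simplified stand-ins for the question's PacketData and AssetData.
final case class Packet(key: String, payload: String)
final case class Enriched(assetId: String, key: String, payload: String)

// Plain-Scala model of a buffering enrichment function (not Flink code).
final class BufferingEnrichmentModel {
  // In Flink these would be a keyed ValueState[String] and a ListState[Packet].
  private val assetIdByKey = mutable.Map.empty[String, String]
  private val pendingByKey = mutable.Map.empty[String, Vector[Packet]]

  // Models processElement1: emit if the asset id is known, otherwise buffer the packet.
  def onPacket(p: Packet): Seq[Enriched] =
    assetIdByKey.get(p.key) match {
      case Some(assetId) => Seq(Enriched(assetId, p.key, p.payload))
      case None =>
        pendingByKey.update(p.key, pendingByKey.getOrElse(p.key, Vector.empty) :+ p)
        Seq.empty
    }

  // Models processElement2: store the asset id, then flush packets buffered for this key.
  def onCommand(key: String, assetId: String): Seq[Enriched] = {
    assetIdByKey.update(key, assetId)
    val buffered = pendingByKey.remove(key).getOrElse(Vector.empty)
    buffered.map(p => Enriched(assetId, p.key, p.payload))
  }
}
```

With this approach the output no longer depends on which stream delivers its element first for a given key. In a real job the buffer can grow without bound if a command never arrives, so it would probably need a timer (via `ctx.timerService`) or a size limit to clear stale entries.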

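The RidesAndFares exercise from the official Apache Flink training is all about learning how to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home of the corresponding tutorial.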