Flink:如何在flink中处理外部应用程序配置更改

Neo*_*ter 7 apache-flink flink-streaming flink-cep

我的要求是在一天内传输数百万条记录,并且它对外部配置参数有很大的依赖性.例如,用户可以随时在Web应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式处理.这些是应用级配置,我们还有一些动态排除参数,每个数据都必须通过并过滤.

我看到flink没有全局状态,它在所有任务管理器和子任务中共享.拥有一个集中式缓存是一个选项,但对于每个参数,我必须从缓存中读取它,这将增加延迟.请告知更好的方法来处理这些场景以及其他应用程序如何处理它.谢谢.

Fab*_*ske 7

更新正在运行的流应用程序的配置是常见的要求.在Flink的DataStream API中,这可以使用所谓的CoFlatMapFunction处理两个输入流来完成.其中一个流可以是数据流,另一个是控制流.

以下示例显示如何动态调整过滤掉超过特定长度的字符串的用户函数.

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = {
    length = state
  }
}
Run Code Online (Sandbox Code Playgroud)

DynLengthFilter用户的功能实现Checkpointed为过滤器长度接口.如果发生故障,将自动恢复此信息.

  • 是的,在更高版本的 Flink 中,您可以使用 BroadcastState 动态分发和管理配置参数。 (2认同)