我想在 Apache Flink 中实现以下场景:
给定一个具有 4 个分区的 Kafka 主题,我想根据事件的类型使用不同的逻辑在 Flink 中独立处理分区内数据。
特别是,假设输入 Kafka 主题包含前面图像中描述的事件。每个事件都有不同的结构:分区 1 有字段“ a ”作为键,分区 2 有字段“ b ”作为键,等等。在 Flink 中,我想根据事件应用不同的业务逻辑,所以我想我应该以某种方式分割流。为了实现图中所描述的效果,我想只使用一个消费者来做类似的事情(我不明白为什么我应该使用更多):
FlinkKafkaConsumer<..> consumer = ...
DataStream<..> stream = flinkEnv.addSource(consumer);
stream.keyBy("a").map(new AEventMapper()).addSink(...);
stream.keyBy("b").map(new BEventMapper()).addSink(...);
stream.keyBy("c").map(new CEventMapper()).addSink(...);
stream.keyBy("d").map(new DEventMapper()).addSink(...);
Run Code Online (Sandbox Code Playgroud)
(一)正确吗?另外,如果我想并行处理每个 Flink 分区,因为我只想按顺序处理按同一 Kafka 分区排序的事件,而不是全局考虑它们,(b) 我该怎么办?我知道该方法的存在setParallelism()
,但我不知道在这种情况下将其应用到哪里。
我正在寻找有关标记(a)和(b)的问题的答案。先感谢您。
parallel-processing partitioning apache-kafka apache-flink kafka-topic
我已经阅读了 Flink 关于状态后端的官方文档,这里。特别是,我对RocksDBStateBackend很感兴趣。
我不明白,如果我启用这种后端,RocksDB 是否可以通过 Flink 集群内的另一个节点从TaskManagers访问?
到目前为止,我对 RocksDBStateBackend 的理解是任务管理器将状态存储在它们的内存中,即 JVM 进程的内存中。之后,他们会将状态发送到存储在 RocksDB 中吗?如果是,Flink 集群中的 RocksDB 在哪里?物理上在哪里?
假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
Run Code Online (Sandbox Code Playgroud)
一个任务管理器将使用来自 Kafka 主题的数据,另一个任务管理器将使用来自另一个 Kafka 主题的数据。
我将作业发送给作业管理器来执行它。Flink 分配两个任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。
flatMap 在事件之间进行简单的连接(使用两个键控状态):
public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
private ValueState<A> AState;
private ValueState<B> BState;
@Override
public void open(Configuration config) {
AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
}
@Override
public void flatMap1(A event, Collector<String> out) throws Exception {
B secondEvent = BState.value();
if (secondEvent == null)
AState.update(event);
else {
out.collect(...); …
Run Code Online (Sandbox Code Playgroud)