Apache Flink:两个(或多个)任务管理器之间共享状态

Mau*_*ino 0 state join apache-flink

假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
Run Code Online (Sandbox Code Playgroud)

一个任务管理器将使用来自 Kafka 主题的数据,另一个任务管理器将使用来自另一个 Kafka 主题的数据。

我将作业发送给作业管理器来执行它。Flink 分配两个任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。

flatMap 在事件之间进行简单的连接(使用两个键控状态):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(A event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

如果我理解正确的话,在 connect 方法之后,流就变成只有一个。现在,实现的 flatMap 需要共享状态,因为操作员必须控制相关事件是否到达才能应用连接,但它是以两个并行度执行的,因此使用两个任务管理器。这意味着每次必须更新状态时,任务管理器都应该保存在另一个任务管理器的状态中(在 connect 方法之后共享),或者可能需要简单地读取状态。那么任务管理器如何通信呢?由于任务管理器可能在不同的集群节点上运行,这是否会影响性能?

编辑:我在 Flink 的博客上找到了以下文章,似乎两个任务管理器可以通过 TCP 连接进行通信,这对我来说很有意义,因为在某些情况下我们需要在事件之间共享状态。如果这是错误的,您能向我解释一下Flink 如何管理以下场景吗?

假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器始终只有一个插槽。我运行上述作业并将并行度设置为 2(例如,在将作业发送到作业管理器时使用-p参数)。现在,Flink 将从我的作业中创建两个结构相同的子任务,并将它们发送到任务管理器。两个任务管理器都将执行“相同”的作业,但消耗不同的事件。该作业消耗来自两个 Kafka 主题的事件:A 和 B。这意味着第一个和第二个任务管理器将消耗来自主题 A 和 B 的事件,但是不同的事件,否则会出现重复。作业是相同的,即它执行上面的 RichCoFlatMapFunction,然后每个任务管理器将在本地处理其消耗的事件集和个人本地状态。现在问题来了:假设第一个任务管理器消耗了一个具有键“1”的事件。该事件到达 RichCoFlatMapFunction 内部,并存储在状态内部,因为操作员仍在等待具有相同键的另一个事件来生成连接。如果从第二个任务管理器消耗了键为“1”的另一个事件,并且它们不共享其状态或通信,则将不可能进行连接。我的推理有什么问题吗?

Dav*_*son 5

两个任务管理器不需要为了状态共享而进行通信——Flink 中没有状态共享。

下面显示的这三个执行图中的任何一个都是可能的,具体取决于您如何排列源的详细信息。在每个图的左侧,我们看到 A 和 B 的源运算符,右侧是通过 RichCoFlatMap 实现联接的双输入运算符的两个并行实例。

在此输入图像描述

keyBy 不是运算符,而是指定源和两个 RichCoFlatMap 实例的连接方式。它将其安排为一个散列连接,对源流进行重新分区。

采用这三种情况中的哪一种并不重要,因为在所有三种情况下,keyBy 都会具有相同的效果,将某些键的所有事件引导到 Join1,并将其他键的所有事件引导到 Join2。

换句话说,对于任何给定的键,该键的所有事件都将在同一任务槽中处理。您可以将其视为ValueState<A>分布式(分片)键/值存储,其中值的类型为 A。每个任务管理器都有该键/值存储的一部分的状态(对于键的不相交子集),并处理这些键的所有事件(并且仅这些键)。

例如:在 中flatMap1,当BState.value()使用 from 的元素调用时streamA,Flink 运行时将访问BState 当前上下文中的键streamA的值,这意味着与当前正在处理的事件的键关联的值。在当前任务中,该状态始终是本地的。同样,flatMap2将始终使用来自 的元素进行调用streamB

这种设计避免了任务管理器之间的任何耦合,这有利于可扩展性和性能。