Flink:在CoFlatMapFunction中共享状态

Vla*_*yak 10 apache-flink flink-streaming

卡住了一下CoFlatMapFunction.如果我把它放在DataStream之前的窗口上似乎工作正常但是如果放在窗口的"应用"功能之后就失败了.

我正在测试两个流,主要的"功能"在flatMap1不断摄取数据和控制流"模型"时flatMap2根据请求更改模型.

我能够设置并看到b0/b1正确设置flatMap2,但flatMap1始终看到b0和b1在初始化时设置为0.

我错过了一些明显的东西吗?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}
Run Code Online (Sandbox Code Playgroud)

Vla*_*yak 4

这是邮件列表中的答案...

CoFlatMapFunction 是否打算并行执行?

如果是,您需要某种方法来确定性地将哪个记录分配给哪个并行实例。以某种方式,CoFlatMapFunction 在模型和会话窗口的结果之间进行并行(分区)连接,因此您需要某种形式的键来选择元素转到哪个分区。那有意义吗?

如果不是,请尝试显式将其设置为并行度 1。

问候,斯蒂芬


所有人都可以通过broadcast() 访问只读的全局状态。

目前不提供可供所有人读取和更新的全局状态。对此的一致操作将非常昂贵,需要某种形式的分布式通信/共识。

相反,我鼓励您遵循以下内容:

1) 如果您可以对状态进行分区,请使用 keyBy().mapWithState() - 本地化状态操作并使其非常快。

2)如果你的状态不是按键组织的,那么你的状态可能非常小,你也许可以使用非并行操作。

3)如果某个操作更新状态而另一个操作访问它,您通常可以通过迭代和 CoFlatMapFunction 来实现(一侧是原始输入,另一侧是反馈输入)。

所有方法最终都会本地化状态访问和修改,如果可能的话,这是一个很好的遵循模式。

问候,斯蒂芬