Vla*_*yak 10 apache-flink flink-streaming
卡住了一下CoFlatMapFunction.如果我把它放在DataStream之前的窗口上似乎工作正常但是如果放在窗口的"应用"功能之后就失败了.
我正在测试两个流,主要的"功能"在flatMap1不断摄取数据和控制流"模型"时flatMap2根据请求更改模型.
我能够设置并看到b0/b1正确设置flatMap2,但flatMap1始终看到b0和b1在初始化时设置为0.
我错过了一些明显的东西吗?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " + this + "\n");
}
@Override
public String toString(){
return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
}
}
Run Code Online (Sandbox Code Playgroud)
这是邮件列表中的答案...
CoFlatMapFunction 是否打算并行执行?
如果是,您需要某种方法来确定性地将哪个记录分配给哪个并行实例。以某种方式,CoFlatMapFunction 在模型和会话窗口的结果之间进行并行(分区)连接,因此您需要某种形式的键来选择元素转到哪个分区。那有意义吗?
如果不是,请尝试显式将其设置为并行度 1。
问候,斯蒂芬
所有人都可以通过broadcast() 访问只读的全局状态。
目前不提供可供所有人读取和更新的全局状态。对此的一致操作将非常昂贵,需要某种形式的分布式通信/共识。
相反,我鼓励您遵循以下内容:
1) 如果您可以对状态进行分区,请使用 keyBy().mapWithState() - 本地化状态操作并使其非常快。
2)如果你的状态不是按键组织的,那么你的状态可能非常小,你也许可以使用非并行操作。
3)如果某个操作更新状态而另一个操作访问它,您通常可以通过迭代和 CoFlatMapFunction 来实现(一侧是原始输入,另一侧是反馈输入)。
所有方法最终都会本地化状态访问和修改,如果可能的话,这是一个很好的遵循模式。
问候,斯蒂芬
| 归档时间: |
|
| 查看次数: |
1319 次 |
| 最近记录: |