Ale*_*son 2 java stream-processing apache-flink flink-streaming
我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。
该流通过 4 个线程并行化。
如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)
问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?
当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:
45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)
这是该程序的代码片段:
imports...
public class StreamingCase {
public static void main(String[] args) throws Exception {
int parallelism = 4;
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(parallelism);
DataStream<Cube> start = env
.addSource(new CubeSource());
DataStream<Cube> adder = start
.map(new MapFunction<Cube, Cube>() {
@Override
public Cube map(Cube cube) throws Exception {
return cube.cubeAdd(1);
}
});
DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
@Override
public void apply(TimeWindow tw,
Iterable<Cube> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (Cube c : values)
sum++;
out.collect(sum);
}
});
throughput.print();
env.execute("Cube Stream of Sweetness");
}
}
Run Code Online (Sandbox Code Playgroud)
如果环境的并行度设置为 3 并且您使用 WindowAll 运算符,则只有窗口运算符以并行度 1 运行。接收器仍将以并行度 3 运行。因此,计划如下所示:
In_1 -\ /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/ \- Out_3
Run Code Online (Sandbox Code Playgroud)
WindowAll 运算符使用循环策略将其输出发送到后续任务。这就是不同线程发出程序结果记录的原因。
当您将环境并行度设置为 1 时,所有运算符都运行单个任务。