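We have two kinds of messages in our Flink job: control messages and data messages.
Each kind comes from its own source stream, and we attach the same sink to both streams. What we want is to broadcast the control messages so that every parallel instance of the sink receives them.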
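To make the intended semantics concrete, here is a simplified model in plain Java. It is not Flink code, and `PartitionDemo` and `distribute` are hypothetical names. With parallelism 2, a broadcast record is copied to every downstream instance, while round-robin distribution (similar in spirit to Flink's `rebalance()`) sends each record to exactly one instance.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of how upstream records are routed
// to the parallel instances of a downstream operator.
class PartitionDemo {

    // Routes each record either to every instance (broadcast) or to exactly
    // one instance in round-robin order.
    static List<List<String>> distribute(List<String> records, int parallelism, boolean broadcast) {
        List<List<String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        int next = 0; // round-robin cursor
        for (String msg : records) {
            if (broadcast) {
                // broadcast: every parallel instance gets its own copy
                for (List<String> instance : instances) {
                    instance.add(msg);
                }
            } else {
                // round-robin: the record goes to one instance only
                instances.get(next).add(msg);
                next = (next + 1) % parallelism;
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        List<String> ctrl = List.of("ROLL");
        System.out.println(distribute(ctrl, 2, true));  // [[ROLL], [ROLL]]
        System.out.println(distribute(ctrl, 2, false)); // [[ROLL], []]
    }
}
```

The broadcast case is what the question asks for: a single control record ends up at both sink instances.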
Here is the code:
package com.ranjit.com.flinkdemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Control messages arrive on port 8088, data messages on port 8087
        DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
        DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

        RollingSink<String> sink = new RollingSink<String>("/base/path");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(400L * 1024 * 1024); // 400 MB

        // Each addSink call creates its own sink operator (parallelism 2 each),
        // even though the same RollingSink object is passed to both calls.
        ctrl_message_stream.broadcast().addSink(sink);
        message_stream.addSink(sink);

        env.execute("stream");
    }
}
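However, what I observe is that 4 sink instances are created, and the control messages are broadcast to only 2 of them (the ones created for the control message stream). As I understand it, achieving this would require both streams to go through the same operator chain, which we don't want, because the data messages would then go through multiple transformations. We have written our own sink that takes each message and, if it is a control message, only rolls the file.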
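The observation can be reproduced with a small model, again plain Java rather than Flink, with hypothetical class and method names. It assumes that each `addSink` call creates an independent sink operator with its own parallel instances, so two calls at parallelism 2 yield 4 instances, and that the control stream's broadcast edge only reaches the instances of its own operator. The alternative modeled in `singleSink` is one sink operator with two input edges: a broadcast edge for control records and a round-robin edge for data records. In the Flink DataStream API this might correspond to something like `ctrl_message_stream.broadcast().union(message_stream).addSink(sink)`, which adds no transformation to the data messages; it is worth verifying in your Flink version that `union` keeps the broadcast partitioning of that input.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of the two job topologies.
// Each sink "instance" is just the list of records it has received;
// the relative arrival order of control and data records is not modeled.
class SinkTopologyDemo {

    static List<List<String>> newSinkOperator(int parallelism) {
        List<List<String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        return instances;
    }

    // Copy every record to every instance of the operator.
    static void broadcastTo(List<String> records, List<List<String>> op) {
        for (String msg : records) {
            for (List<String> instance : op) {
                instance.add(msg);
            }
        }
    }

    // Send each record to exactly one instance, in round-robin order.
    static void roundRobinTo(List<String> records, List<List<String>> op) {
        int next = 0;
        for (String msg : records) {
            op.get(next).add(msg);
            next = (next + 1) % op.size();
        }
    }

    // Topology in the question: one addSink call per stream, so two
    // independent sink operators, i.e. 2 * parallelism instances in total.
    static List<List<String>> separateSinks(List<String> ctrl, List<String> data, int parallelism) {
        List<List<String>> ctrlSink = newSinkOperator(parallelism);
        List<List<String>> dataSink = newSinkOperator(parallelism);
        broadcastTo(ctrl, ctrlSink);
        roundRobinTo(data, dataSink);
        List<List<String>> all = new ArrayList<>(ctrlSink);
        all.addAll(dataSink);
        return all;
    }

    // Alternative: a single sink operator fed by both streams.
    static List<List<String>> singleSink(List<String> ctrl, List<String> data, int parallelism) {
        List<List<String>> sink = newSinkOperator(parallelism);
        broadcastTo(ctrl, sink);
        roundRobinTo(data, sink);
        return sink;
    }

    // Number of instances that received the given record.
    static long countReceiving(List<List<String>> instances, String msg) {
        return instances.stream().filter(i -> i.contains(msg)).count();
    }

    public static void main(String[] args) {
        List<String> ctrl = List.of("ROLL");
        List<String> data = List.of("d1", "d2", "d3", "d4");
        List<List<String>> separate = separateSinks(ctrl, data, 2);
        List<List<String>> single = singleSink(ctrl, data, 2);
        System.out.println(separate.size() + " instances, " + countReceiving(separate, "ROLL") + " see ROLL");
        System.out.println(single.size() + " instances, " + countReceiving(single, "ROLL") + " see ROLL");
    }
}
```

Running `main` prints `4 instances, 2 see ROLL` for the two-sink topology and `2 instances, 2 see ROLL` for the single-sink one.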
Sample code:
package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import …
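The sample code above is truncated, so the following is only a hypothetical stand-in for the roll-on-control behavior described earlier, not the authors' sink. It assumes control messages start with `CTRL:` and keeps "files" in memory as lists of lines. In a real job this logic would live in the `invoke` method of a Flink `SinkFunction`, with one writer per parallel instance, which is why each instance needs to see every control message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical roll-on-control writer. Assumptions (not from the original
// code): control messages start with "CTRL:", and files are in-memory lists.
class RollOnControlWriter {
    private static final String CTRL_PREFIX = "CTRL:";

    private final List<List<String>> closedFiles = new ArrayList<>();
    private List<String> currentFile = new ArrayList<>();

    // Called once per record, like a sink's invoke method.
    void invoke(String msg) {
        if (msg.startsWith(CTRL_PREFIX)) {
            roll();               // control message: roll only, never written
        } else {
            currentFile.add(msg); // data message: append to the current file
        }
    }

    // Close the current file and start a new one; empty files are not kept.
    private void roll() {
        if (!currentFile.isEmpty()) {
            closedFiles.add(currentFile);
            currentFile = new ArrayList<>();
        }
    }

    List<List<String>> closedFiles() { return closedFiles; }

    List<String> currentFile() { return currentFile; }

    public static void main(String[] args) {
        RollOnControlWriter w = new RollOnControlWriter();
        for (String msg : List.of("a", "b", "CTRL:roll", "c")) {
            w.invoke(msg);
        }
        System.out.println(w.closedFiles() + " " + w.currentFile()); // [[a, b]] [c]
    }
}
```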