小编Ran*_*nde的帖子

对Apache Flink中的两个消息流使用相同的接收器

Flink有两种消息

  1. 控制消息->仅滚动文件
  2. 数据消息->将使用接收器存储在S3中

对于这两个消息,我们有单独的源流。我们对两个流都附加了相同的接收器。我们想要做的是广播控制消息,以便所有并行运行的接收器都应该接收它。

下面是相同的代码:

package com.ranjit.com.flinkdemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

import org.apache.flink.streaming.connectors.fs.StringWriter;;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);

        ctrl_message_stream.broadcast();

        DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

        RollingSink sink = new RollingSink<String>("/base/path");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>() );
        sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

        ctrl_message_stream.broadcast().addSink(sink);
        message_stream.addSink(sink);

        env.execute("stream");
    }

}
Run Code Online (Sandbox Code Playgroud)

但是我观察到的是,它正在创建4个接收器实例,并且控制消息仅广播到2个接收器(由控制消息流创建)。因此,我了解的是,两个流都应通过相同的运算符链来执行此操作,而这是我们不希望的,因为数据消息将进行多次转换。我们已经编写了自己的接收器,如果它是控制消息,它将接收消息,然后它只会滚动文件。

示例代码:

package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

5
推荐指数
1
解决办法
1754
查看次数

标签 统计

apache-flink ×1

flink-streaming ×1