Den*_*din 0 apache-flink flink-streaming
早些时候我询问了Flink 的一个简单的hello world 示例。这给了我一些很好的例子!
\n不过,我想询问更多 \xe2\x80\x98streaming\xe2\x80\x99 示例,其中我们每秒生成一个输入值。理想情况下,这应该是随机的,但即使每次都是相同的值也可以。
\n目标是获得一个在没有/最少外部接触的情况下 \xe2\x80\x98moves\xe2\x80\x99 的流。
\n因此我的问题是:
\n我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来展示这一点,但是我试图以最小的依赖来解决它(就像从 Nifi 中的GenerateFlowFile 开始)。
\n这是一个例子。这是作为如何使源和接收器可插拔的示例而构建的。这个想法是,在开发中,您可以使用随机源并打印结果,对于测试,您可以使用输入事件的硬连线列表并将结果收集在列表中,在生产中,您将使用真实的源和接收器。
这是工作:
/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */
public class TestableStreamingJob {
    private SourceFunction<Long> source;
    private SinkFunction<Long> sink;
    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }
    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> LongStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));
        LongStream
                .map(new IncrementMapFunction())
                .addSink(sink);
        env.execute();
    }
    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }
    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    public class IncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1 ;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
这是RandomLongSource:
public class RandomLongSource extends RichParallelSourceFunction<Long> {
    private volatile boolean cancelled = false;
    private Random random;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }
    @Override
    public void cancel() {
        cancelled = true;
    }
}
Run Code Online (Sandbox Code Playgroud)
        |   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           1901 次  |  
        
|   最近记录:  |