如何迭代 Flink DataStream 中的每条消息?

Meg*_*p V 2 apache-flink flink-streaming

我有来自 Kafka 的消息流,如下所示

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));
Run Code Online (Sandbox Code Playgroud)

如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法,DataStream但它不返回Iterator<String>.

Fab*_*ske 5

我认为您正在寻找一个MapFunction.

DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));

DataStream<Y> mappedMessages = messageStream
  .map(new MapFunction<String, Y>() {
    public Y map(String message) {
      // do something with each message and return Y
    }
  });
Run Code Online (Sandbox Code Playgroud)

如果您不想为每条传入消息发出恰好一条记录,请查看FlatMapFunction.