Posts by Ale*_*oks

Flink + Kafka: getHostnamePort

I want to read a Kafka topic from Flink:

package Toletum.pruebas;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafka {
  public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka 0.8 source for topic "test02"; connection settings come from the command-line arguments
    FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
        "test02",
        new SimpleStringSchema(),
        parameterTool.getProperties());

    DataStream<String> messageStream = env.addSource(kafkaSrc);
      
    messageStream.rebalance().map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
  
      public String map(String value) throws Exception {
        return "Kafka and Flink says: " …
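Because the snippet above passes `parameterTool.getProperties()` straight to the consumer, one thing worth ruling out is a missing connection setting on the command line. The following is only a minimal sketch, not the asker's code and not Flink's API. It assumes the three keys that the Flink documentation lists as required for the Kafka 0.8 consumer (`bootstrap.servers`, `zookeeper.connect`, `group.id`). The class `KafkaArgsCheck` and its helpers `parse` and `missing` are hypothetical names, and `parse` is a simplified stand-in for `ParameterTool.fromArgs`, not a reimplementation of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper class, written for illustration only.
public class KafkaArgsCheck {
    // Assumption: keys the Flink docs list as required for the Kafka 0.8 consumer.
    static final String[] REQUIRED = {"bootstrap.servers", "zookeeper.connect", "group.id"};

    // Simplified stand-in for ParameterTool.fromArgs: reads "--key value" pairs.
    // A trailing key without a value is ignored.
    static Properties parse(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key, got: " + args[i]);
            }
            props.setProperty(args[i].substring(2), args[i + 1]);
        }
        return props;
    }

    // Returns the required keys that are absent from props, in REQUIRED order.
    static List<String> missing(Properties props) {
        List<String> out = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> absent = missing(parse(args));
        if (absent.isEmpty()) {
            System.out.println("All required Kafka properties are present.");
        } else {
            System.out.println("Missing Kafka properties: " + absent);
        }
    }
}
```

Running the check with the same arguments as the Flink job (for example `--bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup`, where the host names and group are placeholders) shows whether any of the assumed keys is absent before the job is submitted.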

java apache-kafka apache-flink

5 votes
1 answer
1056 views
