kafka引发流式传输java api问题

moh*_*t_d 3 java apache-spark

我是卡夫卡和火花的初学者.我想通过火花流对我从kafka收到的关于特定主题的数据进行实时处理.我无法使用createStream函数返回的JavaPairReceiverInputDStream.

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
                "testwordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,
                Durations.seconds(1));

        Map<String, Integer> topics_map = new HashMap<String, Integer>();

        topics_map.put("Customtopic", 10);

JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
                .createStream(jssc, "localhost:2181", "kafkasparkconsumer",
                        topics_map);
Run Code Online (Sandbox Code Playgroud)

下面的代码给出了一个错误:

 JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
                 new PairFunction<String, String, Integer>() {
                 @Override public Tuple2<String, Integer> call(String s) {
                 return new Tuple2<String, Integer>(s, 1);
                 }
                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                 @Override
                 public Integer call(Integer i1, Integer i2) {
                 return i1 + i2;
                 }
                 });
                 wordCounts.print();
Run Code Online (Sandbox Code Playgroud)

JavaPairDStream类型中的方法映射(Function,R>)不适用于参数(new PairFunction(){})SparkStreamingKafka.java/Kafka-Spark/src/com/sd/kafka第43行Java问题

我使用的火花版本是1.2.0.我找不到java api处理kafka消息的例子.谁能告诉我我需要改变什么?

gas*_*rms 7

你打错了方法.在java中如果你想获得一对,你应该调用MapToPair.试试这段代码:

JavaPairDStream<String, Integer> pairs = kafkaStream
            .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
                @Override public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                    return new Tuple2<String, Integer>(word._2(), 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });

    pairs.print();

    jssc.start();
    jssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)