我是卡夫卡和火花的初学者.我想通过火花流对我从kafka收到的关于特定主题的数据进行实时处理.我无法使用createStream函数返回的JavaPairReceiverInputDStream.
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
"testwordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(1));
Map<String, Integer> topics_map = new HashMap<String, Integer>();
topics_map.put("Customtopic", 10);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
.createStream(jssc, "localhost:2181", "kafkasparkconsumer",
topics_map);
Run Code Online (Sandbox Code Playgroud)
下面的代码给出了一个错误:
JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
Run Code Online (Sandbox Code Playgroud)
JavaPairDStream类型中的方法映射(Function,R>)不适用于参数(new PairFunction(){})SparkStreamingKafka.java/Kafka-Spark/src/com/sd/kafka第43行Java问题
我使用的火花版本是1.2.0.我找不到java api处理kafka消息的例子.谁能告诉我我需要改变什么?
你打错了方法.在java中如果你想获得一对,你应该调用MapToPair.试试这段代码:
JavaPairDStream<String, Integer> pairs = kafkaStream
.mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
@Override public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
return new Tuple2<String, Integer>(word._2(), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
pairs.print();
jssc.start();
jssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1295 次 |
| 最近记录: |