Pri*_*til 1 apache-kafka apache-spark spark-streaming
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topicsSet)
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
rdd.saveAsTextFile("output");
return rdd;
}
}).map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> kv) {
return kv._2();
}
});
stream1.print();
jssc.start();
jssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)
交叉检查主题“test4”中是否有有效数据。
我期望从 kafka 集群流式传输的字符串在控制台中打印。控制台中没有异常,但也没有输出。我这里缺少什么吗?
您是否尝试过在流应用程序启动后在您的主题中生成数据?
默认情况下,直接流使用配置auto.offset.reset=largest,这意味着当没有初始偏移量时,它会自动重置为最大偏移量,因此基本上您将只能读取在主题之后进入的新消息流应用程序启动。
归档时间: |
|
查看次数: |
1243 次 |
最近记录: |