Ela*_*dor 5 apache-kafka apache-spark spark-streaming
In our Spark Streaming job we read the messages from Kafka as a stream.
To do that we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream.
The messages are read from Kafka (from three topics - test1, test2, test3) in the following way:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
We want to handle the messages from each topic differently, and in order to do that we need to know the topic name of each message.
So we did the following:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
This is the implementation of the SplitToLinesFunction:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}
The problem is that tuple2._1 is null. We assumed that tuple2._1 would contain some metadata, such as the name of the topic/partition the message came from.
However, when we print tuple2._1, it is null.
Our question - is there a way to send the topic name in Kafka so that, in the spark-streaming code, tuple2._1 will contain it (and not be null)?
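As far as we can tell, tuple2._1 in the pair stream is simply the Kafka message key, which is null whenever the producer does not set one. Below is a minimal sketch (not code from our actual job) of a producer that writes the topic name into the key; the class name TopicAsKeyProducer and the broker address are made up for illustration, and it assumes the plain Java producer that ships with Kafka 0.8.2+:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicAsKeyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // stand-in for our BROKERS constant
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // The key (second argument) is what arrives as tuple2._1 on the Spark side,
        // so the topic name is simply repeated there.
        producer.send(new ProducerRecord<>("test1", "test1", "some payload"));
        producer.close();
    }
}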
Note that we also tried to get the topic names from the DStream, as mentioned in the spark-streaming kafka-integration tutorial:
but it returns all the topics that were passed to KafkaUtils.createDirectStream, not the specific topic that the messages (belonging to the current RDD) arrived from.
So it didn't help us identify the name of the topic that a message in the RDD was sent from.
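For context, the tutorial pattern ties an OffsetRange to each partition of the batch RDD rather than to each message: each partition of a direct stream's RDD corresponds to exactly one entry of offsetRanges. A rough sketch of how that reads is below (it reuses the messages stream from above and the usual java.util and Spark Java API imports; the name taggedLines is illustrative only, this is a sketch rather than code from our job):

// Sketch only: tag every line with the topic of the partition it came from.
JavaDStream<String> taggedLines = messages.transform(
    new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
        @Override
        public JavaRDD<String> call(JavaPairRDD<String, String> rdd) {
            final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            return rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer partitionIndex,
                                                 Iterator<Tuple2<String, String>> records) {
                        // offsets[partitionIndex] describes exactly this partition
                        String topic = offsets[partitionIndex].topic();
                        List<String> tagged = new ArrayList<>();
                        while (records.hasNext()) {
                            tagged.add(topic + "\t" + records.next()._2());
                        }
                        return tagged.iterator();
                    }
                }, true);
        }
    });

That gives a topic per partition, which is not quite the per-message metadata we were hoping to find in tuple2._1.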
EDIT
In response to David's answer - I tried to use MessageAndMetadata like this:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap<>();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> {
    @Override
    public String call(MessageAndMetadata<String, String> v1) throws Exception {
        // nothing is printed here
        System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
        return v1.topic();
    }
}

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        topicAndPartition,
        new MessageAndMetadataFunction());

messages.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // here all the topics kafka listens to are printed, but that doesn't help
        for (OffsetRange offset : offsets) {
            System.out.println(offset.topic() + " " + offset.partition() + " "
                    + offset.fromOffset() + " " + offset.untilOffset());
        }
    }
});
The problem is that nothing is printed from the MessageAndMetadataFunction.call method. What should I fix in order to get the relevant topic of that RDD inside the MessageAndMetadataFunction.call method?
Use one of the versions of createDirectStream that takes a messageHandler function as a parameter. Here's what I do:
val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte])](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1)).toMap,
  (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => { (msg.topic, msg.message) }
)
Much of that won't mean anything to you - the relevant part is
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
If you're not familiar with Scala, all the function does is return a Tuple2 containing msg.topic and msg.message. Your function needs to return both of them in order for you to use them downstream. You could just return the entire MessageAndMetadata object instead, which would give you a couple of other interesting fields. But if you only want the topic and the message, then use the above.
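If you'd rather stay in Java, a rough sketch of the same handler looks like this (treat it as a sketch, not tested code; it reuses the kafkaParams and topicAndPartition map from your EDIT, and the cast through Class<?> is only there to hand the API a class token for Tuple2):

// Sketch: a messageHandler that keeps both the topic and the message,
// so each record downstream is a Tuple2<topic, payload>.
Function<MessageAndMetadata<String, String>, Tuple2<String, String>> messageHandler =
        new Function<MessageAndMetadata<String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(MessageAndMetadata<String, String> md) {
                return new Tuple2<>(md.topic(), md.message());
            }
        };

@SuppressWarnings("unchecked")
Class<Tuple2<String, String>> recordClass =
        (Class<Tuple2<String, String>>) (Class<?>) Tuple2.class;

JavaInputDStream<Tuple2<String, String>> topicAndMessage =
        KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                recordClass,
                kafkaParams,
                topicAndPartition,
                messageHandler);

From there you can branch on the first element of the tuple to process each topic differently.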