Getting the topic from Kafka messages in Spark

Ela*_*dor 5 apache-kafka apache-spark spark-streaming

In our Spark Streaming job, we read messages from Kafka as a stream.

To do this, we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream.

The messages are read from Kafka (from three topics: test1, test2, test3) as follows:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
// BROKERS is a comma-separated host:port list used as metadata.broker.list
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

We want to process the messages from each topic differently, and to do that we need to know the topic name of each message.

So we did the following:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

This is the implementation of SplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        // we expected topic metadata here, but this always prints null
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

The problem is that tuple2._1 is null. We assumed that tuple2._1 would contain some metadata, such as the name of the topic/partition the message came from.

However, when we print tuple2._1, it is null.

Our question: is there a way to send the topic name in Kafka so that, in the spark-streaming code, tuple2._1 contains it (instead of null)?
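
(As far as we understand, tuple2._1 is the Kafka message key, which stays null unless the producer sets one. If we controlled the producer, we could put the topic name into the key ourselves. A minimal sketch, assuming the standard KafkaProducer API and a placeholder broker address:)

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicAsKeyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "test1";
        // the key (second argument) is what arrives as tuple2._1 on the Spark side
        producer.send(new ProducerRecord<>(topic, topic, "some payload"));
        producer.close();
    }
}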

Note that we also tried to get the topic names from the DStream as mentioned in the spark-streaming Kafka integration tutorial:

But it returns all the topics that were passed to KafkaUtils.createDirectStream, not the specific topic that the messages in the current RDD came from.

So it doesn't help us identify the topic that the messages in the RDD were sent from.
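
(For reference: with the direct stream, partition i of each RDD corresponds to offsetRanges()[i], so the topic of every message can be recovered per partition. A sketch of tagging each message with its topic this way, assuming the Spark 1.x Java API used above; the name taggedLines and the tab separator are ours:)

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

JavaDStream<String> taggedLines = messages.transform(
        new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> rdd) {
                // partition i of a direct-stream RDD corresponds to offsets[i]
                final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                return rdd.mapPartitionsWithIndex(
                        new Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<String>>() {
                            @Override
                            public Iterator<String> call(Integer idx, Iterator<Tuple2<String, String>> it) {
                                String topic = offsets[idx].topic();
                                List<String> out = new ArrayList<>();
                                while (it.hasNext()) {
                                    // prefix each message with the topic it came from
                                    out.add(topic + "\t" + it.next()._2());
                                }
                                return out.iterator();
                            }
                        }, true);
            }
        });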

EDIT

In response to David's answer, I tried using MessageAndMetadata like this:

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap<>();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                topicAndPartition,
                new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction<JavaRDD<String>>() {

            @Override
            public void call(JavaRDD<String> rdd) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " "
                            + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

The problem is that nothing is printed inside the MessageAndMetadataFunction.call method. What do I need to fix in order to get the relevant topic of that RDD inside MessageAndMetadataFunction.call?

Dav*_*fin 6

Use one of the versions of createDirectStream that takes a messageHandler function as a parameter. Here's what I do:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte])](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1)).toMap,
  (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.topic, msg.message)
)

Some of that probably won't mean anything to you; the relevant part is

(msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.topic, msg.message)

If you're not familiar with Scala: all the function does is return a Tuple2 containing msg.topic and msg.message. Your function needs to return both of them for you to be able to use them downstream. You could also return the entire MessageAndMetadata object, which gives you a couple of other interesting fields. But if you only want the topic and the message, use the above.
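
For the Java API used in the question's EDIT, the equivalent would look roughly like this (a sketch; the double cast on Tuple2.class is only there to satisfy the generic record-class parameter):

JavaInputDStream<Tuple2<String, String>> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        (Class<Tuple2<String, String>>) (Class<?>) Tuple2.class,
        kafkaParams,
        topicAndPartition,
        new Function<MessageAndMetadata<String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(MessageAndMetadata<String, String> mmd) {
                // return (topic, message) so both are available downstream
                return new Tuple2<>(mmd.topic(), mmd.message());
            }
        });

Each record in stream is then a (topic, message) pair, so downstream you can branch on tuple._1() to handle each topic differently.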