小编Cha*_*mer的帖子

pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset

目前我正在使用 Kafka/Zookeeper 和 pySpark (1.6.0)。我已经成功创建了一个 kafka 消费者,它使用KafkaUtils.createDirectStream().

所有流都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。

由于我们需要更新主题以在此处进行监控,这在某种程度上很奇怪。

在 Spark 的文档中,我发现了这个评论:

   offsetRanges = []

     def storeOffsetRanges(rdd):
         global offsetRanges
         offsetRanges = rdd.offsetRanges()
         return rdd

     def printOffsetRanges(rdd):
         for o in offsetRanges:
             print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

     directKafkaStream\
         .transform(storeOffsetRanges)\
         .foreachRDD(printOffsetRanges)
Run Code Online (Sandbox Code Playgroud)

如果您希望基于 Zookeeper 的 Kafka 监控工具显示流应用程序的进度,您可以使用它来自己更新 Zookeeper。

这是文档:http : //spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

我在 Scala 中找到了一个解决方案,但找不到 Python 的等价物。这是 Scala 示例:http : //geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

但问题是,从那时起我如何能够更新动物园管理员?

python apache-kafka spark-streaming pyspark apache-zookeeper

2
推荐指数
1
解决办法
2026
查看次数