Phi*_*sky 10 apache-kafka apache-spark pyspark
How do I use KafkaUtils.createDirectStream in PySpark with specific offsets for a topic?
If you want to create an RDD from records in a Kafka topic, use a static set of offset ranges.
First, import everything you need:
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Then create a dictionary listing the Kafka brokers:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Then create the offset range objects:
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
Finally, create the RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
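When a topic has several partitions, you need one offset range per partition. The following is a minimal sketch of building that list, not part of the original answer. `RangeSpec` and `build_offset_ranges` are hypothetical names; `RangeSpec` is a stand-in with the same argument order as `OffsetRange` (topic, partition, start, until) so the sketch runs without Spark installed.

```python
from collections import namedtuple

# Hypothetical stand-in mirroring the argument order of
# pyspark.streaming.kafka.OffsetRange, so this runs without Spark.
RangeSpec = namedtuple("RangeSpec", ["topic", "partition", "fromOffset", "untilOffset"])


def build_offset_ranges(topic, bounds, factory=RangeSpec):
    """Build one range per partition.

    bounds maps partition number -> (start, until).
    factory is called as factory(topic, partition, start, until).
    """
    ranges = []
    for partition, (start, until) in sorted(bounds.items()):
        # Reject ranges that are negative or run backwards.
        if start < 0 or until < start:
            raise ValueError(
                "invalid range for partition %d: %d..%d" % (partition, start, until)
            )
        ranges.append(factory(topic, partition, start, until))
    return ranges


offsets = build_offset_ranges("topic", {0: (0, 10), 1: (5, 20)})
```

With Spark available, passing `factory=OffsetRange` should yield a list that can be handed to `KafkaUtils.createRDD(sc, kafkaParams, offsets)` as above.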
To create a stream that starts from specific offsets, do the following:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Then create the Spark Streaming context from the SparkContext:
ssc = StreamingContext(sc, 1)
Next, set all the parameters:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Then create the fromOffsets dictionary:
topicPartition = TopicAndPartition(topic, partition)
fromOffset = {topicPartition: long(start)}
# Note: on Python 2 the int must be cast to long; on Python 3, where long does not exist, use int(start) instead
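The same dictionary can be built for several partitions at once, with a cast that works on both Python 2 and Python 3. This is a minimal sketch under stated assumptions: `offset_type` and `build_from_offsets` are hypothetical names, and the default `key_factory` produces plain `(topic, partition)` tuples so the code runs without Spark.

```python
# Pick the integer type for offsets: long on Python 2, int on Python 3.
try:
    offset_type = long  # noqa: F821 (only defined on Python 2)
except NameError:
    offset_type = int


def build_from_offsets(topic, starts, key_factory=lambda t, p: (t, p)):
    """Map each partition of `topic` to its starting offset.

    starts maps partition number -> starting offset.
    key_factory builds the dictionary key from (topic, partition);
    the tuple default is a stand-in so the sketch runs without Spark.
    """
    return {
        key_factory(topic, partition): offset_type(offset)
        for partition, offset in starts.items()
    }


from_offsets = build_from_offsets("topic", {0: 0, 1: 42})
```

In a real job you would presumably pass `key_factory=TopicAndPartition` and hand the result to `createDirectStream` through its `fromOffsets` argument.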
Finally, create the stream:
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams,
                                                  fromOffsets=fromOffset)
小智 5
You can do this:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicPartition = TopicAndPartition(topic, partition)
fromOffsets = {topicPartition: int(start)}
# spark_streaming is an existing StreamingContext
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming, [topic],
                                             {"metadata.broker.list": brokers},
                                             fromOffsets=fromOffsets)
Note: tested with Spark 2.2.0 and Python 3.6.