Phi*_*sky 8

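To create an RDD from records in a Kafka topic, describe the records to read with a static set of offset ranges, each one a (topic, partition, start, until) tuple.

First, import everything you need: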

from pyspark.streaming.kafka import KafkaUtils, OffsetRange

Then create a dictionary that lists the Kafka brokers:

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
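
If the broker hosts already live in a list, the comma-separated value can be assembled rather than typed by hand. This is only a sketch in plain Python: `build_kafka_params` and its default port are hypothetical helpers for illustration, not part of pyspark.

```python
def build_kafka_params(hosts, port=9092):
    """Return a kafkaParams dict with a comma-separated broker list.

    Hypothetical helper: assumes every broker listens on the same port.
    """
    if not hosts:
        raise ValueError("at least one broker host is required")
    brokers = ",".join("{}:{}".format(host, port) for host in hosts)
    return {"metadata.broker.list": brokers}


kafkaParams = build_kafka_params(["host1", "host2", "host3"])
```

The resulting dictionary has the same shape as the hand-written one and can be passed to `KafkaUtils` in the same way.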

Next, create the offset range objects:

start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, start, until)
offsets = [offset]
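
For a topic with more than one partition you need one range per partition. A minimal sketch, using plain tuples so it runs without Spark: `make_range_specs` and `max_record_count` are hypothetical helper names, and it assumes the same start and until for every partition. As in `OffsetRange`, start is inclusive and until is exclusive.

```python
def make_range_specs(topic, partitions, start, until):
    """Build one (topic, partition, start, until) tuple per partition.

    start is inclusive and until is exclusive, as in OffsetRange.
    """
    if start < 0 or until < start:
        raise ValueError("need 0 <= start <= until")
    return [(topic, p, start, until) for p in partitions]


def max_record_count(specs):
    """Upper bound on the number of records the ranges cover."""
    return sum(until - start for _, _, start, until in specs)


specs = make_range_specs("topic", [0, 1, 2], 0, 10)
total = max_record_count(specs)

# With pyspark available, each tuple maps onto the constructor used above:
# offsets = [OffsetRange(*spec) for spec in specs]
```

Keeping the specs as tuples makes them easy to log or validate before any Spark objects are created.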

Finally, create the RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)

To create a stream that starts from given offsets, do the following:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

Then create the Spark Streaming context from your SparkContext (here with a 1-second batch interval):

ssc = StreamingContext(sc, 1)

Next, set up all the parameters:

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'

Then create the fromOffset dictionary:

topicPartition = TopicAndPartition(topic, partition)
# Python 2: the offset must be a long, not an int.
# Python 3 has no long(); use int(start) there.
fromOffset = {topicPartition: long(start)}

Finally, create the stream:

directKafkaStream = KafkaUtils.createDirectStream(
    ssc, [topic], kafkaParams, fromOffsets=fromOffset)
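
If the topic has several partitions, the dictionary needs one entry per partition. Here is a rough sketch in plain Python that builds and validates the starting offsets first: `starting_offsets` is a hypothetical helper, and the partition numbers and offsets are made up for illustration.

```python
def starting_offsets(topic, offsets_by_partition):
    """Map (topic, partition) -> starting offset, validating each value."""
    result = {}
    for partition, offset in sorted(offsets_by_partition.items()):
        if offset < 0:
            raise ValueError("offset must be non-negative")
        result[(topic, partition)] = int(offset)
    return result


# Example values only: partition 0 starts at offset 0, partition 1 at 42.
plain = starting_offsets("topic", {0: 0, 1: 42})

# With pyspark available, convert the keys before creating the stream:
# fromOffset = {TopicAndPartition(t, p): o for (t, p), o in plain.items()}
```

The converted dictionary is then passed as `fromOffsets` exactly as in the single-partition case.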


小智 5

You can do:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartition = TopicAndPartition(topic, partition)
fromoffset = {topicpartition: int(start)}
# spark_streaming is assumed to be an existing StreamingContext
kafkaDStream = KafkaUtils.createDirectStream(
    spark_streaming, [topic],
    {"metadata.broker.list": brokers}, fromOffsets=fromoffset)

Note: tested with Spark 2.2.0, Python 3.6