How do I correctly send data to a Kafka broker from pyspark?

Eug*_*erg 7 python-2.7 spark-streaming pyspark kafka-python

I am trying to write a simple pyspark job that receives data from a Kafka broker topic, applies some transformations to that data, and puts the transformed data on a different Kafka broker topic.

I have the following code, which reads data from the Kafka topic, but running the sendkafka function has no effect:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":
    main()

What should I change so that my sendkafka function actually sends data to the spark.out Kafka topic?

Eug*_*erg 9

Here is the correct code, which reads from Kafka into Spark and writes the Spark results back out to a different Kafka topic. The original version never sent anything because mapPartitions is a lazy transformation and no output operation ever consumed its result, so the generator in sendkafka was never executed; foreachRDD is an output operation, so the handler runs for every batch:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer

# A single producer instance, created once on the driver process.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    # collect() pulls every record of the batch back to the driver,
    # which then forwards each one to the output topic.
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    # foreachRDD is an output operation, so handler() is invoked for every micro-batch.
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

To run it:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test

  • @bey This answer only works in local mode, not on a cluster. (2)
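
As the comment above notes, this pattern only suits local mode: every record is collected back to the driver and the single producer lives there. A minimal sketch of a cluster-friendly variant, assuming the kafka-python package is installed on every executor (the broker address localhost:9092 and the topic spark.out are just carried over from the answer above), creates one producer per partition with foreachPartition so nothing has to be serialized from the driver:

def send_partition(records):
    # Runs on the executor that owns this partition, so the producer
    # is created locally instead of being shipped from the driver.
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for record in records:
        producer.send('spark.out', str(record))
    producer.flush()
    producer.close()

def handler(message):
    # Forward each batch partition-by-partition, without collect().
    message.foreachPartition(send_partition)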