Eug*_*erg 7 python-2.7 spark-streaming pyspark kafka-python
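I am trying to write a simple pyspark job that receives data from a Kafka broker topic, applies some transformations to that data, and puts the transformed data on a different Kafka broker topic.
I have the following code, which reads data from the Kafka topic, but the sendkafka function never seems to run: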
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
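What should I change so that my sendkafka function actually sends data to the spark.out Kafka topic?
Here is working code that reads from Kafka into Spark and writes the Spark data back to another Kafka topic: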
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

# A single producer created on the driver; handler() below also runs on the driver.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    # message is the RDD for one batch; collect() brings its records to the driver.
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
    # Flush once per batch so buffered records are actually delivered.
    producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batches

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    # foreachRDD is an output operation, so handler is invoked for every batch.
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
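A likely reason the code in the question never sends anything: mapPartitions and count on a DStream are transformations that only return new DStreams, and Spark Streaming executes a lineage only when it ends in an output operation such as pprint or foreachRDD. On top of that, sendkafka is a generator, so its body does not run until something iterates over its result. The pure-Python sketch below illustrates just the generator part; FakeProducer is a hypothetical stand-in and not part of kafka-python.

```python
class FakeProducer(object):
    """Hypothetical stand-in for a Kafka producer; it only records what is sent."""

    def __init__(self):
        self.sent = []

    def send_messages(self, topic, message):
        self.sent.append((topic, message))


def sendkafka(messages, producer):
    # A generator, like the function in the question: the body does not
    # start running until something iterates over the returned object.
    for message in messages:
        yield producer.send_messages("spark.out", message)


producer = FakeProducer()
pending = sendkafka(["a", "b"], producer)
sent_before = len(producer.sent)  # measured before anything consumes the generator

for _ in pending:  # consuming the generator is what triggers the sends
    pass
sent_after = len(producer.sent)

print("sent before iteration: %d, after: %d" % (sent_before, sent_after))
```

In the streaming job, foreachRDD plays the role of the loop that consumes the generator: it forces each batch to be processed.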
Run it with:
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test
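The handler above calls collect(), which pulls every record of a batch to the driver. For larger batches, one possible variant is to send from the workers instead, creating one producer per partition. The following is a minimal sketch under that assumption, not the method used in the answer; send_partition, make_producer and RecordingProducer are hypothetical names introduced here for illustration.

```python
def send_partition(records, make_producer, topic="spark.out"):
    """Send every record of one partition and return how many were sent."""
    # The producer is created inside the function so that no producer
    # object has to be serialized and shipped from the driver.
    producer = make_producer()
    count = 0
    for record in records:
        producer.send(topic, str(record).encode("utf-8"))
        count += 1
    producer.flush()  # one flush per partition
    return count


class RecordingProducer(object):
    """Hypothetical test double exposing the send/flush methods used above."""

    def __init__(self):
        self.sent = []
        self.flushed = 0

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed += 1


# Local demonstration without Spark or Kafka.
demo_producer = RecordingProducer()
demo_count = send_partition(iter(["x", "y", "z"]), lambda: demo_producer)
print("records sent: %d" % demo_count)

# Hypothetical wiring into the streaming job (needs pyspark and kafka-python):
# from kafka import KafkaProducer
# kvs.foreachRDD(lambda rdd: rdd.foreachPartition(
#     lambda records: send_partition(
#         records,
#         lambda: KafkaProducer(bootstrap_servers="localhost:9092"))))
```

Creating a producer for every partition of every batch has its own cost, so whether this is preferable to the collect() approach depends on batch size.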