joc*_*uiz 5 python rabbitmq pika spark-streaming pyspark
我有一个Apache Spark集群和一个RabbitMQ代理,我想使用该pyspark.streaming模块消费消息并计算一些指标.
问题是我只找到了这个包,但是用Java和Scala实现.除此之外,我没有在Python中找到任何示例或桥接实现.
我有一个使用Pika实现的消费者,但我不知道如何将有效负载传递给我StreamingContext.
该解决方案使用Spark Streaming中的pika 异步消费者示例和socketTextStream方法
.py文件Consumer课程在下面,if __name__ == '__main__':我们需要打开一个与 Spark Streaming 的 TCP 连接相对应的HOST套接字。PORT我们必须将套接字中的方法保存sendall到变量中,并将其传递给Consumer类
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
dispatcher = conn.sendall #assigning sendall to dispatcher variable
consumer = Consumer(dispatcher)
try:
consumer.run()
except Exception as e:
consumer.stop()
s.close()
Run Code Online (Sandbox Code Playgroud)修改__init__Consumer中的方法,传递dispatcher
def __init__(self,dispatcher):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
#new code
self._dispatcher = dispatcher
Run Code Online (Sandbox Code Playgroud)on_message在Consumer内部的方法中我们调用self._dispatcher发送bodyAMQP消息
def on_message(self, unused_channel, basic_deliver, properties, body):
self._channel.basic_ack(basic_deliver.delivery_tag)
try:
# we need an '\n' at the each row Spark socketTextStream
self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
except Exception as e:
raise
Run Code Online (Sandbox Code Playgroud)在Spark中,把ssc.socketTextStream(HOST, int(PORT))和HOST对应PORT到我们的TCP套接字。Spark 将管理连接
首先运行消费者,然后运行 Spark 应用程序
最后备注:
| 归档时间: |
|
| 查看次数: |
1436 次 |
| 最近记录: |