如何使用Pyspark Streaming模块实现RabbitMQ使用者?

joc*_*uiz 5 python rabbitmq pika spark-streaming pyspark

我有一个Apache Spark集群和一个RabbitMQ代理,我想使用该pyspark.streaming模块消费消息并计算一些指标.

问题是我只找到了这个包,但是用JavaScala实现.除此之外,我没有在Python中找到任何示例或桥接实现.

我有一个使用Pika实现的消费者,但我不知道如何将有效负载传递给我StreamingContext.

joc*_*uiz 4

该解决方案使用Spark Streaming中的pika 异步消费者示例socketTextStream方法

  1. 下载示例并将其另存为.py文件
  2. 修改该文件以使用您自己的 RabbitMQ 凭据和连接参数。就我而言,我必须修改Consumer课程
  3. 在下面,if __name__ == '__main__':我们需要打开一个与 Spark Streaming 的 TCP 连接相对应的HOST套接字。PORT我们必须将套接字中的方法保存sendall到变量中,并将其传递给Consumer

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
      s.bind((HOST, PORT))
      s.listen(1)
      conn, addr = s.accept()
      dispatcher = conn.sendall #assigning sendall to dispatcher variable
    consumer = Consumer(dispatcher)
    try:
      consumer.run()
    except Exception as e:
      consumer.stop()
      s.close()
    
    Run Code Online (Sandbox Code Playgroud)
  4. 修改__init__Consumer中的方法,传递dispatcher

    def __init__(self,dispatcher):
      self._connection = None
      self._channel = None
      self._closing = False
      self._consumer_tag = None
      self._url = amqp_url
      #new code
      self._dispatcher = dispatcher
    
    Run Code Online (Sandbox Code Playgroud)
  5. on_message在Consumer内部的方法中我们调用self._dispatcher发送bodyAMQP消息

    def on_message(self, unused_channel, basic_deliver, properties, body):
      self._channel.basic_ack(basic_deliver.delivery_tag)
      try:
        # we need an '\n' at the each row Spark socketTextStream
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
      except Exception as e:
        raise
    
    Run Code Online (Sandbox Code Playgroud)
  6. 在Spark中,把ssc.socketTextStream(HOST, int(PORT))HOST对应PORT到我们的TCP套接字。Spark 将管理连接

  7. 首先运行消费者,然后运行 ​​Spark 应用程序

最后备注:

  • 尝试在不同的机器上运行您的消费者,而不是您的 Spark 机器
  • 任何超过 10000 的端口都应该没问题。不要让内核打开一些随机端口
  • 平台:Linux Debian 7 和 8、Ubuntu 14.04 和 16.04
  • 皮卡版本0.10.0
  • Python版本3.5.2
  • Spark 版本 1.6.1、1.6.2 和 2.0.0