只是为了让事情变得棘手,我想使用来自rabbitMQ队列的消息.现在我知道有一个针对兔子的MQTT插件(https://www.rabbitmq.com/mqtt.html).
但是,我似乎无法在Spark消耗由pika生成的消息的情况下进行示例工作.
例如,我在这里使用简单的wordcount.py程序(https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html),看看我是否可以在下面看到一个消息生产者办法:
import sys
import pika
import json
import future
import pprofile
def sendJson(json):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='analytics', durable=True)
channel.queue_bind(exchange='analytics_exchange',
queue='analytics')
channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
connection.close()
if __name__ == "__main__":
with open(sys.argv[1],'r') as json_file:
sendJson(json_file.read())
Run Code Online (Sandbox Code Playgroud)
Sparkstreaming 消费者如下:
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE …
Run Code Online (Sandbox Code Playgroud) 有什么方法可以找到来自其他客户端的所有连接客户端详细信息(IP和名称)吗?我知道有一个主题"$ SYS/broker/clients/active"给出了当前连接客户端的数量,但是如果我想了解更多关于每个连接客户端的信息,有什么办法吗?
我正在开发一个解决方案,其中客户端的数量(使用无线网络)连接到位于服务器上的MQTT代理.我还将在同一台机器上运行另一个客户端并连接到代理,它将观察是否有任何新客户端与代理连接或者是否有断开连接的客户端.当新客户端连接或连接客户端断开连接时,我可以在代理控制台上看到消息.我们可以从连接到代理的客户端获得类似的东西吗?请建议实现这一目标的最佳方法是什么?
提前致谢.
-Dilip
我一直在关注AWS IoT文档试图开始使用http://docs.aws.amazon.com/iot/latest/developerguide/protocols.html#mqtt-ws上的MQTT教程 来设置WebSocket连接到Web应用程序中的AWS IoT.第一步是通过对通过遵循教程中的步骤创建的端点发出GET请求来启动WebSocket连接,但是我收到此错误:
XMLHttpRequest cannot load <my endpoint> Cross origin requests are only supported for protocol schemes: http, data, chrome, chrome-extension, https, chrome-extension-resource.
我想知道是否有人遇到过这个错误,如果有的话,他们是如何解决的?
按照本Auth0文章中的说明,我使用"JWT"作为用户名并使用JWT令牌作为密码成功验证了MQTT客户端.
然而,在我的用例中,JWT令牌是短暂的.客户端必须在当前令牌的到期日期之前获取新令牌,然后将其提供给MQTT服务器.否则,服务器终止连接.
我的问题是:如何实现令牌更新?它是来自客户的发布消息吗?哪个主题?我是否断开客户端连接,并让客户端使用新令牌重新进行身份验证?或者还有另一种方式吗?
我们需要构建一个可以与运行Android变体的某些嵌入式设备通信的服务器.我们需要能够向设备发送命令,并接收响应.一个简单的命令可能是询问设备的状态.我们不会有HTTP,所以我们需要让客户端/设备与服务器建立连接.
我们正在考虑使用MQTT,因为它具有许多不错的属性(QoS,轻量级,为物联网构建),但它本身不支持请求响应工作流.
我们已经考虑过在MQTT之上构建RPC,但在我们开始之前,我只是想让人们对此问题有所了解.Websockets,WAMP,ZeroMQ会更好吗?
编辑:
Q1:
我们甚至需要RPC吗?
Q2:
有没有一种方法来构建系统,我总是发送异步类型的消息,仍然提供良好的用户体验?
Q3:
任何例子?
寻找实施示例并亲身体验使用单个设备构建物联网通信系统的经验.
我在 Win7 PC 上安装了 mosquitto,我希望它可以侦听许多端口,因此我根据 mosquitto 文档和网上找到的一些教程/示例修改了 mosquitto.conf。这些是我对 mosquitto.conf 所做的修改:
\n\n # Plain MQTT protocol\n listener 1883\n\n # MQTT over TLS/SSL\n listener 8883\n protocol mqtt\n require_certificate false\n\n # MQTT over TLS/SSL with certificates\n listener 8884\n protocol mqtt\n require_certificate true\n certfile cert.pem\n cafile chain.pem\n keyfile privkey.pem\n\n # Plain WebSockets configuration\n listener 9001\n protocol websockets\n\n # WebSockets over TLS/SSL\n listener 9883\n protocol websockets\n require_certificate true\n cafile mosquitto/ca.crt\n certfile mosquitto/hostname.crt\n keyfile mosquitto/hostname.key\n\n # Log system configuration\n log_type all \n #log_dest file C:/Dati/mosquitto/mosquitto.log\n log_facility …
Run Code Online (Sandbox Code Playgroud) 我们刚刚开始为Android构建我们自己的推送通知系统(由于客户的要求),并找到了Eclipse Paho(http://www.eclipse.org/paho/).不用说,这个项目真的令人兴奋.
Android的问题是,如果CPU处于睡眠状态,MQTT客户端可能无法以设置的间隔发送ping.解决方法是使用AlarmManager将其唤醒并完成工作.Android文档说:
只要警报接收器的onReceive()方法正在执行,警报管理器就会保持CPU唤醒锁定.这可以保证在您完成广播处理之后手机不会睡眠.一旦onReceive()返回,Alarm Manager就会释放此唤醒锁.这意味着在某些情况下,只要onReceive()方法完成,手机就会休眠.
http://developer.android.com/reference/android/app/AlarmManager.html
我需要确保我可以在该onReceive()方法中发送ping命令,而CPU有PARTIAL_WAKE_LOCK,所以我正在寻找一种手动发送ping到服务器的方法,但似乎客户端没有暴露任何这样的方法.我错过了什么吗?或者,除了发布我自己的"ping消息"之外,这里的解决方法是什么?我想避免这种情况,因为:
当我第一次运行Mosquitto(MQTT)经纪人时没有问题.但是,当他第二次使用默认配置运行它时,由于以下错误,我无法成功运行代码:
1379497253:mosquitto版本1.2(构建日期2013-09-17 17:59:39 + 0530)启动1379497253:使用默认配置.1379497253:在端口1883上打开ipv6侦听套接字.1379497253:错误:地址已在使用中
我想知道如何从命令行停止代理.如果有人可以提供帮助,那就太好了.
我正在考虑将mosquitto作为MQTT经纪人.根据我的阅读,我意识到Mosquitto不支持水平缩放.
到目前为止,我的要求的所有其他标准都可以通过Mosquitto来满足.
我不确定这个问题是否过于通用或广泛,但我想知道的是,如果有任何方法可以实现扩展功能,还可以为Mosquitto实现负载平衡等.
我在Raspberry Pi上设置了MQTT,并为代理配置了Arduino Uno,但我在/var/log/mosquitto/mosquitto.log文件中看到以下条目:
New connection from 192.168.10.114 on port 1883.
Socket error on client <unknown>, disconnecting.
Run Code Online (Sandbox Code Playgroud)
Pi设置为ETH0连接到我的本地LAN,IP地址为192.168.1.50
Pi上还有一个WiFi AP设置.Arduino Uno通过WiFi连接发送/接收MQTT消息.WiFi AP的IP地址为192.168.10.1,并通过提供DHCP租约dnsmasq
.
我尝试在本地MQTT代理服务器(Pi)上发布和订阅测试并得到相同的错误:
Command:
mosquitto_sub -h 192.168.10.1 -t topic
mosquitto.log:
New connection from 192.168.10.1 on port 1883.
New client connected from 192.168.10.1 as mosqsub/1837-raspberryp (cl, k60).
Socket error on client <unknown>, disconnecting.
Run Code Online (Sandbox Code Playgroud)
这是/etc/mosquitto/mosquitto.conf:
pid_file /var/run/mosquitto.pid
persistence true
log_dest file /var/log/mosquitto/mosquitto.log
allow_anonymous true
include_dir /etc/mosquitto/conf.d
Run Code Online (Sandbox Code Playgroud)
sudo服务mosquitto停止sudo服务mosquitto开始:
mosquitto version 1.4.8 terminating
mosquitto version 1.4.8 (build date Sun, 14 Feb 2016 15:06:55 …
Run Code Online (Sandbox Code Playgroud)