Cam*_*mer 14 python matlab twisted websocket autobahn
我正在尝试从Matlab创建一个连接,以通过WebSocket流式传输JSON帧.我已经使用以下代码测试了我安装的Autobahn和Twisted.
下面是示例驱动程序代码:它使用JSONlab工具箱将Matlab数据转换为JSON格式,然后对数据进行压缩和Base64编码.由于我还没能让RPC正常工作,目前是通过命令行传递数据,因此需要压缩和Base64编码,以避免命令行长度限制和shell转义问题.
% Driver script: in an endless loop, serialize a random matrix to JSON,
% compress and Base64-encode it, and pass the result to the Python broadcast
% client as a command-line argument.
clear all
close all
% Interpreter and client script used to build the shell command below.
python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
% Iteration counter; incremented every pass but not otherwise read here.
i = uint32(1)
% Java Base64 encoder (Apache Commons Codec) called from Matlab.
encoder = org.apache.commons.codec.binary.Base64
while true
% tic/toc report the wall time of one serialize-and-send iteration.
tic;
packet = rand(100, 100);
% savejson comes from the JSONlab toolbox; the text is cast to bytes for
% the compressor.
json_packet = uint8(savejson('', packet));
compressed = CompressLib.compress(json_packet);
b64 = char(encoder.encode(compressed));
% Command line: <python> <client script> <base64 payload>.
message = sprintf('%s %s %s', python, bc, b64);
% NOTE(review): the exit status is captured but never checked.
status = system(message);
i = i + 1;
toc;
end
Run Code Online (Sandbox Code Playgroud)
客户端代码有两种被调用方式.您可以通过命令行传递消息,也可以创建BroadcastClient实例并调用sendMessage.
#!/usr/bin/env python
import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy
class BroadcastClient():
    """One-shot JSON-RPC client that hands a message to the broadcast server.

    The client wraps a txjsonrpc ``Proxy`` pointed at ``server`` and is meant
    to be used together with a running Twisted reactor: ``sendMessage`` stops
    the reactor once the remote call has finished.
    """

    def __init__(self, server=None):
        """Create the JSON-RPC proxy for the server at URL ``server``."""
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        """Print the failure ``value`` reported by a failed remote call."""
        # Produces the same text as the former ``print 'Error ', value``
        # statement, but is valid on both Python 2 and Python 3.
        print('{} {}'.format('Error ', value))

    def sendMessage(self, message):
        """Call the remote ``broadcastMessage`` method with ``message``.

        The reactor is stopped when the call completes, whether it succeeded
        or failed, so a failed call no longer leaves ``reactor.run()``
        blocking forever.
        """
        rc = self.proxy.callRemote('broadcastMessage', message)
        # Report the failure first; errorMessage returns None, which puts the
        # Deferred back on the success path.
        rc.addErrback(self.errorMessage)
        # Stop the reactor on both outcomes.
        rc.addBoth(lambda _: reactor.stop())
def main(cli_arguments):
    """Send the first command-line argument to the broadcast server.

    ``cli_arguments`` is an argv-style list (program name first). When no
    message argument is present the function returns without doing anything;
    otherwise it sends the message and runs the reactor.
    """
    if len(cli_arguments) <= 1:
        return

    client = BroadcastClient('http://127.0.0.1:7080/')
    client.sendMessage(cli_arguments[1])
    reactor.run()


if __name__ == '__main__':
    main(sys.argv)
Run Code Online (Sandbox Code Playgroud)
服务器使用TXJSONRPC,Twisted和Autobahn,在7080端口上提供RPC接口,在8080端口上提供Web客户端,在9000端口上提供WebSocket(与下面代码中的ws://127.0.0.1:9000一致).Autobahn的Web客户端对调试很有用,应与服务器代码放在同一个目录下.
#!/usr/bin/env python
import sys
from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc
from autobahn.twisted.websocket import WebSocketServerFactory, \
WebSocketServerProtocol, \
listenWS
class BroadcastServerProtocol(WebSocketServerProtocol):
    """Per-connection protocol that plugs a WebSocket client into the factory.

    The connection registers itself with the factory when it opens,
    rebroadcasts every text message it receives (tagged with the sender's
    peer), and unregisters itself when the connection is lost.
    """

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        # Binary frames are ignored; only text frames are rebroadcast.
        if isBinary:
            return
        text = payload.decode('utf8')
        self.factory.broadcastMessage("{} from {}".format(text, self.peer))

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)
class BroadcastServerFactory(WebSocketServerFactory):
    """
    Broadcast hub: keeps a list of connected clients and relays every
    message it is given to all of them.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        # Currently connected protocol instances, in registration order.
        self.clients = []

    def registerClient(self, client):
        """Add ``client`` to the broadcast list unless it is already there."""
        if client in self.clients:
            return
        print("registered client {}".format(client.peer))
        self.clients.append(client)

    def unregisterClient(self, client):
        """Remove ``client`` from the broadcast list if it is present."""
        if client not in self.clients:
            return
        print("unregistered client {}".format(client.peer))
        self.clients.remove(client)

    def broadcastMessage(self, message):
        """Send ``message`` (UTF-8 encoded) to every registered client."""
        print("broadcasting message '{}' ..".format(message))
        for receiver in self.clients:
            receiver.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(receiver.peer))
class BroadcastPreparedServerFactory(BroadcastServerFactory):
    """
    Same broadcast behaviour as the parent factory, but the message is
    prepared once with prepareMessage and the prepared message is then sent
    to each client with sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        """Prepare ``message`` as a text frame and send it to all clients."""
        print("broadcasting prepared message '{}' ..".format(message))
        payload = message.encode('utf8')
        prepared = self.prepareMessage(payload, isBinary=False)
        for receiver in self.clients:
            receiver.sendPreparedMessage(prepared)
            print("prepared message sent to {}".format(receiver.peer))
class MatlabClient(jsonrpc.JSONRPC):
    """JSON-RPC resource that forwards incoming messages to the broadcast factory.

    ``factory`` has to be assigned before requests arrive; while it is
    ``None`` incoming ``broadcastMessage`` calls are silently ignored.
    """

    # Broadcast factory that fans messages out to the WebSocket clients.
    factory = None

    def jsonrpc_broadcastMessage(self, message):
        """Relay ``message`` to all WebSocket clients through the factory."""
        if self.factory is not None:
            # broadcastMessage() has no return value, so it is called for its
            # side effect only instead of printing "None" on every request.
            self.factory.broadcastMessage(message)
if __name__ == '__main__':
    # "debug" as the first argument turns on Twisted logging to stdout and
    # the factory's debug flags.
    debug = len(sys.argv) > 1 and sys.argv[1] == 'debug'
    if debug:
        log.startLogging(sys.stdout)

    # WebSocket broadcast endpoint.
    factory = BroadcastPreparedServerFactory(
        u"ws://127.0.0.1:9000",
        debug=debug,
        debugCodePaths=debug,
    )
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    # JSON-RPC endpoint that feeds messages into the broadcast factory.
    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    # Static file server for the current directory.
    webdir = File(".")
    reactor.listenTCP(8080, Site(webdir))

    reactor.run()
Run Code Online (Sandbox Code Playgroud)
首先说明一点:如果从Matlab调用Python时遇到问题,请先用pyversion命令确认Matlab指向的是系统上正确的Python版本;如有需要,可以用pyversion('/path/to/python')进行更正.
% Attempt to call the Python broadcast client directly through Matlab's py.
% interface instead of spawning a process for every message.
% NOTE(review): `encoder` and `bc` are used below but never defined in this
% snippet (and `clear all` empties the workspace) - presumably they are
% created as in the earlier driver script / from the BroadcastClient class;
% confirm before running.
clear all
close all
% Iteration counter; incremented every pass but not otherwise read here.
i = uint32(1)
while true
% tic/toc report the wall time of one iteration.
tic;
packet = rand(100, 100);
json_packet = uint8(savejson('', packet));
compressed = CompressLib.compress(json_packet);
b64 = char(encoder.encode(compressed));
% b64 is transposed (.') before being converted to a Python str.
bc.sendMessage(py.str(b64.'));
py.twisted.internet.reactor.run % This won't work.
i = i + 1;
toc;
end
Run Code Online (Sandbox Code Playgroud)
另一个尝试是使用Matlab的webwrite向服务器发送POST请求.事实证明,只要传入正确的weboptions,webwrite就会自动把数据转换为JSON.
% POST a struct to the server with webwrite; the 'application/json' media
% type is selected through weboptions.
% NOTE(review): `server` is not defined in this snippet - it must already
% hold the target URL.
options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);
Run Code Online (Sandbox Code Playgroud)
这有效,但每条消息的速度很慢(~0.1秒).我应该提到矩阵不是我发送的真实数据,真实数据序列化后每条消息约280000字节,但这提供了合理的近似值.
我怎样才能调用bc.sendMessage并让reactor正确运行,或者用另一种更快的方式解决这个问题?
首先,您需要确保使用正确的python二进制文件.在Mac上,您可能正在使用系统标准版本而不是Homebrew安装的版本.使用以下命令检查python安装的位置:
% With no arguments, pyversion reports the Python installation Matlab uses.
pyversion
Run Code Online (Sandbox Code Playgroud)
您可以使用以下方法将Matlab指向正确的版本:
% Point Matlab at a specific Python executable (replace the placeholder path).
pyversion('path/to/python')
Run Code Online (Sandbox Code Playgroud)
这可能需要你重启python.
如上所述,我使用Twisted将我的Matlab数据多路复用到WebSocket客户端.我找到的解决这个问题的最好方法,就是创建一个处理POST请求的服务器,然后由它把数据传递给WebSocket客户端.压缩只会减慢速度,所以每个请求直接发送280 kB的JSON,每条消息大约需要0.05秒.我原本希望能更快(约0.01秒),但这是一个好的开始.
% Final Matlab sender: POST uncompressed JSON to the server's update.json
% resource through the Python `requests` package, in an endless loop.
server = 'http://127.0.0.1:7080/update.json';
% HTTP headers are built as a Python dict from name/value pairs.
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
% tic/toc report the wall time of one serialize-and-POST iteration.
tic;
packet = rand(100, 100);
% savejson comes from the JSONlab toolbox; the JSON text is sent as-is.
json_packet = savejson('', packet);
% The response object is stored in r but not inspected.
r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
toc;
end
Run Code Online (Sandbox Code Playgroud)
我本可以使用Matlab webwrite函数,但通常我发现调用python更灵活.
import sys
from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File
from autobahn.twisted.websocket import WebSocketServerFactory, \
WebSocketServerProtocol, \
listenWS
class BroadcastServerProtocol(WebSocketServerProtocol):
    """Per-connection protocol that plugs a WebSocket client into the factory.

    The connection registers itself with the factory when it opens,
    rebroadcasts every text message it receives (tagged with the sender's
    peer), and unregisters itself when the connection is lost.
    """

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        # Binary frames are ignored; only text frames are rebroadcast.
        if isBinary:
            return
        text = payload.decode('utf8')
        self.factory.broadcastMessage("{} from {}".format(text, self.peer))

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)
class BroadcastServerFactory(WebSocketServerFactory):
    """Broadcast hub that relays each message to every registered client."""

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        # Currently connected protocol instances, in registration order.
        self.clients = []

    def registerClient(self, client):
        """Add ``client`` to the broadcast list unless it is already there."""
        if client in self.clients:
            return
        print("registered client {}".format(client.peer))
        self.clients.append(client)

    def unregisterClient(self, client):
        """Remove ``client`` from the broadcast list if it is present."""
        if client not in self.clients:
            return
        print("unregistered client {}".format(client.peer))
        self.clients.remove(client)

    def broadcastMessage(self, message):
        """Send ``message`` (UTF-8 encoded) to every registered client."""
        for receiver in self.clients:
            receiver.sendMessage(message.encode('utf8'))
class BroadcastPreparedServerFactory(BroadcastServerFactory):
    """Broadcast factory that prepares a message once and sends it to all clients."""

    def broadcastMessage(self, message, isBinary=False):
        """Prepare ``message`` once and send it to every registered client.

        ``message`` may be a byte string (sent unchanged) or a text string
        (UTF-8 encoded first). ``isBinary`` selects the WebSocket frame type
        and is passed through to ``prepareMessage``.
        """
        # Encode text regardless of frame type: the previous check encoded
        # only when isBinary was True, so text strings built in onMessage
        # reached prepareMessage unencoded while raw binary was re-encoded.
        if not isinstance(message, bytes):
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
class WebClient(Resource):
    """HTTP resource that rebroadcasts the body of each POST to WebSocket clients."""

    # Broadcast factory; must be assigned before the resource serves requests.
    webSocket = None

    def render_POST(self, request):
        """Broadcast the raw request body and answer with ``OK``."""
        self.webSocket.broadcastMessage(request.content.read())
        # The response body must be a byte string under Python 3; b'OK' is
        # the same object type as 'OK' on Python 2, so nothing changes there.
        return b'OK'
if __name__ == '__main__':
    # "debug" as the first argument turns on Twisted logging to stdout and
    # the factory's debug flags.
    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False

    # WebSocket broadcast endpoint.
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    # HTTP endpoint: POSTs to /update.json are rebroadcast over WebSocket.
    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    # Path segments are matched as bytes under Python 3; the bytes literal
    # is equivalent to the plain string on Python 2.
    root.putChild(b'update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)

    # Static file server for the current directory.
    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()
Run Code Online (Sandbox Code Playgroud)
我放弃了RPC的方案,改为直接POST.性能仍有很大的提升空间.