我有一个基于tkinter的GUI程序在Python 3.4.1中运行.我在程序中运行了几个线程来从各个URL获取JSON数据.我想添加一些WebSocket功能,以便能够允许程序充当服务器,并允许多个客户端通过WebSocket连接到它并交换其他JSON数据.
我正在尝试使用Autobahn | Python WebSocket服务器进行asyncio.
我首先尝试在GUI程序下的单独线程中运行asyncio事件循环.但是,每次尝试都会产生'AssertionError:线程'Thread-1'中没有当前事件循环.
然后,我尝试使用标准库多处理程序包生成一个进程,该程序包在另一个进程中运行asyncio事件循环.当我尝试这个时,我没有得到任何异常,但WebSocket服务器也没有启动.
甚至可以在另一个Python程序的子进程中运行asyncio事件循环吗?
有没有办法将asyncio事件循环集成到当前多线程/ tkinter程序中?
更新 下面是我尝试运行初始测试的实际代码.
from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory
import asyncio
from multiprocessing import Process
class MyServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
print("Client connecting: {0}".format(request.peer))
def onOpen(self):
print("WebSocket connection open.")
def onMessage(self, payload, isBinary):
if isBinary:
print("Binary message received: {0} bytes".format(len(payload)))
else:
print("Text message received: {0}".format(payload.decode('utf8')))
## echo back message verbatim
self.sendMessage(payload, isBinary)
def onClose(self, wasClean, code, reason):
print("WebSocket connection closed: {0}".format(reason))
def start_server():
factory = WebSocketServerFactory("ws://10.241.142.27:6900", debug = False) …
Run Code Online (Sandbox Code Playgroud) 我正在尝试从Matlab创建一个连接,以通过WebSocket流式传输JSON帧.我已经测试了我的高速公路安装的高速公路并使用以下方法进行了扭曲.
使用JSONlab工具箱将Matlab数据转换为JSON格式然后压缩和Base64编码数据的示例驱动程序代码.由于我还没有使用RPC工作,我正在使用命令行,我需要压缩和Base64编码,以避免线路长度和shell转义问题.
clear all
close all
python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)
encoder = org.apache.commons.codec.binary.Base64
while true
tic;
packet = rand(100, 100);
json_packet = uint8(savejson('', packet));
compressed = CompressLib.compress(json_packet);
b64 = char(encoder.encode(compressed));
message = sprintf('%s %s %s', python, bc, b64);
status = system(message);
i = i + 1;
toc;
end
Run Code Online (Sandbox Code Playgroud)
客户端代码有两种被调用方式.您可以通过命令行传递消息,也可以创建BroadcastClient实例并调用sendMessage.
#!/usr/bin/env python
import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy
class BroadcastClient():
def __init__(self, server=None): …
Run Code Online (Sandbox Code Playgroud) 全新的websockets.
我在理解如何与另一个应用程序中的python Autobahn/twisted进行交互时遇到了一些麻烦,似乎无法找到任何有用的示例.
我有一个运行的Python应用程序,需要在某些事件上发送两种类型的消息之一.第一个是向所有用户发送的广播消息.第二种类型是单个特定用户.
使用以下两个示例,我可以接收消息并发送响应.但是,我不需要从连接的客户端(连接到websockets服务器的客户端除外)接收任何内容,只发送给他们.
我玩过:https://github.com/tavendo/AutobahnPython/tree/master/examples/twisted/websocket/echo
另外(非高速公路相关):https://github.com/opiate/SimpleWebSocketServer
问题:
1 - 我正在尝试做什么?我是否可以拥有与Autobahn应用程序/服务器连接的外部应用程序,并向所有连接的用户或单个用户广播消息.
2 - 如果可能的话,有人能指出我正确的方向来学习如何做到这一点吗?
谢谢
我有一个python聊天服务器,使用twisted和autobahn websockets进行连接.
factory = MessageServerFactory("ws://localhost:9000", debug=debug, debugCodePaths=debug)
factory.protocol = MessageServerProtocol
factory.setProtocolOptions(allowHixie76=True)
listenWS(factory)
Run Code Online (Sandbox Code Playgroud)
这是服务器
import logging
from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
from DatabaseConnector import DbConnector
from LoginManager import LoginManager
from MessageTypes import MessageParser
class MessageServerProtocol(WebSocketServerProtocol):
def onOpen(self):
self.factory.register(self)
def onMessage(self, msg, binary):
if not binary:
self.factory.processMessage(self, msg)
def connectionLost(self, reason):
WebSocketServerProtocol.connectionLost(self, reason)
self.factory.unregister(self)
class MessageServerFactory(WebSocketServerFactory):
logging.basicConfig(filename='log/dastan.log',format='%(levelname)s:%(message)s',level=logging.WARNING)
def __init__(self, url, debug=False, debugCodePaths=False):
WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
self.clients = {}
self.connector = DbConnector()
self.messages = MessageParser()
self.manager = LoginManager()
def register(self, client):
print …
Run Code Online (Sandbox Code Playgroud) 我有一个高速公路的Websocket服务器,onX
它的协议中有典型的功能.我的问题是我无法找到退出的方法onX
,同时继续做特定消息到达时我想做的各种事情.更具体地说,在我的onMessage
函数中,我有时会对API进行非常慢的HTTP请求.结果,发送websocket消息的客户端被服务器的onMessage
终结阻止.即使我做的self.sendMessage
还是reactor.callFromThread(<http request here>)
,或者self.transport.loseConnection()
从服务器端,在onMessage
块中,onMessage
仍在执行HTTP请求和我的客户等待.
这是我客户的代码:
@asyncio.coroutine
def send(command,userPath,token):
websocket = yield from websockets.connect('wss://127.0.0.1:7000',ssl=ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2))
data = json.dumps({"api_command":"session","body":command,"headers": {'X-User-Path': userPath, 'X-User-Token': token}})
response = {}
try:
yield from websocket.send(data)
finally:
yield from websocket.close()
if 'command' in response:
if response['command'] == 'ACK_SESSION_COMMAND' or response['command'] == 'ACK_INITIALIZATION':
return ('OK',200)
else:
return('',400)
Run Code Online (Sandbox Code Playgroud)
我甚至尝试websocket.send(data)
从客户端,但由于某种原因它不发送数据(我没有看到它们到达服务器).我不明白如何从onMessage
块中返回并继续执行我的HTTP请求.
为了解释我的情况,我只想将1 ssl websocket消息发送到我的服务器并立即关闭连接.任何能做到这一点的东西都适合我.
我正在编写一个应用程序,使用WebSocket从Apache Kafka向浏览器传输消息.当服务器暂停消耗消息并且偏移量落后时,websocket会在1000或2000条消息后关闭.这个数字似乎在每次测试运行之间交替.消息是utf8,通常在8字节片段中大约130-140字节.
当偏移量被捕获并且消息以稍慢的速率进入时,它会持续更长时间,但有时仍会在打开一段时间后随机关闭.
在服务器端,我正在使用Autobahn和Twisted,这个kafka-python库,以及浏览器端的本机WebSocket对象.在每种情况下,两端都会收到1006错误代码(意外关闭).
我目前只在RHEL和OSX上使用Firefox 31进行了测试.服务器的python版本是2.6.6.升级将是相当困难的,所以不幸的是我不能只看到一个新的python版本修复它; 但如果这肯定是问题,那就可以做到.
目前代码非常简单 - 当连接打开时,服务器开始使用Kafka消息并将其发送到websocket上.客户端使用jquery将其添加到简单的span元素后,将它们预先添加到正文中.
也许我在Twisted的异步设计中遗漏了一些东西,但我似乎无法找到一种方法来调用sendMessage()方法"externaly".我的意思是,发送消息而不仅仅是Twisted/AutobahnWebsockets的回调方法(比如onOpen或者在onMessage()上从服务器接收数据)
当然我可以启动一个线程并调用my_protocol_instance.sendMessage("hello")但这会破坏异步设计的所有目的吗?
在一个具体的例子中,我需要一个顶级包装类来打开连接并管理它,每当我需要时,我都会调用my_class.send_my_toplevel_message(msg).我该如何实现呢?
希望我对我的解释一直很清楚.
谢谢
只是想知道realm
AutobahnJS 的神秘领域是什么.从文档中,创建连接如下:
var connection = new autobahn.Connection({
url: 'ws://127.0.0.1:9000/',
realm: 'realm1'
});
Run Code Online (Sandbox Code Playgroud)
我没有设置领域服务器端,那么这个领域参数是什么?此外,它是必需的字段,必须意味着连接工作是必要的.有人可以启发我们吗?
我有一个使用Twisted的Autobahn Python客户端应用程序,它连接到Crossbar.io服务器.使用ReconnectingClientFactory丢失网络连接后,客户端应用程序可以成功重新连接.客户端在连接时注册被叫方名称,以便其他应用程序可以调用它.这始终适用于初始连接.
但是,从丢失的连接恢复时,无法重新注册被叫方名称,因为仍然从先前丢失的连接注册了被叫方名称.这会导致错误'wamp.error.procedure_already_exists'.由于被叫方名称注册仍与先前丢失的连接相关联,因此我将取消注册旧的被叫方名称.
我能看到的唯一解决方案是在每个连接上生成并注册唯一的新被叫方名称,以避免与先前注册的被叫方名称冲突.
有没有更好或更简单的方法来处理这个?似乎WAMP协议允许使用注册ID从另一个连接取消注册被调用者名称,但是高速公路Python客户端库似乎不允许这样做.
假设我们想要创建私人聊天室,用户可以在这里聊天.用户可以加入多个/ x组.在每种情况下,我都需要在服务器上创建一个唯一的组,并为这些用户订阅.哪种方法是推荐/更高效的方式:
[1]在服务器端,我创建了一个Room类,并为每个群聊添加新的房间频道,例如"chats/room-asdhqk1","chats/room-fwuefhw1","chats/room-awsdhqwd2".现在,某些指定用户可以加入此频道并添加到组客户端列表中.在客户端,用户订阅了他们添加到的组渠道.
问题:当用户在x通道中时,我需要在侧载后将他订阅到这些x通道.好:可以通过群组频道名称向特定群组广播,并且订阅该频道的所有用户将自动接收消息,因为他们已订阅了js部分内的频道.
[2]每个用户都有自己的频道,例如"notifications/user-1","notifications/user-2"....... 在服务器端,我在Rooms Class(无通道)中创建组.通过将用户添加到房间列表的子列表中,可以将用户添加到特定房间.当他们彼此聊天时,服务器迭代群聊的订阅用户,并向每个用户通知频道发送消息 - 这里根本没有群组频道 - 仅针对每个用户频道.
问题:我无法轻松地广播消息,我需要遍历每个订阅用户并将消息发送到他的通知通道.另外,我不能在前端使用"发布"方法轻松地将消息发布到频道,因为用户在不同的频道中分开.
好:最后,广播方法也会这样做:迭代订阅者列表.为了发送消息,我可以轻松实现与"发布"方法相同的RPC方法 - 查找组的订阅用户并将消息发送给他们.使用这种方法,用户不需要连接到客户端的x通道,他只有一个通道可以处理所有通道.
(我知道对于第二种方法,需要一个推动器(例如zmq)).
你有什么看法?我认为第二个更好,因为我不必在客户端订阅x-channels.如果用户需要首先连接到例如500个频道,则这将不具有性能.
问候.
autobahn ×10
python ×8
websocket ×7
twisted ×5
javascript ×2
apache-kafka ×1
autobahnws ×1
chat ×1
crossbar ×1
matlab ×1
php ×1
ratchet ×1
tkinter ×1