Lia*_*nen 8 python websocket python-3.x python-asyncio
在处理从websocket服务器推送到client.py的数据时,我遇到了很长的(3小时)延迟(编辑:一开始就是短暂的延迟,然后在一天中变得更长).我知道它不会被服务器延迟.
例如,我每5秒钟看到keep_alive日志事件及其各自的时间戳.这样运行顺利.但是当我看到在日志中处理的数据帧实际上是在服务器发送它之后 3小时.我是在做什么来推迟这个过程吗?
我是否正确地调用了我的协程'keep_alive'?keep_alive只是服务器的一条消息,用于保持连接的活动状态.服务器回显消息.我也记得太多了吗?这可能会延迟处理(我不这么认为,因为我看到记录事件立即发生).
async def keep_alive(websocket):
"""
This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
"""
await websocket.send('Hello')
await asyncio.sleep(5)
async def open_connection_test():
"""
Establishes web socket (WSS). Receives data and then stores in csv.
"""
async with websockets.connect(
'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
while True:
"""
Handle message from server.
"""
message = await websocket.recv()
if message.isdigit():
# now = datetime.datetime.now()
rotating_logger.info ('Keep alive message: {}'.format(str(message)))
else:
jasonified_message = json.loads(message)
for key in jasonified_message:
rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))
"""
Store in a csv file.
"""
try:
convert_and_store(jasonified_message)
except PermissionError:
convert_and_store(jasonified_message, divert = True)
"""
Keep connection alive.
"""
await keep_alive(websocket)
"""
Logs any exceptions in logs file.
"""
try:
asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
rotating_logger.info (e)
Run Code Online (Sandbox Code Playgroud)
编辑:从文档 - 我认为这可能与它有关 - 但我没有连接点.
max_queue参数设置保存传入消息的队列的最大长度.默认值为32. 0禁用限制.消息收到后会添加到内存中的队列中; 然后recv()从该队列中弹出.为了防止在接收到的消息超过可处理的消息时过多的内存消耗,必须限制队列.如果队列填满,协议将停止处理传入数据,直到调用recv().在这种情况下,各种接收缓冲区(至少在asyncio和OS中)将填满,然后TCP接收窗口将缩小,减慢传输速度以避免丢包.
编辑9/28/2018:我正在测试它没有keep-alive消息,这似乎不是问题.它可能与convert_and_store()函数有关吗?这需要异步def然后等待吗?
def convert_and_store(data, divert = False, test = False):
if test:
data = b
fields = data.keys()
file_name = parse_call_type(data, divert = divert)
json_to_csv(data, file_name, fields)
Run Code Online (Sandbox Code Playgroud)
编辑10/1/2018:似乎keep-alive消息和convert_and_store都是问题; 如果我将保持活动消息扩展到60秒 - 那么convert_and_store将每60秒运行一次.所以convert_and_store正在等待keep_alive()......
Art*_*nov 10
它可能与 convert_and_store() 函数有关吗?
是的,可能是。不应直接调用阻塞代码。如果函数执行 CPU 密集型计算 1 秒,则所有 asyncio 任务和 IO 操作都会延迟 1 秒。
执行程序可用于在不同的线程/进程中运行阻塞代码:
import asyncio
import concurrent.futures
import time
def long_runned_job(x):
time.sleep(2)
print("Done ", x)
async def test():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
for i in range(5):
loop.run_in_executor(pool, long_runned_job, i)
print(i, " is runned")
await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
Run Code Online (Sandbox Code Playgroud)
在你的情况下,它应该是这样的:
import concurrent.futures
async def open_connection_test():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
async with websockets.connect(...) as websocket:
while True:
...
loop.run_in_executor(pool, convert_and_store, args)
Run Code Online (Sandbox Code Playgroud)
编辑:
似乎 keep-alive 消息和 convert_and_store 都存在问题
您可以keep_alive
在后台运行:
async def keep_alive(ws):
while ws.open:
await ws.ping(...)
await asyncio.sleep(...)
async with websockets.connect(...) as websocket:
asyncio.ensure_future(keep_alive(websocket))
while True:
...
Run Code Online (Sandbox Code Playgroud)