Long delays when using asyncio and websockets in Python 3

Lia*_*nen 8 python websocket python-3.x python-asyncio

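I'm seeing long (3-hour) delays in processing data that the websocket server pushes to client.py. (Edit: the delay is short at first and then grows longer over the course of the day.) I know the delay is not coming from the server.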

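For example, I see the keep_alive log event with its timestamp every 5 seconds, and that runs smoothly. But when I see a data frame being processed in the log, it is actually being processed 3 hours after the server sent it. Am I doing something that delays this processing?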

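Am I calling my coroutine keep_alive correctly? keep_alive is just a message to the server to keep the connection alive; the server echoes the message back. Am I also logging too much? Could that delay processing? (I don't think so, because I see the logging events happen immediately.)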

import asyncio
import json

import websockets

async def keep_alive(websocket):
    """
    This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    await websocket.send('Hello')
    await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection_test())
except Exception as e:
    rotating_logger.info (e)

Edit: from the docs. I think this may have something to do with it, but I haven't connected the dots:

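The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32; 0 disables the limit. Messages are added to an in-memory queue when they're received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.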
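The backpressure described in that passage can be imitated with a plain bounded asyncio.Queue. This is only an analogy, not the websockets library's internal code: the producer stands in for the network side and suspends on put() once the queue holds max_queue items, until the slow consumer (standing in for recv()) pops one. The queue size and timings are arbitrary.

```python
import asyncio

async def main(max_queue=3, n_messages=10):
    # Bounded queue standing in for the incoming-message queue (analogy only).
    queue = asyncio.Queue(maxsize=max_queue)
    max_seen = 0

    async def producer():
        for i in range(n_messages):
            # put() suspends while the queue is full: this is the backpressure.
            await queue.put(i)

    async def consumer():
        nonlocal max_seen
        received = []
        for _ in range(n_messages):
            max_seen = max(max_seen, queue.qsize())
            received.append(await queue.get())  # plays the role of recv()
            await asyncio.sleep(0.01)  # a slow consumer
        return received

    prod = asyncio.create_task(producer())
    received = await consumer()
    await prod
    return received, max_seen

received, max_seen = asyncio.run(main())
print(received, max_seen)
```

Nothing is lost; the producer simply waits. In the real client, the "waiting" happens in the OS buffers and TCP window, so data sits upstream until the loop gets around to calling recv().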

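Edit 9/28/2018: I tested it without the keep-alive message, and that doesn't seem to be the problem. Could it be related to the convert_and_store() function? Does it need to be an async def and then awaited?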

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

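Edit 10/1/2018: It seems both the keep-alive message and convert_and_store are the problem: if I extend the keep-alive message to 60 seconds, convert_and_store runs only once every 60 seconds. So convert_and_store is waiting on keep_alive()...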
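That observation follows from the shape of the receive loop: `await keep_alive(websocket)` is awaited inline, so every iteration ends with `asyncio.sleep(5)` and the loop can call recv() at most once per sleep interval. A minimal sketch, with the network replaced by an in-memory queue (an assumption so it runs without a server) and 0.05 s standing in for the 5 s sleep, shows each message being handled one interval later than the previous one even though all of them arrived at once:

```python
import asyncio
import time

async def main(interval=0.05, n_messages=5):
    incoming = asyncio.Queue()  # stands in for messages already received
    for i in range(n_messages):
        incoming.put_nowait(i)  # every message "arrives" immediately

    start = time.monotonic()
    handled_at = []
    while len(handled_at) < n_messages:
        await incoming.get()  # like websocket.recv()
        handled_at.append(time.monotonic() - start)
        # Same shape as awaiting keep_alive() inline: the loop sleeps
        # before it is allowed to read the next message.
        await asyncio.sleep(interval)
    return handled_at

handled_at = asyncio.run(main())
print([round(t, 2) for t in handled_at])
```

If the server sends frames more often than once per keep-alive interval, the backlog, and with it the delay, keeps growing, which would be consistent with a delay that starts small and increases over the day.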

Art*_*nov 10

Could it be related to the convert_and_store() function?

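Yes, it could be. Blocking code should not be called directly from a coroutine. If a function performs 1 second of CPU-intensive computation, all asyncio tasks and IO operations are delayed by 1 second; blocking file or network I/O has the same effect.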
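A small sketch of that effect, which also touches the question of whether convert_and_store just needs to become `async def`: a heartbeat task records the gap between its ticks while another coroutine does 0.3 s of blocking work (`time.sleep` stands in for the CSV writing; the durations are arbitrary). Declaring the function `async def` does not help, because its body never yields; offloading the call with `asyncio.to_thread` (Python 3.9+, newer than this answer) keeps the heartbeat on schedule.

```python
import asyncio
import time

async def heartbeat(gaps, period=0.05):
    # Records how long each tick really took; a large gap means the loop was blocked.
    last = time.monotonic()
    while True:
        await asyncio.sleep(period)
        now = time.monotonic()
        gaps.append(now - last)
        last = now

async def blocking_store():
    # Declared async, but time.sleep() never yields, so the whole loop stalls.
    time.sleep(0.3)

async def offloaded_store():
    # The blocking call runs in a worker thread; the loop keeps running.
    await asyncio.to_thread(time.sleep, 0.3)

async def max_gap(store):
    gaps = []
    hb = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.1)  # let the heartbeat start ticking
    await store()
    await asyncio.sleep(0.1)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return max(gaps)

blocked = asyncio.run(max_gap(blocking_store))
offloaded = asyncio.run(max_gap(offloaded_store))
print(f"blocking: {blocked:.2f}s, offloaded: {offloaded:.2f}s")
```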

An executor can be used to run blocking code in a different thread or process:

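import asyncio
import concurrent.futures
import time

def long_running_job(x):
    # Blocking work; it runs in a separate process, so it does not stall the event loop.
    time.sleep(2)
    print("Done", x)

async def test():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = []
        for i in range(5):
            # Schedules the job in the pool and returns an awaitable future immediately.
            futures.append(loop.run_in_executor(pool, long_running_job, i))
            print(i, "is running")
            await asyncio.sleep(0.5)
        # Wait for the jobs so that any exception raised in a worker is re-raised here.
        await asyncio.gather(*futures)

if __name__ == "__main__":  # needed by ProcessPoolExecutor on spawn-based platforms
    asyncio.run(test())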

In your case, it should look something like this:

import asyncio
import concurrent.futures

import websockets

async def open_connection_test():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:
                ...
                # Hand the blocking CSV work to the pool instead of calling it inline.
                loop.run_in_executor(pool, convert_and_store, args)
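One detail when moving convert_and_store into an executor: the question's `try`/`except PermissionError` around the call no longer catches anything, because the exception is raised in the worker, and a future that nobody awaits only reports it later (if at all) as "exception was never retrieved". A sketch of one way to handle that, with the fallback moved into a wrapper that runs in the worker and a done-callback for anything else. `store_with_fallback` and `report_errors` are hypothetical names, and `convert_and_store` here is a stub that fails unless diverted. It uses a ThreadPoolExecutor so it runs anywhere; the same structure should apply to a ProcessPoolExecutor as long as the functions are defined at module level and the data is picklable (a dict from json.loads is).

```python
import asyncio
import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("store")

calls = []

def convert_and_store(data, divert=False):
    # Hypothetical stub for the question's function: fails unless diverted.
    if not divert:
        raise PermissionError("file is locked")
    calls.append((data, divert))

def store_with_fallback(data):
    # The fallback from the receive loop, moved into the code the worker runs.
    try:
        convert_and_store(data)
    except PermissionError:
        convert_and_store(data, divert=True)

def report_errors(future):
    # Logs any other exception from a fire-and-forget executor call.
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        log.error("store failed: %r", exc)

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = []
        for msg in ({"id": 1}, {"id": 2}):
            fut = loop.run_in_executor(pool, store_with_fallback, msg)
            fut.add_done_callback(report_errors)
            futures.append(fut)
        # Only so the demo waits before exiting; the receive loop would not block here.
        await asyncio.gather(*futures, return_exceptions=True)

asyncio.run(main())
print(calls)
```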

Edit

It seems both the keep-alive message and convert_and_store are the problem

You can run keep_alive in the background:

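async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    # Runs concurrently instead of being awaited after every message.
    # Keep a reference: the event loop holds only a weak reference to tasks.
    keep_alive_task = asyncio.create_task(keep_alive(websocket))
    while True:
        ...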
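A self-contained sketch of the background approach, so it can be run without a server. `FakeConnection` is a hypothetical stand-in that only mimics the `open`, `ping()` and `recv()` members used above; the interval is arbitrary. With the pinger in its own task, all queued messages are handled immediately while pings keep going out. (Recent versions of websockets also send keepalive pings on their own, controlled by the `ping_interval` argument of `connect()`, which may make a manual pinger unnecessary.)

```python
import asyncio
import time

class FakeConnection:
    # Hypothetical stand-in for a websockets connection, for offline testing only.
    def __init__(self, messages):
        self.open = True
        self.pings = 0
        self._incoming = asyncio.Queue()
        for m in messages:
            self._incoming.put_nowait(m)

    async def ping(self):
        self.pings += 1

    async def recv(self):
        return await self._incoming.get()

async def keep_alive(ws, interval):
    # Runs as its own task, so its sleep never holds up the receive loop.
    while ws.open:
        await ws.ping()
        await asyncio.sleep(interval)

async def main(n_messages=5, interval=0.05):
    ws = FakeConnection(range(n_messages))
    pinger = asyncio.create_task(keep_alive(ws, interval))  # keep the reference
    start = time.monotonic()
    handled_at = []
    try:
        for _ in range(n_messages):
            await ws.recv()
            handled_at.append(time.monotonic() - start)
        await asyncio.sleep(3 * interval)  # let a few pings go out
    finally:
        ws.open = False
        pinger.cancel()
    return handled_at, ws.pings

handled_at, pings = asyncio.run(main())
print([round(t, 3) for t in handled_at], pings)
```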