类型错误:当我使用 asyncio Loop.run_in_executor() 时,无法 pickle 协程对象

Wad*_*ang 5 python python-asyncio

我指的是这个 repo将mmaction2 grad-cam demo从短视频离线推理改编为长视频在线推理。脚本如下所示:

注意:为了使该脚本可以轻松重现,我注释掉了一些需要许多依赖项的代码。

import cv2
import numpy as np
import torchvision.transforms as transforms
import sys
from PIL import Image
#from mmaction.apis import init_recognizer
#from utils.gradcam_utils import GradCAM
import torch
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# sys.path.append('./utils')


async def preprocess_img(arr):
    image = Image.fromarray(np.uint8(arr))
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    transform = transforms.Compose([
        transforms.Resize((model_input_height, model_input_width)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std, inplace=False),
    ])
    normalized_img = transform(image)
    img_np = normalized_img.numpy()
    return img_np


async def inference(frame_buffer):
    print("starting inference")
    # inputs = {}
    # input_tensor = torch.from_numpy(frame_buffer).type(torch.FloatTensor)
    # input_cuda_tensor = input_tensor.cuda()
    # inputs['imgs'] = input_cuda_tensor
    # results = gradcam(inputs)
    # display_buffer = np.squeeze(results[0].cpu().detach().numpy(), axis=0)
    # return display_buffer


async def run_blocking_func(loop_, queue_, frame_buffer):
    with ProcessPoolExecutor() as pool:
        blocking_func = partial(inference, frame_buffer)
        frame = await loop_.run_in_executor(pool, blocking_func)
        print(frame)
        await queue_.put(frame)
        await asyncio.sleep(0.01)

async def get_frames(capture):
    capture.grab()
    ret, frame = capture.retrieve()
    if not ret:
        print("empty frame")
        return
    for i in range(32):
        img = await preprocess_img(frame)
        expandimg = np.expand_dims(img, axis=(0, 1, 3))
        print(f'expandimg.shape{expandimg.shape}')
        frame_buffer[:, :, :, i, :, :] = expandimg[:, :, :, 0, :, :]
    return frame_buffer


async def show_frame(queue_: asyncio.LifoQueue):
    display_buffer = await queue_.get()
    for i in range(32):
        blended_image = display_buffer[i, :, :, :]
        cv2.imshow('Grad-CAM VIS', blended_image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break


async def produce(loop_, queue_, cap):
    while True:
        frame_buffer = await asyncio.create_task(get_frames(cap))
        # Apply Grad-CAM
        display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
        await queue_.put(display_buffer)


async def consume(queue_):
    while True:
        if queue_.qsize():
            task1 = asyncio.create_task(show_frame(queue_))
            await asyncio.wait(task1)
            if cv2.waitKey(1) == 27:
                break
        else:
            await asyncio.sleep(0.01)


async def run(loop_, queue_, cap_):
    producer_task = asyncio.create_task(produce(loop_, queue_, cap_))
    consumer_task = asyncio.create_task(consume(queue_))
    await asyncio.gather(producer_task, consumer_task)


if __name__ == '__main__':

    # config = '/home/user/Repo/mmaction2/configs/recognition/i3d/i3d_r50_video_inference_32x2x1_100e_kinetics400_rgb.py'
    # checkpoint = '/home/user/Repo/mmaction2/checkpoints/i3d_r50_video_32x2x1_100e_kinetics400_rgb_20200826-e31c6f52.pth'
    # device = torch.device('cuda:0')
    # model = init_recognizer(config, checkpoint, device=device, use_frames=False)
    video_path = 'replace_with_your_video.mp4'
    model_input_height = 256
    model_input_width = 340
    # target_layer_name = 'backbone/layer4/1/relu'
    # gradcam = GradCAM(model, target_layer_name)

    cap = cv2.VideoCapture(video_path)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)  # float
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)  # float

    frame_buffer = np.zeros((1, 1, 3, 32, model_input_height, model_input_width))
    display_buffer = np.zeros((32, model_input_height, model_input_width, 3))  # (32, 256, 340, 3)

    loop = asyncio.get_event_loop()
    queue = asyncio.LifoQueue(maxsize=2)

    try:
        loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
    finally:
        print("shutdown service")
        loop.close()
Run Code Online (Sandbox Code Playgroud)

但是当我运行它时,它报告以下错误:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/user/miniconda3/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
    exception=exception))
  File "/home/user/miniconda3/lib/python3.7/multiprocessing/queues.py", line 358, in put
    obj = _ForkingPickler.dumps(obj)
  File "/home/user/miniconda3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 120, in <module>
    loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
  File "/home/user/miniconda3/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 94, in run
    await asyncio.gather(producer_task, consumer_task)
  File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 76, in produce
    display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
  File "/home/user/Repo/Python-AI-Action-Utils/temp2.py", line 42, in run_blocking_func
    frame = await loop_.run_in_executor(pool, blocking_func)
TypeError: can't pickle coroutine objects
Task was destroyed but it is pending!
task: <Task pending coro=<consume() running at /home/user/Repo/Python-AI-Action-Utils/temp2.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7cf1418cd0>()]> cb=[gather.<locals>._done_callback() at /home/user/miniconda3/lib/python3.7/asyncio/tasks.py:691]>

Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)

ale*_*ame 9

如果使用run_in_executor,目标函数不应该是async。您需要删除async之前的关键字def inference()