Python 异步 IO 图像处理

Max*_*ann 5 python image python-asyncio

我目前正在从网络摄像头拍摄图像,捕获后应立即进行处理。该代码使用 cv2.videocapture() 函数。

想法:一旦捕获图像,我想利用系统捕获新图像所需的时间来对前一个图像进行计算。

为了完成此任务,我使用 asyncio 库。

首先,我定义一个生成器函数,它生成 n 个图像,这些图像应存储在队列中:

async def produce(queue, n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = obtainImage()
        z.append(x)
        # put the item in the queue
        await queue.put(item)
Run Code Online (Sandbox Code Playgroud)

之后,我实现了一个消费者函数,它等待队列中的条目并计算一些图像特征(这里是一个简单的阈值):

 async def consume(queue):
        while True:
            # wait for an item from the producer
            # Load the image stored in the queue 
            item = await queue.get()
            # process the item and store value
            ret,thresh1 = cv2.threshold(item,127,255,cv2.THRESH_BINARY)

            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())

            # Notify the queue that the item has been processed
            queue.task_done() 

            #Return the Processesed image
            return thresh1
Run Code Online (Sandbox Code Playgroud)

为了启动这些函数,我创建了一个运行函数并将其添加到事件循环中(运行方法包含一个表示我要处理多少图像的参数):

async def run(n):
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer is still awaiting for an item, cancel it
    consumer.cancel()

  loop = asyncio.get_event_loop()
  loop.run_until_complete(run(10))
  print(loop)
  loop.close()
Run Code Online (Sandbox Code Playgroud)

只要我使用 print 语句,我就会得到如下异步输出:

生产 0/10

生产1/10

消耗0...

生产 2/10

消耗1...

生产 3/10

消耗2...

生产 4/10

消耗3...

这正是我所希望的顺序。但是,我无法取回任何阈值图像。我缺少什么?

先感谢您

urb*_*ban 2

Minimal example with global queue for results returning (tested with Python 3.5.2)

import asyncio
import random

outQ = asyncio.Queue() 

async def produce(queue, n):
    for x in range(n):
        print('producing {}/{}'.format(x, n))
        await asyncio.sleep(random.random())
        item = x
        await queue.put(item)

async def consume(queue):
    while True:
        print('consume ')
        item = await queue.get()
        thresh1 = item**2
        print("... adding %d" % thresh1)
        await outQ.put(thresh1)
        await asyncio.sleep(random.random())
        queue.task_done()


async def run(n):
    queue = asyncio.Queue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue, n)
    await queue.join()
    consumer.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()

while not outQ.empty():
    print(outQ.get_nowait())
Run Code Online (Sandbox Code Playgroud)

Output:

...
producing 9/10
... adding 64
consume 
... adding 81
consume 
0
1
4
9
16
25
36
49
64
81
Run Code Online (Sandbox Code Playgroud)