使用Python的asyncio按顺序获取数据

mat*_*mat 4 python asynchronous python-asyncio

我有一个Python 2.7程序,它从网站提取数据并将结果转储到数据库.它遵循消费者生产者模型,并使用线程模块编写.

只是为了好玩,我想用新的asyncio模块(从3.4)重写这个程序,但我无法弄清楚如何正确地做到这一点.

最关键的要求是程序必须按顺序从同一网站获取数据.例如,对于网址' http://a-restaurant.com ',它应首先获得' http://a-restaurant.com/menu/0 ',然后' http://a-restaurant.com/menu/ 1 ',然后' http://a-restaurant.com/menu/2 ',...如果没有提取它们,网站将完全停止提供页面,你必须从0开始.

然而另一个网站(' http://another-restaurant.com ')的另一个获取可以(并且应该)同时运行(其他网站也有sequantial限制).

线程模块非常适合这种情况,因为我可以为每个网站创建单独的线程,并且在每个线程中它可以等到一个页面完成加载,然后再获取另一个页面.

这是一个来自线程版本(Python 2.7)的极为简化的代码片段:

class FetchThread(threading.Threading)
    def __init__(self, queue, url)
        self.queue = queue
        self.baseurl = url
    ...
    def run(self)
        # Get 10 menu pages in a sequantial order
        for food in range(10):
            url = self.baseurl + '/' + str(food)
            text = urllib2.urlopen(url).read()
            self.queue.put(text)
            ...
def main()
    queue = Queue.Queue()
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
    for url in urls:
        fetcher = FetchThread(queue, url)
        fetcher.start()
        ...
Run Code Online (Sandbox Code Playgroud)

以下是我尝试使用asyncio(在3.4.1中)的方法:

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)


l = []
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
for url in urls:
    for food in range(10):
        menu_url = url + '/' + str(food)
        l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))
Run Code Online (Sandbox Code Playgroud)

它以非连续的顺序提取和打印所有内容.好吧,我猜这就是那些协同程序的全部想法.我应该不使用aiohttp并只使用urllib获取?但是第一家餐馆的食物是否阻止了其他餐馆的提取?我只是觉得这完全错了吗?(这只是一个尝试按顺序获取东西的测试.还没有进入队列部分.)

dan*_*ano 5

您当前的代码将适用于不关心请求的顺序排序的餐厅.对菜单的所有十个请求将同时运行,并且一旦完成就会打印到stdout.

显然,这对需要顺序请求的餐厅不起作用.你需要重构一点才能工作:

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)

@syncio.coroutine
def print_pages_sequential(url, num_pages):
    for food in range(num_pages):
        menu_url = url + '/' + str(food)
        yield from print_page(menu_url)

l = [print_pages_sequential('http://a-restaurant.com/menu', 10)]

conc_url = 'http://another-restaurant.com/menu'
for food in range(10):
    menu_url = conc_url + '/' + str(food)
    l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))
Run Code Online (Sandbox Code Playgroud)

我们不是将连续餐馆的所有十个请求添加到列表中,而是在列表中添加一个协程,它将按顺序迭代所有十个页面.这种方式的工作方式是在请求完成之前yield from print_page停止执行,但它会在不阻塞任何其他同时运行的协同程序的情况下执行此操作(就像您追加的所有调用一样).print_pages_sequentialprint_pageprint_pagel

通过这种方式,您的所有"另一个餐厅"请求可以完全同时运行,就像您想要的那样,并且您的"餐厅"请求将按顺序运行,但不会阻止任何"另一个餐厅"请求.

编辑:

如果所有站点都具有相同的顺序提取要求,则逻辑可以更简化:

l = []
urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"]
for url in urls:
    menu_url = url + '/' + str(food)
    l.append(print_page_sequential(menu_url, 10))

loop.run_until_complete(asyncio.wait(l))
Run Code Online (Sandbox Code Playgroud)