如何从同步代码Python调用异步函数

Paa*_*sen 5 python asynchronous python-3.6 fme

因此,我被锁定在桌面应用程序之后的python 3.6.2解释器中。

我想要的是从同步方法或函数调用异步函数。

从桌面应用程序调用python函数时,它必须是无法等待的普通函数。

我可以从桌面应用程序发送一个URL列表,而我想要的是在异步情况下从每个URL发送回响应。

这是我的尝试,我已将SyntaxError标记为我不知道如何绕过。

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)

    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature

async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results


async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature
Run Code Online (Sandbox Code Playgroud)

我尝试进行以下更改:导入fme导入fmeobjects导入asyncio导入aiohttp导入async_timeout logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)

    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)


async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results


async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]
Run Code Online (Sandbox Code Playgroud)

但是现在我遇到了这个错误:

Python Exception <TypeError>: object ClientResponse can't be used in 'await' 
expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
Run Code Online (Sandbox Code Playgroud)

Mat*_*eld 17

还有一些库可以处理这个问题并始终做正确的事情。asgiref.sync这里描述了一个示例,async_to_syncsync_to_async具有执行这些转换的方法:

from asgiref.sync import async_to_sync

@async_to_sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously
Run Code Online (Sandbox Code Playgroud)

来自文档的更多信息asgiref.sync

AsyncToSync 让同步子线程在主线程的事件循环上调用异步函数时停止并等待,然后在异步函数完成时将控制权返回给线程。

SyncToAsync 让异步代码调用同步函数,该函数在线程池中运行,并在同步函数完成时将控制权返回给异步协程。

还有其他类似的项目,例如koil

  • ...因为添加脆弱的第三方依赖项来执行 Python 标准库已经可以轻松完成的开箱即用的操作是一个坏主意 - *总是*。只需直接调用 `asyncio.run()` 或 `asyncio.get_event_loop().run_until_complete()` 即可。无论哪种情况,这都是一句微不足道的俏皮话。“asyncio”模块的存在是有原因的。`&lt;/捂脸&gt;` (10认同)
  • 这不是一件简单的事情,标准库以我在这里看到的任何方式处理它。正如周围所评论的,函数可以嵌套在其他线程、其他运行循环中,并调用同步和异步代码的层次结构,并且当前线程可能已经存在也可能没有运行循环。需要一个能够很好地处理和记录所有这些的功能。 (5认同)

dec*_*eze 8

您将使用事件循环来执行异步功能以完成操作:

newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))
Run Code Online (Sandbox Code Playgroud)

(这项技术已经被用于内部main。而且我不知道为什么,因为mainasync你可以/应该用await fetch_all(...)在那里。)

  • 但是我可能需要重写 main,因为它已经有一个 event_loop? (3认同)

Fra*_*las 6

@deceze答案可能是您在Python 3.6中可以做的最好的事情。但是在Python 3.7中,您可以通过以下方式直接使用asyncio.run

newfeature = asyncio.run(main(urls))
Run Code Online (Sandbox Code Playgroud)

它将正确创建,处理和关闭event_loop

  • @mgutsche这并不能解决原始问题中的问题,除非该函数被声明为“async def”,否则您不能使用await,这可能无法做到,例如对于无法更改的魔术方法或接口。 (11认同)
  • 但是如果代码已经在“asyncio.run”调用中运行怎么办?如果您在使用“asyncio.run”调用的函数内使用“asyncio.run”,则会收到“RuntimeError:无法从正在运行的事件循环调用 asyncio.run()” (10认同)
  • @birgersp 如果你已经在 event_loop 中,你可以简单地通过 `result = wait main(urls)` 调用它。 (4认同)