Python中的异步方法调用?

Ste*_*ini 170 python asynchronous

我想知道在Python中是否有任何用于异步方法调用的库.如果你可以做类似的事情会很棒

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()
Run Code Online (Sandbox Code Playgroud)

或者异步调用非异步例程

def longComputation()
    <code>

token = asynccall(longComputation())
Run Code Online (Sandbox Code Playgroud)

如果在语言核心中使用更加精细的策略,那就太棒了.这是考虑过吗?

Dra*_*sha 196

就像是:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done
Run Code Online (Sandbox Code Playgroud)

有关详细信息,请参阅https://docs.python.org/2/library/threading.html#module-threading中的文档; 此代码也适用于Python 3.

  • 重要说明:由于"全局解释器锁定",线程的标准实现(CPython)对计算绑定任务没有帮助.参见图书馆doc:[link](http://docs.python.org/2/library/threading.html#module-threading) (21认同)
  • 使用thread.join()真的是异步的吗?如果您不想阻塞线程(例如UI线程)并且不使用大量资源在其上进行while循环该怎么办? (3认同)

Luc*_* S. 137

您可以使用Python 2.6中添加的多处理模块.您可以使用进程池,然后异步获取结果:

apply_async(func[, args[, kwds[, callback]]])
Run Code Online (Sandbox Code Playgroud)

例如:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.
Run Code Online (Sandbox Code Playgroud)

这只是一种选择.该模块提供了许多设施来实现您的需求.从这里制作装饰器也很容易.

  • 这有效:result = pool.apply_async(f,[10],callback = finish) (11认同)
  • 值得记住的是,这会产生单独的进程,而不是在进程中分离线程.这可能会有一些影响. (6认同)
  • 要在python中异步执行任何操作,需要使用多处理模块来生成新进程.仅仅创建新线程仍然受Global Interpreter Lock的支配,它可以阻止python进程同时执行多个操作. (6认同)
  • 不幸的是,Lucas S.,你的例子不起作用.回调函数永远不会被调用. (4认同)
  • 如果您不想在使用此解决方案时生成新进程 - 将导入更改为`from multiprocessing.dummy import Pool`.multiprocessing.dummy具有在线程而不是进程上实现的完全相同的行为 (2认同)

cam*_*beh 42

从Python 3.5开始,您可以使用增强型生成器来实现异步功能.

import asyncio
import datetime
Run Code Online (Sandbox Code Playgroud)

增强的生成器语法

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
Run Code Online (Sandbox Code Playgroud)

async/await语法:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
Run Code Online (Sandbox Code Playgroud)

  • @carnabeh,您是否可以扩展该示例以包含OP的"def longComputation()"函数?大多数示例使用"await asyncio.sleep(1)",但如果longComputation()返回一个double,则不能只使用"await longComputation()". (4认同)
  • 十年后,这应该是现在公认的答案。当您谈论 python3.5+ 中的异步时,首先想到的应该是 asyncio 和 async 关键字。 (2认同)

Mer*_*son 30

它不是语言核心,而是一个非常成熟的图书馆,可以做你想要的就是Twisted.它引入了Deferred对象,您可以将回调或错误处理程序("errbacks")附加到.延迟基本上是一个"承诺",函数最终会产生结果.


xpe*_*oni 19

你可以实现一个装饰器来使你的函数异步,虽然这有点棘手.该multiprocessing模块充满了小怪癖和看似随意的限制 - 更有理由将它封装在一个友好的界面后面.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send
Run Code Online (Sandbox Code Playgroud)

下面的代码说明了装饰器的用法:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())
Run Code Online (Sandbox Code Playgroud)

在一个真实的案例中,我会在装饰器上详细说明一下,提供一些方法来关闭它以进行调试(同时保持未来的接口),或者可能是处理异常的工具; 但我认为这足以证明这一原则.


Ant*_*luk 15

只是

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
Run Code Online (Sandbox Code Playgroud)


小智 7

我的解决方案是:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)
Run Code Online (Sandbox Code Playgroud)

并按照要求完成工作:

@Async
def fnc():
    pass
Run Code Online (Sandbox Code Playgroud)


Raj*_*Raj 7

你可以使用eventlet.它允许您编写看似同步代码的内容,但让它在网络上异步操作.

以下是超级最小爬虫的示例:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
Run Code Online (Sandbox Code Playgroud)


Ben*_*ari 6

Python 3.7及更高版本中较新的asyncio运行方法是使用而不是创建、调用以及关闭它:asyncio.run()looploop.run_until_complete()

import asyncio
import datetime

async def display_date(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...", datetime.datetime.now())
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break


asyncio.run(display_date(5))
Run Code Online (Sandbox Code Playgroud)


Nic*_*ton 5

这样的东西对我有用,然后你可以调用该函数,它会将自己调度到一个新的线程上.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
Run Code Online (Sandbox Code Playgroud)