我想为python做一个类似'future'异步API的scala

le-*_*ude 4 python asynchronous scala

我需要一些期待在Python/Django中等待其他人,我不得不使用网络和concurrent.futures(backport到2.7)的源来破解我自己的"Promise"类.

这是我的代码(不是很漂亮,但正是我所需要的),我从博客文章中得到了大部分的想法(我现在找不到它的链接......稍后会更新):

class Promise(object):
    def __init__(self, func):
        self.func = func
        LOG.debug('Create Promise from callable %s %s' % (func, str(inspect.getargspec(func))))

    def resolve_on_type(self, arg):
        if isinstance(arg, Future):
            return self.resolve_on_type(arg.result()) # happens when chaining two promises.
        elif isinstance(arg, types.GeneratorType):
            iterable = list()
            for a in arg:
                iterable.append(self.resolve_on_type(a))
            return iterable
        else:
            return arg

    def resolve(self, *args, **kwargs):
        resolved_args = []
        resolved_kwargs = {}

        #TODO need a more efficient way to wait on all results
        for i, arg in enumerate(args):
            resolved_args.append(self.resolve_on_type(arg))

        for kw, arg in kwargs.items():
            resolved_kwargs[kw] = self.resolve_on_type(arg)

        try:
            return self.func(*resolved_args, **resolved_kwargs)
        except:
            LOG.exception('<Promise> Error on task execution.')
            raise

    def __call__(self, *args, **kwargs):
        LOG.debug('Promise %s called' % self.func)
        return thread_pool.submit(self.resolve, *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

就个人而言,我想要像scala一样的东西.

def my_function(some, args, here):
    #do stuff that takes time and blocks or whatever
    return something

future_1 = future(my_function, _some, _args, _here_please).map(mapping_function).recover(error_handling_function)
future_1.result()

list_of_futures = map(async_functions, some_args)
future_of_list = sequence(list_of_futures) 
Run Code Online (Sandbox Code Playgroud)

我会很感激任何提示......或者有什么东西能运作良好吗?我必须说concurrent.futures简化了我的任务,但我不知道如何map工作.我想我需要专家(在我使用它进行文件和dbs的维护脚本之前,我在活跃的python开发中已经不到几个月了).我想如果我得到序列和地图,我可以得到如何为自己工作休息.

use*_*877 5

Python的concurrent.futures确实在启用未来组合和反应式编程方面做得很差.

看看这个库,它实现了Scala-like FuturesPromises.实现很小且经过充分测试,并且应该很容易移植到Python 2.7以满足您的需求.

它包装concurrent.futures*ThreadPoolExecutor*以返回增强的Future对象:

from rx.executors import ThreadPoolExecutor
from rx.futures import Future

with ThreadPoolExecutor(10) as tp:
    futures = [tp.submit(foo, i) for i in range(10)] # list of futures
    future_all = Future.all(futures) # combines into single future with list of results
    future_all_sum = future_all.map(sum) # maps result list to sum of elements
    print(future_all_sum.result(timeout=10))
Run Code Online (Sandbox Code Playgroud)