le-*_*ude 4 python asynchronous scala
我需要一些期待在Python/Django中等待其他人,我不得不使用网络和concurrent.futures(backport到2.7)的源来破解我自己的"Promise"类.
这是我的代码(不是很漂亮,但正是我所需要的),我从博客文章中得到了大部分的想法(我现在找不到它的链接......稍后会更新):
class Promise(object):
def __init__(self, func):
self.func = func
LOG.debug('Create Promise from callable %s %s' % (func, str(inspect.getargspec(func))))
def resolve_on_type(self, arg):
if isinstance(arg, Future):
return self.resolve_on_type(arg.result()) # happens when chaining two promises.
elif isinstance(arg, types.GeneratorType):
iterable = list()
for a in arg:
iterable.append(self.resolve_on_type(a))
return iterable
else:
return arg
def resolve(self, *args, **kwargs):
resolved_args = []
resolved_kwargs = {}
#TODO need a more efficient way to wait on all results
for i, arg in enumerate(args):
resolved_args.append(self.resolve_on_type(arg))
for kw, arg in kwargs.items():
resolved_kwargs[kw] = self.resolve_on_type(arg)
try:
return self.func(*resolved_args, **resolved_kwargs)
except:
LOG.exception('<Promise> Error on task execution.')
raise
def __call__(self, *args, **kwargs):
LOG.debug('Promise %s called' % self.func)
return thread_pool.submit(self.resolve, *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
就个人而言,我想要像scala一样的东西.
def my_function(some, args, here):
#do stuff that takes time and blocks or whatever
return something
future_1 = future(my_function, _some, _args, _here_please).map(mapping_function).recover(error_handling_function)
future_1.result()
list_of_futures = map(async_functions, some_args)
future_of_list = sequence(list_of_futures)
Run Code Online (Sandbox Code Playgroud)
我会很感激任何提示......或者有什么东西能运作良好吗?我必须说concurrent.futures简化了我的任务,但我不知道如何map工作.我想我需要专家(在我使用它进行文件和dbs的维护脚本之前,我在活跃的python开发中已经不到几个月了).我想如果我得到序列和地图,我可以得到如何为自己工作休息.
Python的concurrent.futures确实在启用未来组合和反应式编程方面做得很差.
看看这个库,它实现了Scala-like Futures和Promises.实现很小且经过充分测试,并且应该很容易移植到Python 2.7以满足您的需求.
它包装concurrent.futures*ThreadPoolExecutor*以返回增强的Future对象:
from rx.executors import ThreadPoolExecutor
from rx.futures import Future
with ThreadPoolExecutor(10) as tp:
futures = [tp.submit(foo, i) for i in range(10)] # list of futures
future_all = Future.all(futures) # combines into single future with list of results
future_all_sum = future_all.map(sum) # maps result list to sum of elements
print(future_all_sum.result(timeout=10))
Run Code Online (Sandbox Code Playgroud)