she*_*per 6 python asynchronous python-3.x python-decorators python-asyncio
我正在尝试设计一个可以轻松创建数据处理管道的异步管道.管道由几个功能组成.输入数据在管道的一端进入,在另一端出现.
我想以下列方式设计管道:
这是我想出的:
import asyncio
@asyncio.coroutine
def add(x):
return x + 1
@asyncio.coroutine
def prod(x):
return x * 2
@asyncio.coroutine
def power(x):
return x ** 3
def connect(funcs):
def wrapper(*args, **kwargs):
data_out = yield from funcs[0](*args, **kwargs)
for func in funcs[1:]:
data_out = yield from func(data_out)
return data_out
return wrapper
pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)
Run Code Online (Sandbox Code Playgroud)
当然,这是有效的,但问题是如果我想在此管道中添加另一个函数(或从中弹出一个函数),我必须再次反汇编并重新连接每个函数.
我想知道是否有更好的方案或设计模式来创建这样的管道?
我以前做过类似的事情,只使用多处理库。它有点手动,但它使您能够轻松地创建和修改您的管道,正如您在问题中所要求的那样。
这个想法是创建可以存在于多处理池中的函数,它们唯一的参数是输入队列和输出队列。您可以通过将不同的队列传递给这些阶段来将这些阶段联系在一起。每个阶段在其输入队列上接收一些工作,做一些更多的工作,并将结果通过其输出队列传递到下一个阶段。
工作人员不断尝试从他们的队列中获取一些东西,当他们得到一些东西时,他们会做自己的工作并将结果传递给下一个阶段。所有工作以通过管道传递“毒丸”结束,导致所有阶段退出:
这个例子只是在多个工作阶段构建一个字符串:
import multiprocessing as mp
POISON_PILL = "STOP"
def stage1(q_in, q_out):
while True:
# get either work or a poison pill from the previous stage (or main)
val = q_in.get()
# check to see if we got the poison pill - pass it along if we did
if val == POISON_PILL:
q_out.put(val)
return
# do stage 1 work
val = val + "Stage 1 did some work.\n"
# pass the result to the next stage
q_out.put(val)
def stage2(q_in, q_out):
while True:
val = q_in.get()
if val == POISON_PILL:
q_out.put(val)
return
val = val + "Stage 2 did some work.\n"
q_out.put(val)
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
q_main_to_s1.put(POISON_PILL)
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
代码产生以下输出:
主要开始工作。
第一阶段做了一些工作。
第 2 阶段做了一些工作。
主要完成了工作。
您可以轻松地在管道中放置更多阶段或通过更改哪些函数获得哪些队列来重新排列它们。我对asyncio模块不是很熟悉,所以我不能说使用多处理库会丢失哪些功能,但是这种方法非常易于实现和理解,所以我喜欢它的简单性。
我不知道这是否是最好的方法,但这是我的解决方案。
虽然我认为可以使用列表或字典来控制管道,但我发现使用生成器更容易、更有效。
考虑以下生成器:
def controller():
old = value = None
while True:
new = (yield value)
value = old
old = new
Run Code Online (Sandbox Code Playgroud)
这基本上是一个单元素队列,它存储您发送的值并在下一次调用send(或next) 时释放它。
例子:
>>> c = controller()
>>> next(c) # prime the generator
>>> c.send(8) # send a value
>>> next(c) # pull the value from the generator
8
Run Code Online (Sandbox Code Playgroud)
通过将管道中的每个协程与其控制器相关联,我们将拥有一个外部句柄,可用于推送每个协程的目标。我们只需要定义我们的协程,让它们每个周期都会从我们的控制器中提取新的目标。
现在考虑以下协程:
def source(controller):
while True:
target = next(controller)
print("source sending to", target.__name__)
yield (yield from target)
def add():
return (yield) + 1
def prod():
return (yield) * 2
Run Code Online (Sandbox Code Playgroud)
源是一个协程,return它不会在第一个周期后自行终止。其他协程是“接收器”,不需要控制器。您可以在管道中使用这些协程,如下例所示。我们最初设置了一条路线source --> add,在收到第一个结果后,我们将路线更改为source --> prod。
# create a controller for the source and prime it
cont_source = controller()
next(cont_source)
# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()
# create a pipeline
cont_source.send(coro_add)
# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))
# change target of the source
cont_source.send(coro_prod)
# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))
Run Code Online (Sandbox Code Playgroud)
输出:
source sending to add
add = 5
source sending to prod
prod = 16
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2712 次 |
| 最近记录: |