phd*_*ign 5 python multithreading python-multithreading rx-py reactivex
基于这个优秀的 SO 答案,我可以在 RxPy 中并行处理多个任务。我的问题是:如何等待它们全部完成?我知道对于线程可以使用 .join(),但 Rx 调度器似乎没有类似的选项。.to_blocking() 也无济于事:MainThread 会在所有通知触发、on_completed 处理程序被调用之前就结束。下面是一个例子:
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
def printthread(val):
    """Print ``val`` together with the name of the thread making the call.

    Args:
        val: Value to show; it is rendered through ``str.format``.
    """
    print("{}, thread: {}".format(val, current_thread().name))
def intense_calculation(value):
    """Simulate a slow computation and return ``value`` unchanged.

    The call is announced through ``printthread`` (so the executing thread
    is visible in the output) and then blocks for a random interval.

    Args:
        value: Payload that is logged and handed back to the caller.

    Returns:
        The same ``value`` that was passed in.
    """
    printthread("calc {}".format(value))
    # randint(5, 20) * .1 gives a pause between 0.5 and 2.0 seconds.
    time.sleep(random.randint(5, 20) * .1)
    return value
if __name__ == "__main__":
    # Each emitted value is passed to intense_calculation via
    # Observable.start on Scheduler.timeout; select_many merges the
    # resulting observables, and observe_on routes the notifications to
    # Scheduler.event_loop before they reach the callbacks below.
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    # Nothing in this script blocks on the subscription, so this line is
    # reached without waiting for the callbacks above.
    printthread("\nAll done")
    # NOTE(review): presumably a crude way to keep MainThread alive long
    # enough for the notifications to be delivered -- confirm before use.
    # time.sleep(2)
Run Code Online (Sandbox Code Playgroud)
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread
Run Code Online (Sandbox Code Playgroud)
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
Run Code Online (Sandbox Code Playgroud)
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
Run Code Online (Sandbox Code Playgroud)
在此处发布完整的解决方案:
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler
def printthread(val):
    """Print ``val`` together with the name of the thread making the call.

    Args:
        val: Value to show; it is rendered through ``str.format``.
    """
    print("{}, thread: {}".format(val, current_thread().name))
def intense_calculation(value):
    """Simulate a slow computation and return ``value`` unchanged.

    The call is announced through ``printthread`` (so the executing thread
    is visible in the output) and then blocks for a random interval.

    Args:
        value: Payload that is logged and handed back to the caller.

    Returns:
        The same ``value`` that was passed in.
    """
    printthread("calc {}".format(value))
    # randint(5, 20) * .1 gives a pause between 0.5 and 2.0 seconds.
    time.sleep(random.randint(5, 20) * .1)
    return value
if __name__ == "__main__":
    # A pool of four workers runs the calculations; keeping a reference to
    # the scheduler lets the script reach its executor further down.
    scheduler = ThreadPoolScheduler(4)

    # Each emitted value is passed to intense_calculation via
    # Observable.start on the pool scheduler; select_many merges the
    # resulting observables, and observe_on routes the notifications to
    # Scheduler.event_loop before they reach the callbacks below.
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    # NOTE(review): this message is printed before shutdown() is called, so
    # it can appear ahead of the results; move it below shutdown() if it
    # must be the last line of output.
    printthread("\nAll done")
    # Presumably a concurrent.futures executor whose shutdown() waits for
    # the queued work to finish -- confirm against the installed rx version.
    scheduler.executor.shutdown()
    # time.sleep(2)
Run Code Online (Sandbox Code Playgroud)
小智 3
对于 ThreadPoolScheduler,您可以调用:
scheduler.executor.shutdown()
然后,一旦所有任务完成,您就可以得到全部结果。