RxPY 中带有 from_iterable/range 的 subscribe_on

Phi*_*lab 5 python scheduler reactive-programming rx-py

我正在尝试了解 python 反应式扩展的调度。我想用来subscribe_on并行处理多个可观察量。如果可观察量是使用 创建的,则效果很好,但如果使用just例如range或 ,则效果不佳。from_

just默认为Scheduler.immediate,而其他生成器默认为Scheduler.current_thread。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)
Run Code Online (Sandbox Code Playgroud)

它可以与调度程序一起使用observe_on,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现如下所示:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)
Run Code Online (Sandbox Code Playgroud)

我是不是误会了subscribe_on?我的做法有错吗?

Yar*_*hiy 4

您的示例中有三个可以独立安排的操作:

  1. 数据馈送操作。justrange默认使用不同的调度器,但它们之间没有太大区别。两者都在当前线程上提供初始值。您可以通过将其作为参数传递给这些方法来将其默认调度程序覆盖为您想要的任何内容。

  2. 订阅行动。Scheduler.current_thread默认使用。即它与数据馈送操作在同一线程上执行。可以通过subscribe_on方法重写。

  3. 观察 ( on_next, on_error, on_completed) 动作。Scheduler.current_thread默认使用。即它与订阅操作在同一线程上执行。可以通过observe_on方法重写。

如果您仅针对其中一项操作覆盖调度程序,则其他操作应遵循上述操作。

关于调度程序

Scheduler.immediate并没有真正安排任何事情。它立即在调度的同一线程上调用操作。

Scheduler.current_thread通过对操作进行排队来避免递归,但仍然在调度的同一线程上调用操作。

Scheduler.new_thread启动单个后台线程来依次执行操作。

Scheduler.timeout为需要执行的每个操作启动新的后台线程。

尝试并行处理

在不同线程中调度工作的最合适的方法似乎是observe_on

thread_pool但问题是RxPy 目前没有调度程序。new_thread调度程序只启动一个线程,因此它不会对您有太大帮助。

timeout调度程序可用于并行,但它无法控制并发线程的数量,因此并发任务数量的爆炸性增长可能会溢出内存并导致系统崩溃。

不是observe_on中的错误

我尝试运行您的示例,observe_on(Scheduler.timeout)但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这实际上禁用了并行处理。我的第一反应是报告实施中的错误observe_on

但经过进一步调查,我发现串行执行不是一个错误,而是预期的行为

并行执行任务的正确方法

这是有效的代码(基于此答案):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)
Run Code Online (Sandbox Code Playgroud)

Observable.start创建异步可观察量,并通过 安排在单独的线程上Scheduler.timeout

observe_on(Scheduler.event_loop)是可选的。它强制finish在同一线程上调用所有项目的方法。

请注意,不能保证该finish方法按初始range顺序调用。