RxJava .subscribeOn(Schedulers.newThread())问题

wuj*_*jek 1 java rx-java

我在普通的JDK 8上.我有这个简单的RxJava示例:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));
Run Code Online (Sandbox Code Playgroud)

它会逐行打印出单词,与线程信息交织在一起,这对于所有下一次调用都是"主要的",如预期的那样.

但是,当我取消注释subscribeOn(Schedulers.newThread())通话时,根本不会打印任何内容.为什么不工作?我原以为它会为每个onNext()调用启动一个新线程并doOnNext()打印该线程的名称.现在,我没有看到任何内容,也适用于其他调度程序.

当我Thread.sleep(10000L)在main的末尾添加调用时,我可以看到输出,这表明RxJava使用的线程都是守护进程.是这样的吗?可以以某种方式更改,但使用自定义ThreadFactory或类似的概念,而不必实现自定义调度程序?

通过上述更改,线程名称始终是RxNewThreadScheduler-1,而newThread的文档说" {@link Thread}为每个工作单元创建新的调度程序".是不是应该为所有排放创建一个新线程?

aka*_*okd 6

正如Vladimir所提到的,RxJava标准调度程序在守护程序线程上运行,后者在您的示例中终止,因为主线程退出.我想强调的是,他们没有在新线程上安排每个值,但是他们在新创建的线程上为每个订阅者安排值流.第二次订阅会给你"RxNewThreadScheduler-2".

您实际上不需要更改默认调度程序,而只需使用Schedulers.from()包装您自己的基于Executor的调度程序,并将其作为参数提供给所需的:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
        0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);

Scheduler s = Schedulers.from(exec);

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
    Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));
Run Code Online (Sandbox Code Playgroud)

我有一系列关于RxJava调度程序的博客文章,它们可以帮助您实现"更永久"的变体.