Java 9 - 发布者和订阅者的工作方式

sec*_*tar 7 publisher subscriber subscription java-9

我想了解如何SubscriberPublisher在工作的java 9.

这里我在subscriber这里创建了一个并SubmissionPublisher用于发布项目.

我正在尝试发布100个字符串subscriber.如果我不编写Client程序sleep(请参阅注释代码MyReactiveApp),我看不到所有项目都已发布.

为什么不等待这里处理的所有字符串:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 
Run Code Online (Sandbox Code Playgroud)

如果我用上面的代码替换,我看到所有字符串都在控制台中打印

strs.stream().forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

使用发布的客户端程序SubmissionPublisher.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) {
            Thread.sleep(10);
        }*/

        //publisher.close();

        System.out.println("Exiting the app");

    }

    private static List<String> getStrs(){

        return Stream.generate(new Supplier<String>() {
            int i =1;
            @Override
            public String get() {
                return "name "+ (i++);
            }
        }).limit(100).collect(Collectors.toList());
    }

}
Run Code Online (Sandbox Code Playgroud)

订户

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(100);

    }

    @Override
    public void onNext(String item) {
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(this.getClass().getName()+ " an error occured "+throwable);

    }

    @Override
    public void onComplete() {
        System.out.println("activity completed");

    }
    public int getCounter() {
        return counter;
    }

}
Run Code Online (Sandbox Code Playgroud)

输出:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12
Run Code Online (Sandbox Code Playgroud)

Ste*_*cht 6

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Run Code Online (Sandbox Code Playgroud)

使用 ForkJoinPool.commonPool() 创建一个新的 SubmissionPublisher 以异步交付给订阅者

请参阅: https: //docs.oracle.com/javase/9 ​​/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

所以实际上

    strs.stream().forEach(i -> publisher.submit(i));
Run Code Online (Sandbox Code Playgroud)

将所有提交入队并在另一个线程上异步传递它们。但随后该应用程序被终止。这与工作线程的进度无关。这意味着无论工作线程已经传递了多少元素,应用程序都会终止。

每次运行的情况可能有所不同。在最坏的情况下,应用程序可能会在第一个项目交付之前终止。

线程数

如果你想验证 MyReactiveApp 的 main 方法和 MySubscriber 的 onNext 中的传递是否发生在不同的线程上,你可以打印出相应线程的名称,例如在 MyReactiveApp 的 main 中:

System.out.println(Thread.currentThread().getName()) 
Run Code Online (Sandbox Code Playgroud)

将输出main为线程名称。

而 MySubscriber 的 onNext 方法将输出类似的内容ForkJoinPool.commonPool-worker-1

用户线程和守护线程

尽管我们仍然有一个正在运行的线程,为什么应用程序会终止?

Java中有两种线程:

  • 用户线程
  • 守护线程

当不再有任何用户线程运行时,即使守护线程仍在运行,Java 程序也会终止。

主线程是用户线程。SubmissionPublisher 在这里使用来自 ForkJoinPool.commonPool() 的工作线程。这些是守护线程。

所有工作线程均通过 Thread.isDaemon() 设置为 true 进行初始化。

https://docs.oracle.com/javase/9​​/docs/api/java/util/concurrent/ForkJoinPool.html