Java 9 Flow SubmissionPublisher提供方法的行为

pau*_*aul 5 java reactive-streams java-9 java-flow

我一直在玩Java Flow操作系统,offer但在阅读完文档后做了我的测试我不明白.

在这里我的测试

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}
Run Code Online (Sandbox Code Playgroud)

offer服务器接收一个要发出的项和一个BiPredicate函数,据我所知阅读文档,只有在谓词函数为true的情况下才会发出它.

Bur通过测试结果是

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Run Code Online (Sandbox Code Playgroud)

结果没有变化,如果不是假,我返回true.

任何人都可以向我解释一下这个算子好一点.

man*_*uti 5

不,谓词函数用于决定是否重试文档中提到的发布操作:

onDrop - 如果为非null,则处理程序在下降到订阅者时调用,具有订阅者和项目的参数; 如果它返回true,则重新尝试(一次)

它不会影响最初是否要发送该项目.

编辑:使用该offer方法时如何发生下降的示例

我想出了一个示例,说明调用offer方法时如何发生丢弃.我不认为输出是100%确定性的,但是当它运行几次时有明显的区别.您可以只更改处理程序以返回true而不是false,以查看重试如何减少饱和缓冲区导致的丢弃.在此示例中,通常会发生丢弃,因为最大缓冲区容量明显很小(传递给构造函数SubmissionPublisher).但是在小睡眠期后启用重试时,会删除掉落:

public class SubmissionPubliserDropTest {

    public static void main(String[] args) throws InterruptedException {
        // Create Publisher for expected items Strings
        // Note the small buffer max capacity to be able to cause drops
        SubmissionPublisher<String> publisher =
                               new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        // Register Subscriber
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        // publish 3 items for each subscriber
        for(int i = 0; i < 3; i++) {
            int result = publisher.offer("item" + i, (subscriber, value) -> {
                // sleep for a small period before deciding whether to retry or not
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;  // you can switch to true to see that drops are reduced
            });
            // show the number of dropped items
            if(result < 0) {
                System.err.println("dropped: " + result);
            }
        }
        Thread.sleep(3000);
        publisher.close();
    }
}

class CustomSubscriber<T> implements Flow.Subscriber<T> {

    private Subscription sub;

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
        sub.cancel();
    }

    @Override
    public void onNext(T arg0) {
        System.out.println("Got : " + arg0 + " --> onNext() callback");
        sub.request(1);
    }

    @Override
    public void onSubscribe(Subscription sub) {
        System.out.println("Subscription done");
        this.sub = sub;
        sub.request(1);
    }

}
Run Code Online (Sandbox Code Playgroud)