pau*_*aul 5 java reactive-streams java-9 java-flow
我一直在玩Java Flow操作系统,offer但在阅读完文档后做了我的测试我不明白.
在这里我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
Run Code Online (Sandbox Code Playgroud)
offer服务器接收一个要发出的项和一个BiPredicate函数,据我所知阅读文档,只有在谓词函数为true的情况下才会发出它.
Bur通过测试结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Run Code Online (Sandbox Code Playgroud)
结果没有变化,如果不是假,我返回true.
任何人都可以向我解释一下这个算子好一点.
不,谓词函数用于决定是否重试文档中提到的发布操作:
onDrop- 如果为非null,则处理程序在下降到订阅者时调用,具有订阅者和项目的参数; 如果它返回true,则重新尝试(一次)
它不会影响最初是否要发送该项目.
编辑:使用该offer方法时如何发生下降的示例
我想出了一个示例,说明调用offer方法时如何发生丢弃.我不认为输出是100%确定性的,但是当它运行几次时有明显的区别.您可以只更改处理程序以返回true而不是false,以查看重试如何减少饱和缓冲区导致的丢弃.在此示例中,通常会发生丢弃,因为最大缓冲区容量明显很小(传递给构造函数SubmissionPublisher).但是在小睡眠期后启用重试时,会删除掉落:
public class SubmissionPubliserDropTest {
public static void main(String[] args) throws InterruptedException {
// Create Publisher for expected items Strings
// Note the small buffer max capacity to be able to cause drops
SubmissionPublisher<String> publisher =
new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
// Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
// publish 3 items for each subscriber
for(int i = 0; i < 3; i++) {
int result = publisher.offer("item" + i, (subscriber, value) -> {
// sleep for a small period before deciding whether to retry or not
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false; // you can switch to true to see that drops are reduced
});
// show the number of dropped items
if(result < 0) {
System.err.println("dropped: " + result);
}
}
Thread.sleep(3000);
publisher.close();
}
}
class CustomSubscriber<T> implements Flow.Subscriber<T> {
private Subscription sub;
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable th) {
th.printStackTrace();
sub.cancel();
}
@Override
public void onNext(T arg0) {
System.out.println("Got : " + arg0 + " --> onNext() callback");
sub.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
System.out.println("Subscription done");
this.sub = sub;
sub.request(1);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
546 次 |
| 最近记录: |