nuv*_*vio 5 reactive-programming reactive-streams java-9
我正在尝试 Java 9 中的一些新功能。所以我组织了一个测试,让发布者以给定的速率发出数字。我还实现了一个订阅者来收听这些出版物并将它们打印到控制台。
虽然我可能不完全理解如何使用这个 Api,因为该onNext()方法不打印任何内容,getLastItem()只返回 0。
唯一似乎有效的部分是onSubscribe()正确初始化lastItem变量的部分。
@Test
public void testReactiveStreams(){
//Create Publisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//Register Subscriber
TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
publisher.subscribe(subscriber);
assertTrue(publisher.hasSubscribers());
//Publish items
System.out.println("Publishing Items...");
List.of(1,2,3,4,5).stream().forEach(i -> {
publisher.submit(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// blah
}
});
assertEquals(5, subscriber.getLastItem());
publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
private int lastItem;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
lastItem = 0;
}
@Override
public void onNext(Integer item) {
System.out.println("Received : "+item);
lastItem += 1; // expect increment by 1
assertTrue(lastItem == item);
}
@Override
public void onError(Throwable throwable) {
// nothing for the moment
}
@Override
public void onComplete() {
System.out.println("Completed");
}
public int getLastItem(){
return lastItem;
}
}
Run Code Online (Sandbox Code Playgroud)
有人可以告诉我我在测试中做错了什么吗?我希望测试打印这些数字并返回 5 作为最后一项。
我不得不说我只在 Angular2 中使用 Observables 和 Subjects,尽管它们看起来更容易理解。
Subscription我做了一些更多的研究,我可以看到可以通过在实现中添加成员来使代码正常工作,Subscriber如下所示:
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
Run Code Online (Sandbox Code Playgroud)
...
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscribed");
subscription.request(1);
lastItem = 0;
}
Run Code Online (Sandbox Code Playgroud)
...
@Override
public void onNext(Integer item) {
System.out.println("Received : "+item);
lastItem += 1; // expect increment by 1
assertTrue(lastItem == item);
subscription.request(1);
}
Run Code Online (Sandbox Code Playgroud)
我认为subscription.request(1); 这意味着我正在消费出版商的 1 件商品。我不知道……如果有人向我澄清的话,那就太好了。