我一直在玩Java Flow操作系统,offer但在阅读完文档后做了我的测试我不明白.
在这里我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
Run Code Online (Sandbox Code Playgroud)
offer服务器接收一个要发出的项和一个BiPredicate函数,据我所知阅读文档,只有在谓词函数为true的情况下才会发出它.
Bur通过测试结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Run Code Online (Sandbox Code Playgroud)
结果没有变化,如果不是假,我返回true.
任何人都可以向我解释一下这个算子好一点.
我再次将RxJava与Java 9 Flow进行比较.我看到Flow默认是异步的,我想知道是否有办法让它同步运行.
有时我们只是想将它用于Nio而不是糖语法,并且具有更加同质的代码.
在RxJava中,默认情况下是同步的,您可以使用observerOn和subscribeOn在管道中异步运行它.
Flow中是否有任何运算符使其在主线程中运行?
问候.
我正在使用 JDK 9 Flow API 创建一个用户事件系统,所以我有一个房间(它实现了Flow.Subscriber<Notification>),它可能有很多用户,每个用户都可以随时提供(调度)更新。
当用户进入房间时,我订阅房间的更新user.subscribe(this)。但是没有退订,当用户离开房间时我如何退订?
public abstract class Room implements Flow.Subscriber<Notification> {
private Flow.Subscription subscription;
public void addUser(User user) {
user.subscribe(this);
}
public void removeUser(User user) {
// How can I unsubscribe the user?
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onError(final Throwable throwable) {
// ...
}
@Override
public void onNext(final Notification notification) {
// ...
subscription.request(1);
}
@Override
public void onComplete() {
// …Run Code Online (Sandbox Code Playgroud) java publish-subscribe java.util.concurrent java-9 java-flow
在测试中,我想查看 HttpRequest 的主体内部。我想将身体作为字符串。似乎唯一的方法是订阅 BodyPublisher 但这如何工作?