我是Reactive programming和Spring WebFlux的新手.我想让我的App 1通过Flux发布Server Sent事件,我的App 2继续收听它.
我希望Flux按需发布(例如,当发生某些事情时).我发现的所有示例都是使用Flux.interval来定期发布事件,并且一旦创建了Flux中的内容,似乎无法附加/修改内容.
我怎样才能实现目标?或者我在概念上完全错了.
Ole*_*uka 28
FluxProcessor和"动态"发布FluxSink手动提供数据的技术之一Flux是使用FluxProcessor#sink方法,如以下示例所示
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
Run Code Online (Sandbox Code Playgroud)
在这里,我创建DirectProcessor了以支持多个订阅者,这将监听数据流.此外,我提供了额外的功能FluxProcessor#serialize,为多产品提供安全支持(从不同线程调用而不违反Reactive Streams规范规则,尤其是规则1.3).最后,通过调用" http:// localhost:8080/send ",我们将看到该消息Hello World #1(当然,仅在以前连接到" http:// localhost:8080 "的情况下)
小智 5
只是另一个想法,使用 EmitterProcessor 作为通量的网关
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
public class MyEmitterProcessor {
EmitterProcessor<String> emitterProcessor;
public static void main(String args[]) {
MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
Flux<String> publisher = myEmitterProcessor.getPublisher();
myEmitterProcessor.onNext("A");
myEmitterProcessor.onNext("B");
myEmitterProcessor.onNext("C");
myEmitterProcessor.complete();
publisher.subscribe(x -> System.out.println(x));
}
public Flux<String> getPublisher() {
emitterProcessor = EmitterProcessor.create();
return emitterProcessor.map(x -> "consume: " + x);
}
public void onNext(String nextString) {
emitterProcessor.onNext(nextString);
}
public void complete() {
emitterProcessor.onComplete();
}
}
Run Code Online (Sandbox Code Playgroud)
更多信息,请参阅此处的 Reactor 文档。文档本身有一条建议:“大多数时候,您应该尽量避免使用处理器。它们更难正确使用,并且容易出现一些极端情况。” 但我不知道哪种极端情况。
| 归档时间: |
|
| 查看次数: |
3536 次 |
| 最近记录: |