根据文件:
Flux是一个可以发出0..N元素的流:
Flux<String> fl = Flux.just("a", "b", "c");
Run Code Online (Sandbox Code Playgroud)
Mono是一个0..1元素的流:
Mono<String> mn = Mono.just("hello");
Run Code Online (Sandbox Code Playgroud)
因为两者都是反应流中Publisher接口的实现.
我们不能在大多数情况下只使用Flux,因为它也可以发出0..1,从而满足Mono的条件?
或者只有一些特定的条件,只需要使用Mono并且Flux无法处理操作?请建议.
接收器和用户的概念看起来与我类似.此外,我没有看到在反应流规范中明确定义接收器的概念.
Sinks.Many<String>
在使用向多个订阅者通知某些事件时,我遇到了一种我不明白的行为:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
Run Code Online (Sandbox Code Playgroud)
此代码显示第一个订阅者获取值 1 和 2,第二个订阅者获取值 2。到目前为止一切顺利:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> …
Run Code Online (Sandbox Code Playgroud) 我正在使用akka streams graphDSL来创建一个可运行的图.流组件的入口/出口没有编译时错误.运行时抛出以下错误:
任何想法我应该验证什么才能让它运行?
requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)
Run Code Online (Sandbox Code Playgroud)
图结构:
source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore
Run Code Online (Sandbox Code Playgroud) 在Akka流中,Mat [Out,Mat]或Sink [In,Mat]中的Mat代表什么.它什么时候才会被使用?
如何在RxJava版本2中将Observable转换为Publisher?
在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要.但是我怎么能在RxJava 2中做到这一点?
java reactive-programming reactive-streams reactivex rx-java2
我的问题是关于与集团模式一起使用的导航.
在我的LoginScreen小部件中,我有一个按钮,可以将事件添加到集团的EventSink中.bloc调用API并验证用户身份.问题是在LoginScreen Widget中我必须监听流,以及在返回成功状态后如何导航到另一个屏幕.
希望我清楚自己.谢谢!
我正试图用来Flux.buffer()
从数据库中批量加载。
用例是从数据库加载记录可能是“突发的”,我想引入一个小缓冲区,以便在可能的情况下将加载分组在一起。
我的概念方法是使用某种形式的处理器,发布到它的接收器,让该缓冲区,然后订阅并过滤所需的结果。
我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的Mono)。
下面是我到目前为止所到达的地方-主要是绊脚石。
当前,这将返回一个结果,但是后续的调用将被丢弃(尽管我不确定在哪里)。
class BatchLoadingRepository {
// I've tried all manner of different processors here. I'm unsure if
// TopicProcessor is the correct one to use.
private val bufferPublisher = TopicProcessor.create<String>()
private val resultsStream = bufferPublisher
.bufferTimeout(50, Duration.ofMillis(50))
// I'm unsure if concatMapIterable is the correct operator here,
// but it seems to work.
// I'm really trying to turn the List<MyEntity>
// into a stream of MyEntity, published on the Flux<>
.concatMapIterable { requestedIds …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Akka HTTP 2.0-M2编写批量数据上传工具.但我正面临着akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.
我试图隔离一个问题,这里的示例代码也失败了:
public class TestMaxRequests {
private static final class Router extends HttpApp {
@Override
public Route createRoute() {
return route(
path("test").route(
get(handleWith(ctx -> ctx.complete("OK")))
)
);
}
}
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(actorSystem);
Router router = new Router();
router.bindRoute("127.0.0.1", 8082, actorSystem);
LoggingAdapter log = Logging.getLogger(actorSystem, new Object());
for (int i = 0; i < 100; i++) {
final int reqNum …
Run Code Online (Sandbox Code Playgroud) 我有一个Project Reactor链,它包含一个阻塞任务(网络调用,我们需要等待响应).我想同时运行多个阻塞任务.
似乎可以使用ParallelFlux或flatMap(),裸骨示例:
Flux.just(1)
.repeat(10)
.parallel(3)
.runOn(Schedulers.elastic())
.doOnNext(i -> blockingTask())
.sequential()
.subscribe()
Run Code Online (Sandbox Code Playgroud)
要么
Flux.just(1)
.repeat(10)
.flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?
reactive-streams ×10
akka-stream ×4
java ×3
akka ×2
kotlin ×2
akka-http ×1
dart ×1
dart-async ×1
flutter ×1
reactivex ×1
rx-java2 ×1
scala ×1
typesafe ×1