标签: reactive-streams

Mono vs Flux in Reactive Stream

根据文件:

Flux是一个可以发出0..N元素的流:

Flux<String> fl = Flux.just("a", "b", "c");
Run Code Online (Sandbox Code Playgroud)

Mono是一个0..1元素的流:

Mono<String> mn = Mono.just("hello");
Run Code Online (Sandbox Code Playgroud)

因为两者都是反应流中Publisher接口的实现.

我们不能在大多数情况下只使用Flux,因为它也可以发出0..1,从而满足Mono的条件?

或者只有一些特定的条件,只需要使用Mono并且Flux无法处理操作?请建议.

reactive-programming project-reactor reactive-streams

30
推荐指数
3
解决办法
2万
查看次数

在项目反应堆或akka流中,接收器和用户之间的概念差异是什么?

接收器和用户的概念看起来与我类似.此外,我没有看到在反应流规范中明确定义接收器的概念.

project-reactor reactive-streams akka-stream

29
推荐指数
1
解决办法
3267
查看次数

为什么 Sinks.many().multicast().onBackPressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

Sinks.Many<String>在使用向多个订阅者通知某些事件时,我遇到了一种我不明白的行为:

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
Run Code Online (Sandbox Code Playgroud)

此代码显示第一个订阅者获取值 1 和 2,第二个订阅者获取值 2。到目前为止一切顺利:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> …
Run Code Online (Sandbox Code Playgroud)

kotlin project-reactor reactive-streams spring-webflux

19
推荐指数
1
解决办法
6215
查看次数

图表创建错误:要求失败:入口[]和出口[]必须对应于入口[in]和出口[out]

我正在使用akka streams graphDSL来创建一个可运行的图.流组件的入口/出口没有编译时错误.运行时抛出以下错误:

任何想法我应该验证什么才能让它运行?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)
Run Code Online (Sandbox Code Playgroud)

图结构:

source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore
Run Code Online (Sandbox Code Playgroud)

scala typesafe reactive-streams akka-stream

14
推荐指数
1
解决办法
1269
查看次数

Akka Streams:Mat代表什么来源[out,Mat]

在Akka流中,Mat [Out,Mat]或Sink [In,Mat]中的Mat代表什么.它什么时候才会被使用?

akka reactive-streams akka-stream

14
推荐指数
1
解决办法
1991
查看次数

RxJava 2.0 - 如何将Observable转换为Publisher

如何在RxJava版本2中将Observable转换为Publisher?

在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要.但是我怎么能在RxJava 2中做到这一点?

java reactive-programming reactive-streams reactivex rx-java2

14
推荐指数
1
解决办法
9249
查看次数

Flutter Bloc Pattern - 如何在流事件后导航到另一个屏幕?

我的问题是关于与集团模式一起使用的导航.

在我的LoginScreen小部件中,我有一个按钮,可以将事件添加到集团的EventSink中.bloc调用API并验证用户身份.问题是在LoginScreen Widget中我必须监听流,以及在返回成功状态后如何导航到另一个屏幕.

希望我清楚自己.谢谢!

dart dart-async reactive-streams flutter

14
推荐指数
4
解决办法
7325
查看次数

使用反应堆的Flux.buffer批处理工作仅适用于单个项目

我正试图用来Flux.buffer()从数据库中批量加载。

用例是从数据库加载记录可能是“突发的”,我想引入一个小缓冲区,以便在可能的情况下将加载分组在一起。

我的概念方法是使用某种形式的处理器,发布到它的接收器,让该缓冲区,然后订阅并过滤所需的结果。

我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的Mono)。

下面是我到目前为止所到达的地方-主要是绊脚石。

当前,这将返回一个结果,但是后续的调用将被丢弃(尽管我不确定在哪里)。

class BatchLoadingRepository {
    // I've tried all manner of different processors here.  I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
            .bufferTimeout(50, Duration.ofMillis(50))
            // I'm unsure if concatMapIterable is the correct operator here, 
            // but it seems to work.
            // I'm really trying to turn the List<MyEntity> 
            // into a stream of MyEntity, published on the Flux<>
            .concatMapIterable { requestedIds …
Run Code Online (Sandbox Code Playgroud)

java kotlin project-reactor reactive-streams

14
推荐指数
1
解决办法
270
查看次数

如何为多个(10k - 100k)请求正确调用Akka HTTP客户端?

我正在尝试使用Akka HTTP 2.0-M2编写批量数据上传工具.但我正面临着akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.

我试图隔离一个问题,这里的示例代码也失败了:

public class TestMaxRequests {
    private static final class Router extends HttpApp {
        @Override
        public Route createRoute() {
            return route(
                    path("test").route(
                            get(handleWith(ctx -> ctx.complete("OK")))
                    )
            );
        }
    }


    public static void main(String[] args) {
        ActorSystem actorSystem = ActorSystem.create();
        Materializer materializer = ActorMaterializer.create(actorSystem);

        Router router = new Router();
        router.bindRoute("127.0.0.1", 8082, actorSystem);

        LoggingAdapter log = Logging.getLogger(actorSystem, new Object());

        for (int i = 0; i < 100; i++) {
            final int reqNum …
Run Code Online (Sandbox Code Playgroud)

java akka reactive-streams akka-stream akka-http

13
推荐指数
1
解决办法
5013
查看次数

用于阻塞I/O任务的ParallelFlux与flatMap()

我有一个Project Reactor链,它包含一个阻塞任务(网络调用,我们需要等待响应).我想同时运行多个阻塞任务.

似乎可以使用ParallelFlux或flatMap(),裸骨示例:

Flux.just(1)
    .repeat(10)
    .parallel(3)
    .runOn(Schedulers.elastic())
    .doOnNext(i -> blockingTask())
    .sequential()
    .subscribe()
Run Code Online (Sandbox Code Playgroud)

要么

Flux.just(1)
    .repeat(10)
    .flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?

project-reactor reactive-streams

13
推荐指数
1
解决办法
2833
查看次数