标签: reactive-streams

端到端的反应流RESTful服务(也称为HTTP上的背压)

我一直试图在网上澄清这个问题一段时间没有成功,所以我会试着在这里问一下.

我想找到一些资源或示例,它展示了我如何构建端到端完全反压的REST服务+客户端.我的意思是,我希望看到,给定一个实现Reactive Streams的REST客户端(无论是在Akka,JS还是其他),我将拥有(并能够"可视化")整个处理过程中的背压.构建REST服务器,例如使用Akka-Http.

为了清楚起见,我正在寻找类似下面的话题(但我找不到幻灯片或视频来确认):http://oredev.org/2014/sessions/reactive-streaming-restful-applications-with-akka -http

我对大多数示例的怀疑是关于这样一个事实:我可以找到很多案例,其中REST服务(服务器)使用Akka Http和Akka流作为后端,但我不确定背压是通过HTTP"通信"的和REST,如果客户端正在实现Reactive Streams.在这种情况下,我是否可以通过TCP/HTTP或仅仅2个独立的流桥接一个"流"?这是我的主要怀疑和困惑.

希望我足够清楚,有人能够对此事有所了解.
无论如何,谢谢!

rest scala akka reactive-streams akka-stream

4
推荐指数
1
解决办法
1784
查看次数

同一个流中有多个接收器

我有一个像这样的流和两个接收器,但一次只使用一个:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)
Run Code Online (Sandbox Code Playgroud)

要么

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)
Run Code Online (Sandbox Code Playgroud)

它是可配置的我们使用的接收器,但是如果我并行使用两个接收器怎么办呢.我怎样才能做到这一点?

我想到了Sink.combine,但它还需要一个合并策略,我不想以任何方式结合这些接收器的结果.我并不真正关心它们,所以我只希望通过HTTP将相同的数据发送到某个端点,同时将它们发送到数据库.Sink组合与广播非常相似,但是从头开始实现广播降低了我的代码的可读性,现在我只有简单的源,流和接收器,没有低级图形阶段.

你知道如何做到这一点的正确方法(背压和其他只使用一个水槽的东西)?

scala akka reactive-streams akka-stream

4
推荐指数
1
解决办法
2236
查看次数

reactive-kafka与默认调度程序?

我正在与Kafka和Akka Streams合作使用reactive-kafka连接器.我们发现reactive-kafka使用它自己的调度程序(akka.kafka.default-dispatcher),但是如果,实例,我们使用默认的akka​​调度程序,一切都更快(reactive-kafka dispatcher~300 messages/s,默认调度程序) ~1300条消息/ s)

我想知道是否使用默认调度程序是安全的.

提前致谢.

streaming reactive-programming apache-kafka reactive-streams akka-stream

3
推荐指数
1
解决办法
308
查看次数

通常使用Reactive Streams Processor作为事件总线是否可以?

我开始学习反应流,因为我很好奇使用RxJava作为更传统事件总线的替代品的新趋势. 这篇博文是对其完成方式的典型描述.如果我理解正确,RxJava 1.x并不是Reactive Streams的严格实现,但它非常相似.2.0版包含一些符合要求的类,或至少通过TCK,因此该代码的更新版本可能看起来有点不同.

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}
Run Code Online (Sandbox Code Playgroud)

在Reactive Streams术语中,我认为subject是a Processor,它既是a Publisher又是a Subscriber.

问题是,呼吁onNextSubscriber未订购任何东西,似乎违反了无流规范,特别是排除1.9.

这仅仅是一个实现细节吗? 我是否正确地认为您通常不能依赖此协议来实现兼容的Reactive Streams实施?

event-bus rx-java reactive-streams

3
推荐指数
1
解决办法
236
查看次数

Flux.concat 和 Flux.concatWith 之间的区别

我是反应流新手,正在学习使用 concat/concatWith 方法组合两个发布者(具体来说是 Flux)。

我可以用 concat 方法做的所有事情,都可以使用 concatWith 方法来实现。这是我使用的示例案例:

        Mono<String> mono1 = Mono.just(" karan ");
        Mono<String> mono2 = Mono.just(" | verma ");
        Mono<String> mono3 = Mono.just(" | kv ");

        Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
        Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");

        // FLux emits item each 500ms
        Flux<String> intervalFlux1 = Flux.interval(Duration.ofMillis(1000))
                                        .zipWith(flux1, (i, string) -> string);

        // FLux emits item each 700ms       
        Flux<String> intervalFlux2 = Flux
                                .interval(Duration.ofMillis(1000))
                                .zipWith(flux2, (i, string) -> string);



        System.out.println("**************Flux …
Run Code Online (Sandbox Code Playgroud)

reactive-programming project-reactor reactive-streams

3
推荐指数
1
解决办法
8510
查看次数

如何在 Flux 上同时调用 subscribe 和 blockLast ?

我一直在尝试 Project Reactor 和反应流。subscribeOn我在使用使流在不同线程上运行时遇到问题。将我的代码放在主线程中,我需要主线程块直到流完成,所以我做了这样的事情:

        Flux.just(1, 2, 3, 4)
                .log()
                .subscribeOn(Schedulers.parallel())
                .subscribe((i) -> {
                   // some operation 
                });

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Finished");
Run Code Online (Sandbox Code Playgroud)

然后我注意到有一种blockLast()方法可以进行阻塞。但我不能同时使用 subscribe 和 blockLast 因为它们不 return Flux

有没有一种优雅的方法来做到这一点?

java project-reactor reactive-streams

3
推荐指数
1
解决办法
2487
查看次数

Flow&lt;List&lt;T&gt;&gt; 而不是使用 Flow&lt;T&gt;?

我注意到很多人和例子使用 Flows 作为 List<> 的包装器,例如这样:

@Query("SELECT * from some_model ORDER BY some_field")
fun getData(): Flow<List<some_model>>
Run Code Online (Sandbox Code Playgroud)

据我所知,Flow 就像一种“异步序列”,所以我真的不明白“Flow<List<T>>”的含义。

为什么我们不能直接使用 Flow< T > ,这对我来说似乎很直观,我问这个问题是因为我在代码片段中看到它重复了很多次并且无法理解其背后的目的?

kotlin reactive-streams kotlin-flow

3
推荐指数
1
解决办法
480
查看次数

为什么 Flux.flatMap() 不等待内部发布者完成?

你能解释一下返回的 Flux/Mono 到底发生了HttpClient.response() 什么吗?我认为在 Mono 完成之前,http 客户端生成的值不会传递到下游,但我看到生成了大量请求,但最终以reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8异常告终。它按预期工作(项目正在处理一个接一个),如果我更换呼叫testRequest()Mono.fromCallable { }

我错过了什么?

测试代码:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext …
Run Code Online (Sandbox Code Playgroud)

kotlin project-reactor reactive-streams reactor-netty

3
推荐指数
1
解决办法
1380
查看次数

反应流与反应器模式?

我想知道反应流(由反应宣言定义)和反应器模式( https://en.wikipedia.org/wiki/Reactor_pattern )之间有什么关系。我读到 Project Reactor,我认为它是 Reactor 模式的实现,实现了 Reactive Streams API。但是 Project Reactor 如何将 Reactor 模式的概念转化为流概念。这两个领域的抽象是如何链接的?

project-reactor reactive-streams

3
推荐指数
1
解决办法
559
查看次数

Spring Webflux 查找和保存的正确方法

我创建了以下方法来查找 Analysis 对象,更新其结果字段,然后最后将结果保存在数据库中,但不等待返回。

public void updateAnalysisWithResults(String uuidString, String results) {
        findByUUID(uuidString).subscribe(analysis -> {
            analysis.setResults(results);
            computeSCARepository.save(analysis).subscribe();
        });
    }
Run Code Online (Sandbox Code Playgroud)

在订阅中订阅感觉写得不好。这是一个不好的做法吗?有更好的方法来写这个吗?

更新:入口点

@PatchMapping("compute/{uuid}/results")
    public Mono<Void> patchAnalysisWithResults(@PathVariable String uuid, @RequestBody String results) {
        return computeSCAService.updateAnalysisWithResults(uuid,results);
    }
Run Code Online (Sandbox Code Playgroud)
    public Mono<Void> updateAnalysisWithResults(String uuidString, String results) {
//        findByUUID(uuidString).subscribe(analysis -> {
//            analysis.setResults(results);
//            computeSCARepository.save(analysis).subscribe();
//        });
        return findByUUID(uuidString)
                .doOnNext(analysis -> analysis.setResults(results))
                .doOnNext(computeSCARepository::save)
                .then();
    }
Run Code Online (Sandbox Code Playgroud)

java reactive-streams spring-webflux

3
推荐指数
1
解决办法
3883
查看次数