标签: reactive-streams

将元素从外部推送到 fs2 中的反应流

我有一个外部(即,我无法更改它)Java API,如下所示:

public interface Sender {
    void send(Event e);
}
Run Code Online (Sandbox Code Playgroud)

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些收集到一个包中并通过 HTTP 发送到某个端点的。这一切都应该异步完成,没有send()阻塞调用线程,使用一些固定大小的缓冲区并在缓冲区已满时丢弃新事件。

使用 akka-streams 这很简单:我创建了一个阶段图(它使用 akka-http 发送 HTTP 请求),将它具体化并使用具体化ActorRef将新事件推送到流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream fs2

8
推荐指数
1
解决办法
953
查看次数

多个枚举与一个枚举

当我偶然发现一些我不明白为什么这样做的事情时,我正在查看反应流规范的Publisher(AsyncIterablePublisher.java)的示例实现.

static interface Signal {};
enum Cancel implements Signal { Instance; };
enum Subscribe implements Signal { Instance; };
enum Send implements Signal { Instance; };
Run Code Online (Sandbox Code Playgroud)

说实话,我不是这样一个高级程序员,而是写这篇文章的人,我确信有理由这样做.但是我也无法解释为什么它会比这更好(这就是我会这样做的).

enum Signal {
  Cancel,
  Subscribe,
  Send;
}
Run Code Online (Sandbox Code Playgroud)

有人可以向我解释为什么会更好吗?优点缺点?

java enums reactive-streams

7
推荐指数
1
解决办法
398
查看次数

集成测试 Spring SseEmitters

我一直在寻找有关如何最好地测试返回 SseEmitters 的 Spring MVC 控制器方法的提示。我的想法很短,但有一个反复试验的解决方案,可以测试异步、线程化的行为。下面是示例代码,只是为了演示概念,可能有一两个错字:

控制器类:

@Autowired
Publisher<MyResponse> responsePublisher;

@RequestMapping("/mypath")
public SseEmitter index() throws IOException {
    SseEmitter emitter = new SseEmitter();
    Observable<MyResponse> responseObservable = RxReactiveStreams.toObservable(responsePublisher);

    responseObservable.subscribe(
            response -> {
                try {
                    emitter.send(response);
               } catch (IOException ex) {
                    emitter.completeWithError(ex);
               }
            },
            error -> {
                emitter.completeWithError(error);
            },
            emitter::complete
    );

    return emitter;
}
Run Code Online (Sandbox Code Playgroud)

测试类:

//A threaded dummy publisher to demonstrate async properties.
//Sends 2 responses with a 250ms pause in between.
protected static class MockPublisher implements Publisher<MyResponse> {
    @Override
    public void subscribe(Subscriber<? …
Run Code Online (Sandbox Code Playgroud)

java spring spring-mvc server-sent-events reactive-streams

7
推荐指数
1
解决办法
1754
查看次数

如何在出错时从 Mono&lt;A&gt; 返回 Mono&lt;B&gt;?

假设我有以下链:

public Mono<B> someMethod( Object arg ) {
  Mono<A> monoA = Mono.just( arg ).flatMap( adapter1::doSomething );
  // success chain
  return monoA.map( B::buildSuccessResponse );
}
Run Code Online (Sandbox Code Playgroud)

但是如果出现错误,我需要返回不同类型的 B,比如说,B.buildErrorResponse( a ). 任何 onError / doOnError 方法只能返回原始 Mono 类型 (A),但不能返回我需要返回的类型 (B)。出现错误时如何返回不同的 Mono 类型?

java reactive-programming project-reactor reactive-streams

7
推荐指数
1
解决办法
8293
查看次数

如何合并两个流(没有空值)并在对上应用条件?

考虑我有两个数据流,有没有办法合并它们并在这两个流之间应用数据条件?例如

Stream A : A, B, C, D....
Stream B : -, A, -, -....
Composed : (A,-),(B,A),(C,-),(D,-)....
Run Code Online (Sandbox Code Playgroud)

如何使用rxjs获得上面的组合流?我想在组合流上应用条件来提出一些通知.也可以使用最后已知的非空数据,例如,参见下面的组合流.

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,2),(D,2)....
Run Code Online (Sandbox Code Playgroud)

我刚开始玩反应流的想法,所以如果我误解了反应流的想法,请纠正我.

reactive-programming rxjs reactive-streams

6
推荐指数
1
解决办法
5638
查看次数

Spring 集成与反应流

我正在从事一个 ETL 项目。我已经使用 spring 集成很长时间了。数据源当前是文件或编年史,但可能会更改为实时流,并且数据量可能会增长。未来有可能转向大数据解决方案(hadoop、spark 等)。

基于此我需要对 spring 集成和反应流进行比较?为什么有人会使用其中一个而不是另一个(或者我一开始尝试比较两者就错了)?您认为它们可以一起使用的场景(如果有)?

java spring-integration spring-xd project-reactor reactive-streams

6
推荐指数
1
解决办法
938
查看次数

将 Flux 的结果与 Mono 的结果相结合

我开始使用 Project reactor,其中一个让我苦苦挣扎的地方是如何将 Mono 的东西与 Flux 结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
Run Code Online (Sandbox Code Playgroud)

现在我想要实现的是:

如何组合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用了 Zip 之类的东西,但它会从源代码进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组但需要添加到该组的多个用户。返回Mono<List<Users>然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? …

project-reactor reactive-streams spring-webflux

6
推荐指数
2
解决办法
8216
查看次数

Spring Boot Webflux/Netty - 检测关闭连接

我一直在2.0.0.RC1使用webflux starter(spring-boot-starter-webflux)开始使用spring-boot .我创建了一个简单的控制器,返回无限的通量.我希望发布者只有在有客户端(订阅者)时才能正常工作.假设我有一个像这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}
Run Code Online (Sandbox Code Playgroud)

现在,当我尝试运行该代码并使用Chrome 访问端点http:// localhost:8080 /时,我可以看到数据.但是,一旦我关闭浏览器,while循环就会继续,因为没有触发取消事件.关闭浏览器后,如何终止/取消流媒体?

从这个答案我引用:

目前使用HTTP,确切的背压信息不会通过网络传输,因为HTTP协议不支持此功能.如果我们使用不同的线路协议,这可能会改变.

我认为,由于HTTP协议不支持背压,这意味着也不会发出取消请求.

通过分析网络流量进一步调查,显示浏览器在FIN关闭浏览器后立即发送TCP .有没有办法配置Netty(或其他),以便半封闭连接将触发发布者的取消事件,使while循环停止?

或者我是否必须编写自己的适配器,类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter我实现自己的订阅服务器的地方?

谢谢你的帮助.

编辑: 一个IOException将在尝试,如果没有客户端将数据写入套接字提高.正如您在堆栈跟踪中看到的那样.

但这还不够好,因为在下一个数据块准备好发送之前可能需要一段时间,因此需要花费相同的时间来检测已经消失的客户端.正如Brian Clozel所说,它是Reactor Netty中的一个已知问题.我试图通过添加依赖项来使用Tomcat POM.xml.像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)

虽然它取代了Netty并使用了Tomcat,但由于浏览器没有显示任何数据,它似乎没有被动反应.但是,控制台中没有警告/信息/异常.spring-boot-starter-webflux这个版本( …

spring spring-boot project-reactor reactive-streams spring-webflux

6
推荐指数
1
解决办法
3984
查看次数

Project Reactor:doOnNext(或其他 doOnXXX)异步

有没有像 doOnNext 这样的方法,但是是异步的?例如,我需要为确定的元素做一些长时间的日志记录(通过电子邮件发送通知)。

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

但是为什么我要等待 3 的记录?我想异步执行此逻辑。

现在我只有这个解决方案

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> { …
Run Code Online (Sandbox Code Playgroud)

java asynchronous project-reactor reactive-streams

6
推荐指数
1
解决办法
3445
查看次数

Project Reactor:ConnectableFlux按需自动连接

我只有一个数据项源,我想与多个下游流共享该Flux。

它与参考指南中的示例非常相似,但是我觉得该示例通过.connect()手动调用而作弊。具体来说,我不知道将有多少个下游订户,并且我没有控制权来调用.connect()“最后”。消费者应该能够订阅,但不能立即触发数据提取。然后在将来实际需要数据的某个地方,它们将在必要时提取。

此外,源对消耗很敏感,因此无法重新获取。
除此之外,它会很大,因此不能选择缓冲和重播。

理想地,最重要的是,整个事情发生在一个线程中,因此没有并发或等待。
(不希望为订阅者提供非常小的等待时间)

对于Monos,我几乎可以达到预期的效果(单个最终结果值):

public class CoConsumptionTest {
    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor reactive-streams

6
推荐指数
1
解决办法
754
查看次数