标签: reactive-programming

如何在 RXJS 中实现带背压的轮询

我想要轮询一个响应速度可能很慢的 HTTP API,因此我不想同时多次调用此 API。

我想做的一个例子可能是:

const interval = Rx.Observable.interval(250).take(5); // Poll every 250ms

function simulateMaybeSlowHttpCall() {
  return interval.delay(500).take(1); // The service takes 500ms to answer
}

 interval
   .mergeMap(val =>simulateMaybeSlowHttpCall().map(x => val), 1) // max concurrent is 1
   .subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)

此处,此代码将显示 1 2 3 4 5

但我不想打无用的电话。上面的代码运行 250*5 = 1250 毫秒,1 次调用需要 500 毫秒,所以我想显示:

1 3 5

所以我的问题是:当设置并发1(或任何其他值)时,如何丢弃所有未立即完成的调用?

JsFiddle: https: //jsfiddle.net/zra3zxhs/63/

javascript reactive-programming observable rxjs

2
推荐指数
1
解决办法
626
查看次数

如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

所以现在,我正在返回一个类似的响应

    @GetMapping("/integers")
    @ResponseStatus(code = HttpStatus.OK)
    public Mono<Map<String, Flux<Integer>>> getIntegers() {
        Mono<Map<String, Flux<Integer>>> integers = 
               Mono.just(Map.of("Integers", integerService.getIntegers()));
        return integers;
    }
Run Code Online (Sandbox Code Playgroud)

这给了我一个回应

{"Integers":{"scanAvailable":true,"prefetch":-1}}
Run Code Online (Sandbox Code Playgroud)

我希望它Flux<Integer>也能播放该部分,但事实并非如此。我该如何在 Spring webflux 中做到这一点?

spring-mvc reactive-programming spring-boot project-reactor spring-webflux

2
推荐指数
1
解决办法
6952
查看次数

反应本机谷歌翻译

我使用react-native开发了一个android移动应用程序。我的应用程序的母语是英语,但我也想提供普通话版本。所以我的问题是如何将静态数据和获取的数据转换为普通话。请分享一个解决方案。

reactive-programming reactjs react-native

2
推荐指数
1
解决办法
9482
查看次数

使用 Mono 值并用它来调用另一个 Mono

我的代码是这样构造的 -

Mono<Address> m1 = method1() // this call returns address
Mono<Boolean> m2 = method2() // this call uses ReactiveMongoTemplate and updates document in Mongo
Run Code Online (Sandbox Code Playgroud)

我正在努力实现这一目标:

当 method1() 返回地址时,我需要使用它并调用 method2() 来更新 MongoDB 文档中的地址。也没有抛出异常。但我在 method2() 中没有看到任何日志

代码 :

Mono<Object> m1 = method1().map(address -> method2(address));
Run Code Online (Sandbox Code Playgroud)

尽管调用了 method2(),但 MongoDB 中的文档更新并未发生。

reactive-programming project-reactor spring-webflux

2
推荐指数
1
解决办法
3919
查看次数

反应式应用程序的 ORM 框架

我一直在寻找spring-data-r2dbc用于反应式数据库访问。但它似乎不提供任何 ORM 支持,正如项目页面上所述,它不是一个 ORM 框架。现有或计划使用哪些选项来支持反应式应用程序中的 ORM?至于今天,为字段指定列名(JPA 中的@Column)、使用类层次结构(JPA 中的@MappedSuperclass)以及最重要的是,在使用spring-data-r2dbc时进行联接的最佳方法是什么?

reactive-programming spring-data spring-data-r2dbc r2dbc

2
推荐指数
1
解决办法
4793
查看次数

Spring 5 和响应式编程

我\xe2\x80\x99注意到Spring框架在spring 5.0版本中引入了反应式堆栈,它是基于项目反应器反应式流规范构建的。我\xe2\x80\x99m很想知道,如果有人有这些信息,为什么他们选择项目反应堆,而RxJava似乎是一个更好的选择(广泛使用,伟大的社区,更长的历史通常意味着更少的错误,实现反应流规范, ETC。)

\n

java spring reactive-programming

2
推荐指数
1
解决办法
134
查看次数

项目 Reactor:如何控制通量排放

我有一个会发出一些Date. 这Date映射到我在某些Executer.

我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个Date.

目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率。

我怎样才能改变这种行为?

PS 如果需要的话,我愿意改变架构。

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}
Run Code Online (Sandbox Code Playgroud)

HTTP请求模拟:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor reactive-streams

2
推荐指数
1
解决办法
3475
查看次数

使用项目反应器 mergeWith() 运算符来实现“if/elseif/else”分支逻辑

我正在尝试使用项目反应 mergeWith器运算符来实现if/elseif/else此处描述的分支逻辑:RxJS,其中是 If-Else Operator

提供的示例是用 RxJS 编写的,但基本思想保持不变。

filter基本上这个想法是在 3 上使用运算符(因此具有 3 个不同的谓词)并按如下方式monos/publishers合并 3 (这里当然是 RxJS):monosObservables

const somethings$ = source$
  .filter(isSomething)
  .do(something);

const betterThings$ = source$
  .filter(isBetterThings)
  .do(betterThings);

const defaultThings$ = source$
  .filter((val) => !isSomething(val) && !isBetterThings(val))
  .do(defaultThing);

// merge them together
const onlyTheRightThings$ = somethings$
  .merge(
    betterThings$,
    defaultThings$,
  )
  .do(correctThings);
Run Code Online (Sandbox Code Playgroud)

我复制并粘贴了上述文章中的相关示例。

考虑something$,betterThings$defaultThings$是我们的单元isSomething&isBetterThings是谓词。

现在这是我的 3 个真实的monos/publishers(用 java 编写的): …

reactive-programming project-reactor

2
推荐指数
1
解决办法
1474
查看次数

webflux Mono 响应为空

我的项目中有一个非常简单的 spring webflux 休息端点。

@Bean
public RouterFunction authRoute() {
    return RouterFunctions.route(POST("/auth/signin").and(accept(APPLICATION_JSON)), this::signIn)
            .andRoute(POST("/auth/signup").and(accept(APPLICATION_JSON)), this::signUp)
            .andRoute(POST("/auth/test").and(accept(APPLICATION_JSON)), this::test);
}
Run Code Online (Sandbox Code Playgroud)

端点/auth/test只需回复提供的用户名即可。

public Mono<ServerResponse> test(ServerRequest request) {
    System.out.println("Start test ");
    Mono<JwtRequest> jwtRequestMono = request.bodyToMono(JwtRequest.class);
    jwtRequestMono.subscribe(v -> System.out.println(v.getUsername() + ":" + v.getPassword()));
    return jwtRequestMono
            .flatMap(j -> ServerResponse.ok().contentType(APPLICATION_JSON).bodyValue(j.getUsername()));
}
Run Code Online (Sandbox Code Playgroud)

终端

我面临的问题是响应正文为空,它应该是用户名。我还验证了当我返回硬编码字符串时,它会通过。当我依赖时它失败了jwtRequestMono.flatMap(...

![邮差

mono reactive-programming spring-boot spring-webflux

2
推荐指数
1
解决办法
3958
查看次数

Spring WebFlux switchIfEmpty 返回不同类型

public Mono<ServerResponse> getMessage(ServerRequest request) {
    //this call returns Mono<ApiClientResponse>
    return apiClient.hystrixWrappedGetMessages(request.headers().asHttpHeaders(), request.queryParams())
            .switchIfEmpty(/* Here */)
}
Run Code Online (Sandbox Code Playgroud)

请原谅代码稍微不完整,当我遇到这个问题时,我正在重组它。要点是 /* Here */ 在调用中switchIfEmpty(),编译器强制使用类型Mono<ApiClientResponse>,但是当hystrixWrappedGetMessages()返回时Mono.empty()我想通过返回 204 来处理它Mono<ServerResponse>,否则我想返回 200。我怎样才能做到这一点?

理想情况下,我可以在地图调用中检查它是否是 Mono.empty() ,但如果它是空的 Mono,它似乎不会输入地图。考虑过使用可选选项,但它们似乎与 Monos 不能很好地配合。

java reactive-programming reactor-netty spring-webflux spring-webclient

2
推荐指数
1
解决办法
7661
查看次数