我想要轮询一个响应速度可能很慢的 HTTP API,因此我不想同时多次调用此 API。
我想做的一个例子可能是:
const interval = Rx.Observable.interval(250).take(5); // Poll every 250ms
function simulateMaybeSlowHttpCall() {
return interval.delay(500).take(1); // The service takes 500ms to answer
}
interval
.mergeMap(val =>simulateMaybeSlowHttpCall().map(x => val), 1) // max concurrent is 1
.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)
此处,此代码将显示 1 2 3 4 5
但我不想打无用的电话。上面的代码运行 250*5 = 1250 毫秒,1 次调用需要 500 毫秒,所以我想显示:
1 3 5
所以我的问题是:当设置并发1(或任何其他值)时,如何丢弃所有未立即完成的调用?
JsFiddle: https: //jsfiddle.net/zra3zxhs/63/
所以现在,我正在返回一个类似的响应
@GetMapping("/integers")
@ResponseStatus(code = HttpStatus.OK)
public Mono<Map<String, Flux<Integer>>> getIntegers() {
Mono<Map<String, Flux<Integer>>> integers =
Mono.just(Map.of("Integers", integerService.getIntegers()));
return integers;
}
Run Code Online (Sandbox Code Playgroud)
这给了我一个回应
{"Integers":{"scanAvailable":true,"prefetch":-1}}
Run Code Online (Sandbox Code Playgroud)
我希望它Flux<Integer>也能播放该部分,但事实并非如此。我该如何在 Spring webflux 中做到这一点?
spring-mvc reactive-programming spring-boot project-reactor spring-webflux
我使用react-native开发了一个android移动应用程序。我的应用程序的母语是英语,但我也想提供普通话版本。所以我的问题是如何将静态数据和获取的数据转换为普通话。请分享一个解决方案。
我的代码是这样构造的 -
Mono<Address> m1 = method1() // this call returns address
Mono<Boolean> m2 = method2() // this call uses ReactiveMongoTemplate and updates document in Mongo
Run Code Online (Sandbox Code Playgroud)
我正在努力实现这一目标:
当 method1() 返回地址时,我需要使用它并调用 method2() 来更新 MongoDB 文档中的地址。也没有抛出异常。但我在 method2() 中没有看到任何日志
代码 :
Mono<Object> m1 = method1().map(address -> method2(address));
Run Code Online (Sandbox Code Playgroud)
尽管调用了 method2(),但 MongoDB 中的文档更新并未发生。
我一直在寻找spring-data-r2dbc用于反应式数据库访问。但它似乎不提供任何 ORM 支持,正如项目页面上所述,它不是一个 ORM 框架。现有或计划使用哪些选项来支持反应式应用程序中的 ORM?至于今天,为字段指定列名(JPA 中的@Column)、使用类层次结构(JPA 中的@MappedSuperclass)以及最重要的是,在使用spring-data-r2dbc时进行联接的最佳方法是什么?
我有一个会发出一些Date. 这Date映射到我在某些Executer.
我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个Date.
目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率。
我怎样才能改变这种行为?
PS 如果需要的话,我愿意改变架构。
private void run() throws Exception {
Executor executor = Executors.newFixedThreadPool(2);
Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);
source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
Run Code Online (Sandbox Code Playgroud)
HTTP请求模拟:
private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}
return …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用项目反应 mergeWith器运算符来实现if/elseif/else此处描述的分支逻辑:RxJS,其中是 If-Else Operator。
提供的示例是用 RxJS 编写的,但基本思想保持不变。
filter基本上这个想法是在 3 上使用运算符(因此具有 3 个不同的谓词)并按如下方式monos/publishers合并 3 (这里当然是 RxJS):monosObservables
const somethings$ = source$
.filter(isSomething)
.do(something);
const betterThings$ = source$
.filter(isBetterThings)
.do(betterThings);
const defaultThings$ = source$
.filter((val) => !isSomething(val) && !isBetterThings(val))
.do(defaultThing);
// merge them together
const onlyTheRightThings$ = somethings$
.merge(
betterThings$,
defaultThings$,
)
.do(correctThings);
Run Code Online (Sandbox Code Playgroud)
我复制并粘贴了上述文章中的相关示例。
考虑something$,betterThings$和defaultThings$是我们的单元isSomething&isBetterThings是谓词。
现在这是我的 3 个真实的monos/publishers(用 java 编写的): …
我的项目中有一个非常简单的 spring webflux 休息端点。
@Bean
public RouterFunction authRoute() {
return RouterFunctions.route(POST("/auth/signin").and(accept(APPLICATION_JSON)), this::signIn)
.andRoute(POST("/auth/signup").and(accept(APPLICATION_JSON)), this::signUp)
.andRoute(POST("/auth/test").and(accept(APPLICATION_JSON)), this::test);
}
Run Code Online (Sandbox Code Playgroud)
端点/auth/test只需回复提供的用户名即可。
public Mono<ServerResponse> test(ServerRequest request) {
System.out.println("Start test ");
Mono<JwtRequest> jwtRequestMono = request.bodyToMono(JwtRequest.class);
jwtRequestMono.subscribe(v -> System.out.println(v.getUsername() + ":" + v.getPassword()));
return jwtRequestMono
.flatMap(j -> ServerResponse.ok().contentType(APPLICATION_JSON).bodyValue(j.getUsername()));
}
Run Code Online (Sandbox Code Playgroud)
我面临的问题是响应正文为空,它应该是用户名。我还验证了当我返回硬编码字符串时,它会通过。当我依赖时它失败了jwtRequestMono.flatMap(...
public Mono<ServerResponse> getMessage(ServerRequest request) {
//this call returns Mono<ApiClientResponse>
return apiClient.hystrixWrappedGetMessages(request.headers().asHttpHeaders(), request.queryParams())
.switchIfEmpty(/* Here */)
}
Run Code Online (Sandbox Code Playgroud)
请原谅代码稍微不完整,当我遇到这个问题时,我正在重组它。要点是 /* Here */ 在调用中switchIfEmpty(),编译器强制使用类型Mono<ApiClientResponse>,但是当hystrixWrappedGetMessages()返回时Mono.empty()我想通过返回 204 来处理它Mono<ServerResponse>,否则我想返回 200。我怎样才能做到这一点?
理想情况下,我可以在地图调用中检查它是否是 Mono.empty() ,但如果它是空的 Mono,它似乎不会输入地图。考虑过使用可选选项,但它们似乎与 Monos 不能很好地配合。
java reactive-programming reactor-netty spring-webflux spring-webclient
java ×3
spring-boot ×2
javascript ×1
mono ×1
observable ×1
r2dbc ×1
react-native ×1
reactjs ×1
rxjs ×1
spring ×1
spring-data ×1
spring-mvc ×1