标签: reactive-programming

Observables(Rx.js)与ES2015发生器相比如何?

据我了解,以下是解决异步编程工作流的技术:

  1. 回调(CSP)
  2. 承诺

较新的方法:

  1. Rx.js Observables(或大多数,bacon.js,xstream等)
  2. ES6发电机
  3. 异步/等待

我们现在正在回避这些新方法的回调和承诺.我目前理解的是 - 在ES2015生成器之上,Async/Await更像是一个更清晰的抽象.

我无法理解的是Observables和Generators之间的概念差异.我已广泛使用它们并且使用它们没有任何问题.

令我困惑的是Observables和Generators的用例.我得出的结论是,他们最终解决了同样的问题 - 异步性.我看到的只有潜在的差异是生成器固有地为代码提供命令式语义,而使用Rxjs的Observable似乎提供了反应范式.但是这样吗?

这应该是Observable和Generator之间选择的标准吗?优缺点都有什么.

我错过了大局吗?

随着Observable最终制作成未来的Ecmascript,Promises(带有可取消令牌)/ Observable/Generators会相互竞争吗?

javascript generator reactive-programming ecmascript-6 rxjs5

24
推荐指数
2
解决办法
3018
查看次数

Reactive Cocoa中RACAble(),RACObserve()和RACBind()之间的区别

我是Reactive Programming的新手.我已经通过无可可的文档了,但无法实现之间的差异RACAble(),RACObserve()RACBind().

请帮助我,通过一些示例代码片段来理解方面.

我认为它RACAble()被替换RACObserve()为一些选项/参数.如果我不对,请在这方面纠正我.

RACObserve() skip:相似的RACAble()

objective-c reactive-programming reactive-cocoa

23
推荐指数
1
解决办法
4211
查看次数

流星`Deps.autorun`对`Collection.observe`

什么是使用的优点/缺点Deps.autorunCollection.observe让第三方插件同步与反应Meteor.Collection.

例如,我使用jsTree直观地显示我存储在MongoDB中的目录树.我正在使用此代码使其反应:

// automatically reload the fileTree if the data changes
FileTree.find().observeChanges({
  added: function() {
    $.jstree.reference('#fileTree').refresh();
  },
  changed: function() {
    $.jstree.reference('#fileTree').refresh();
  },
  removed: function() {
    $.jstree.reference('#fileTree').refresh();
  }
});
Run Code Online (Sandbox Code Playgroud)

使用此方法的优点/缺点与Deps.autorun看起来像这样的调用有什么关系:(未经测试)

Deps.autorun(function() {
  jsonData = FileTree.find().fetch();
  $.jstree.reference('#fileTree')({'core': {'data': jsonData} });
});
Run Code Online (Sandbox Code Playgroud)

这只是一个例子.我一般都在询问优缺点,而不是这个具体的用例.

json mongodb reactive-programming jstree meteor

23
推荐指数
1
解决办法
6548
查看次数

如何将rxJava2的Observable转换为Completable?

我有Observable流,我想将它转换为Completable,我怎么能这样做?

java reactive-programming observable rx-java rx-java2

23
推荐指数
2
解决办法
1万
查看次数

Spring WebFlux和Reactor的线程模型

目前正在尝试使用Spring 5.0.0.RC2,Reactor 3.1.0.M2Spring Boot 2.0.0.M2进行反应式编程.

想知道WebFlux和Reactor使用的并发和线程模型来正确编写应用程序并处理可变状态.

Reactor doc声明该库被认为是并发不可知的,并提到了Scheduler抽象.WebFlux文档不提供信息.

然而,当通过Spring Boot使用WebFlux时,定义了一个线程模型.

从我的实验中得到的是:

  • 该模型既不是1个事件线程,也不是1个事件线程+工作者
  • 使用了几个线程池
  • " reactor-http-nio-3 "线程:可能每个核心一个,处理传入的HTTP请求
  • " Thread-7 "线程:由对MongoDB或HTTP资源的异步请求使用
  • " parallel-1 "线程:每个核心一个,由Reactor的Schedulers.parallel()创建,由延迟运算符等使用
  • 共享可变状态必须由应用程序同步
  • ThreadLocal(用于应用程序状态,MDC日志记录等)不是请求作用域,因此不是很有趣

它是否正确 ?什么是WebFlux的并发和线程模型:例如,什么是默认线程池?

感谢您的信息

java multithreading reactive-programming project-reactor spring-webflux

23
推荐指数
1
解决办法
9518
查看次数

用Reactor抛出异常的正确方法

我是新手,一般都会对Reactor和反应式编程进行预测.

我目前正在编写一段类似于此的代码:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings
Run Code Online (Sandbox Code Playgroud)

这个例子可能很愚蠢,实现这种情况肯定有更好的方法,但重点是:

throw newmap块中使用异常是否错误,或者我应该用return Mono.error(new UserNotFoundException())?替换它?

这两种做法有什么实际区别吗?

java reactor reactive-programming project-reactor

23
推荐指数
1
解决办法
8638
查看次数

Mono<Void> 和 Mono.empty() 有何不同

据我了解,在 Spring WebFlux 反应器中

Mono<Void>指的是 void Mono

Mono.empty()指的是 void,因为在此之上调用任何内容都会给出空指针。

它们的用法有何不同?

java spring reactor reactive-programming spring-webflux

23
推荐指数
1
解决办法
3万
查看次数

catch:和subscribeError之间的区别:

ReactiveCocoa中,subscribeError:方法与catch:?之间的区别是什么?你为什么要回信号catch:

reactive-programming reactive-cocoa

22
推荐指数
1
解决办法
2291
查看次数

如何在执行Flux.map()时处理错误

我试图找出在Flux中映射元素时如何处理错误.

例如,我将CSV字符串解析为我的一个业务POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Run Code Online (Sandbox Code Playgroud)

其中一些行可能包含错误,因此我在日志中得到的是:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
Run Code Online (Sandbox Code Playgroud)

我在API中读到了一些错误处理方法,但大多数都提到返回"错误值"或使用回退Flux,如下所示:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Run Code Online (Sandbox Code Playgroud)

然而,用我的myflux意思是再次处理整个通量.

那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?

使用@akarnokd解决方法进行更新

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

22
推荐指数
3
解决办法
1万
查看次数

如何在单元测试中模拟Spring WebClient

我们编写了一个小型Spring Boot REST应用程序,它在另一个REST端点上执行REST请求.

@RequestMapping("/api/v1")
@SpringBootApplication
@RestController
@Slf4j
public class Application
{
    @Autowired
    private WebClient webClient;

    @RequestMapping(value = "/zyx", method = POST)
    @ResponseBody
    XyzApiResponse zyx(@RequestBody XyzApiRequest request, @RequestHeader HttpHeaders headers)
    {
        webClient.post()
            .uri("/api/v1/someapi")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromObject(request.getData()))
            .exchange()
            .subscribeOn(Schedulers.elastic())
            .flatMap(response ->
                    response.bodyToMono(XyzServiceResponse.class).map(r ->
                    {
                        if (r != null)
                        {
                            r.setStatus(response.statusCode().value());
                        }

                        if (!response.statusCode().is2xxSuccessful())
                        {
                            throw new ProcessResponseException(
                                    "Bad status response code " + response.statusCode() + "!");
                        }

                        return r;
                    }))
            .subscribe(body ->
            {
                // Do various things
            }, throwable ->
            {
                // …
Run Code Online (Sandbox Code Playgroud)

rest spring unit-testing mocking reactive-programming

22
推荐指数
6
解决办法
1万
查看次数