标签: reactive-programming

如何使用Mono的“缓存”方法

我是 spring webflux 的初学者。在研究时我发现了一些代码,例如:

Mono result = someMethodThatReturnMono().cache();
Run Code Online (Sandbox Code Playgroud)

“缓存”这个名字告诉我关于缓存某些东西,但是缓存在哪里以及如何检索缓存的东西?是咖啡因之类的东西吗?

caching reactive-programming project-reactor spring-webflux webflux

16
推荐指数
1
解决办法
1万
查看次数

反应性香蕉可以在网络中处理循环吗?

我们有这样的代码:

 guiState :: Discrete GuiState
 guiState = stepperD (GuiState []) $
   union (mkGuiState <$> changes model) evtAutoLayout

 evtAutoLayout :: Event GuiState
 evtAutoLayout = fmap fromJust . filterE isJust . fmap autoLayout $ changes guiState
Run Code Online (Sandbox Code Playgroud)

你可以看到evtAutoLayout提供给guiState,它被输入到evtAutoLayout中 - 所以那里有一个循环.这是故意的.自动布局调整gui状态直到达到平衡,然后它返回Nothing,因此它应该停止循环.当然,新的模型改变可以再次启动它.

但是,当我们将它们组合在一起时,我们会在编译函数调用中遇到无限循环.即使autoLayout = Nothing,它仍会在编译期间导致堆栈溢出.

如果我删除了guiState中的union调用并删除了图片中的evtAutoLayout ...

 guiState :: Discrete GuiState
 guiState = stepperD (GuiState []) $ mkGuiState <$> changes model
Run Code Online (Sandbox Code Playgroud)

它工作正常.

有什么建议?

haskell reactive-programming reactive-banana

15
推荐指数
1
解决办法
666
查看次数

未来的onComplete和flatMap有什么区别?

我正在使用ReactiveMongo驱动程序编写Scala应用程序.访问db的方法总是返回Future[T].以下代码是否相同?

(有onComplete)

val results: Future[List[Tag]] = Tags.all.toList
results onComplete {
    case Success(list) => //do something with list
    case Failure(t) => //throw the error
}
Run Code Online (Sandbox Code Playgroud)

(有flatMap)

Tags.all.toList.flatMap(list => //do something with list)
Run Code Online (Sandbox Code Playgroud)

有什么不同?

flatMap不会抛出失败?这flatMap是一个回调,onComplete或者等到Tags.all.toList语句没有完成?

asynchronous scala future reactive-programming

15
推荐指数
1
解决办法
8671
查看次数

RxJava-在Observable链中执行peek()或void操作?

Java 8 lambda流有一个peek()运算符,允许您对每个项执行void操作.这通常用于调试,但它也是一种欺骗和启动void操作而不映射到某些东西的好方法.

在RxJava中是否有相同的功能?也许我没有遵循良好的做法或反应性思考......但是在操作之前和之后创建状态标签会非常方便吗?如果peek()不受支持,是否有更好的模式可供遵循?

Observable<Item> Item= ...;

Label statusLabel = new Label();
Label resultLabel = new Label();

Observable<CalculatedItem> calculatedItem = calculated.subscribeOn(Schedulers.computation())
.peek(c -> statusLabel.setText("Working.."))
.map(c -> performExpensiveCalculation(c))
.peek(r -> statusLabel.setText(""));

calculatedItem.subscribe(c -> resultLabel.setText(c));
Run Code Online (Sandbox Code Playgroud)

java reactive-programming rx-java

15
推荐指数
1
解决办法
3725
查看次数

使用HTTP端点返回Flux/Mono实例而不是DTO的好处

我看过Spring Tips:使用Spring Framework 5.0的功能反应端点,并阅读了一些关于spring reactor的内容,但我无法理解它.

鉴于我有netty和spring reactor有效,让端点返回Flux/ Mono实例(jacksonified)而不是直接dto对象(jacksonified)有什么好处?我最初假设在http请求/响应上下文中,反应流将更像是websockets,其中服务器通过开放通道将数据推送到接收器,但似乎并非如此.

另外netty在反应式编程中实际上比tomcat做得更好?

如果这些问题看起来很愚蠢,我很抱歉,但我不太明白这个新框架方向的目的.它为什么会出现,它是如何工作的以及它解决了哪些问题?

spring reactive-programming netty project-reactor spring-webflux

15
推荐指数
1
解决办法
8470
查看次数

在rxjs Observable中抛出错误

我试图在rxjs Observable中抛出一个错误

 new Observable(subscriber => {
   Observable.throw("error1");
   return Observable.throw("error2");
 })
 .subscribe(
   () => {},
   err => console.error(err)
 );
Run Code Online (Sandbox Code Playgroud)

错误1未被捕获.

error2给出了编译错误:

Argument of type '(this: Observable<{}>, subscriber: Subscriber<{}>) => ErrorObservable<string>' is not assignable to parameter of type '(this: Observable<{}>, subscriber: Subscriber<{}>) => TeardownLogic'. Type 'ErrorObservable<string>' is not assignable to type 'TeardownLogic'
Run Code Online (Sandbox Code Playgroud)

在一个observable中抛出错误的正确方法是什么?

javascript error-handling reactive-programming rxjs

15
推荐指数
3
解决办法
2万
查看次数

Rxjava中的"背压"一词是什么意思?

我是RxJava的初学者,我很好奇"背压"的含义.

这是否意味着生产者会给消费者带来压力?

或者这是否意味着消费者对生产者施加压力?(相反方向的压力)

reactive-programming backpressure rx-java

15
推荐指数
3
解决办法
2372
查看次数

如何在Flutter中使用Bloc模式进行错误处理?

想象一下我正在使用集团来处理网络请求。如果请求失败,则根据平台的不同,处理失败的方法也会有所不同。在我的Web应用程序上,我想将用户重定向到错误页面,而在IOS应用程序上,我想显示一个对话框。

由于仅应使用bloc并共享它来处理业务逻辑,并且错误处理部分与业务逻辑无关,因此,我们应该请UI部分照顾错误处理。

UI可以将错误回调发送到块,并且当发生错误时,块将运行它。我们还可以通过在不同平台上发送不同的回调,以特定于平台的方式处理错误。

接下来是我的两个问题:

有没有更合适的方法可以做到这一点?

如何将回调发送到集团?

在flutter中,我们只能在initState生命周期方法之后访问bloc (因为我们是从builder上下文中获取bloc的,而后者仅在之后initState)。然后,我们只能在build方法中发送回调。

这样,每次重建时,我们都会重复向回调发送bloc(这些重复没有意义)。使用react,可以在生命周期(例如)中完成一次一次性初始化componentDidMount。在颤振中,我们如何达到只运行一次初始化的目标?

error-handling reactive-programming reactiveui flutter

15
推荐指数
1
解决办法
1727
查看次数

Web反应式编程 - 从HTTP客户端的角度来看,有哪些优势?

让我们假设控制器的这两种情况产生一些延迟的随机数:

1)Reactive Spring 5反应性应用:

@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
    return generateRandomNumbers(10, 500);
}

/**
 * Non-blocking randon number generator
 * @param amount - # of numbers to generate
 * @param delay - delay between each number generation in milliseconds
 * @return
 */
public Flux<Double> generateRandomNumbers(int amount, int delay){
    return Flux.range(1, amount)
               .delayMillis(delay)
               .map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)

2)传统的Spring MVC DeferredResult:

@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
    DeferredResult<Double[]> dr = new DeferredResult<Double[]>();

    CompletableFuture.supplyAsync(() -> {
        return generateRandomNumbers(10, 500);
    }).whenCompleteAsync((p1, p2) -> { …
Run Code Online (Sandbox Code Playgroud)

spring reactive-programming rx-java project-reactor

14
推荐指数
1
解决办法
2248
查看次数

RxJava,一个可观察的多个订阅者:publish().autoConnect()

我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)

现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?

这是我理解它的方式,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发出物品
    [订阅者:1] [项目到EMIT:1,2,3]

  2. 向订户发出项目"1"
    [订阅者:1] [项目到EMIT:2,3]

  3. 有人订阅了我,当我完成后我将再次发出1,2,3
    [订阅者:1&2] [项目到EMIT:2,3,1,2,3]

  4. 向订户发出项目'2'
    [订阅者:1&2] [项目到EMIT:3,1,2,3]

  5. 向订户发出项目'3'
    [订阅者:1&2] …

reactive-programming rx-java

14
推荐指数
2
解决办法
1万
查看次数