我是 spring webflux 的初学者。在研究时我发现了一些代码,例如:
Mono result = someMethodThatReturnMono().cache();
Run Code Online (Sandbox Code Playgroud)
“缓存”这个名字告诉我关于缓存某些东西,但是缓存在哪里以及如何检索缓存的东西?是咖啡因之类的东西吗?
caching reactive-programming project-reactor spring-webflux webflux
我们有这样的代码:
guiState :: Discrete GuiState
guiState = stepperD (GuiState []) $
union (mkGuiState <$> changes model) evtAutoLayout
evtAutoLayout :: Event GuiState
evtAutoLayout = fmap fromJust . filterE isJust . fmap autoLayout $ changes guiState
Run Code Online (Sandbox Code Playgroud)
你可以看到evtAutoLayout提供给guiState,它被输入到evtAutoLayout中 - 所以那里有一个循环.这是故意的.自动布局调整gui状态直到达到平衡,然后它返回Nothing,因此它应该停止循环.当然,新的模型改变可以再次启动它.
但是,当我们将它们组合在一起时,我们会在编译函数调用中遇到无限循环.即使autoLayout = Nothing,它仍会在编译期间导致堆栈溢出.
如果我删除了guiState中的union调用并删除了图片中的evtAutoLayout ...
guiState :: Discrete GuiState
guiState = stepperD (GuiState []) $ mkGuiState <$> changes model
Run Code Online (Sandbox Code Playgroud)
它工作正常.
有什么建议?
我正在使用ReactiveMongo驱动程序编写Scala应用程序.访问db的方法总是返回Future[T].以下代码是否相同?
(有onComplete)
val results: Future[List[Tag]] = Tags.all.toList
results onComplete {
case Success(list) => //do something with list
case Failure(t) => //throw the error
}
Run Code Online (Sandbox Code Playgroud)
(有flatMap)
Tags.all.toList.flatMap(list => //do something with list)
Run Code Online (Sandbox Code Playgroud)
有什么不同?
flatMap不会抛出失败?这flatMap是一个回调,onComplete或者等到Tags.all.toList语句没有完成?
Java 8 lambda流有一个peek()运算符,允许您对每个项执行void操作.这通常用于调试,但它也是一种欺骗和启动void操作而不映射到某些东西的好方法.
在RxJava中是否有相同的功能?也许我没有遵循良好的做法或反应性思考......但是在操作之前和之后创建状态标签会非常方便吗?如果peek()不受支持,是否有更好的模式可供遵循?
Observable<Item> Item= ...;
Label statusLabel = new Label();
Label resultLabel = new Label();
Observable<CalculatedItem> calculatedItem = calculated.subscribeOn(Schedulers.computation())
.peek(c -> statusLabel.setText("Working.."))
.map(c -> performExpensiveCalculation(c))
.peek(r -> statusLabel.setText(""));
calculatedItem.subscribe(c -> resultLabel.setText(c));
Run Code Online (Sandbox Code Playgroud) 我看过Spring Tips:使用Spring Framework 5.0的功能反应端点,并阅读了一些关于spring reactor的内容,但我无法理解它.
鉴于我有netty和spring reactor有效,让端点返回Flux/ Mono实例(jacksonified)而不是直接dto对象(jacksonified)有什么好处?我最初假设在http请求/响应上下文中,反应流将更像是websockets,其中服务器通过开放通道将数据推送到接收器,但似乎并非如此.
另外netty在反应式编程中实际上比tomcat做得更好?
如果这些问题看起来很愚蠢,我很抱歉,但我不太明白这个新框架方向的目的.它为什么会出现,它是如何工作的以及它解决了哪些问题?
spring reactive-programming netty project-reactor spring-webflux
我试图在rxjs Observable中抛出一个错误
new Observable(subscriber => {
Observable.throw("error1");
return Observable.throw("error2");
})
.subscribe(
() => {},
err => console.error(err)
);
Run Code Online (Sandbox Code Playgroud)
错误1未被捕获.
error2给出了编译错误:
Argument of type '(this: Observable<{}>, subscriber: Subscriber<{}>) => ErrorObservable<string>' is not assignable to parameter of type '(this: Observable<{}>, subscriber: Subscriber<{}>) => TeardownLogic'. Type 'ErrorObservable<string>' is not assignable to type 'TeardownLogic'
Run Code Online (Sandbox Code Playgroud)
在一个observable中抛出错误的正确方法是什么?
我是RxJava的初学者,我很好奇"背压"的含义.
这是否意味着生产者会给消费者带来压力?
或者这是否意味着消费者对生产者施加压力?(相反方向的压力)
想象一下我正在使用集团来处理网络请求。如果请求失败,则根据平台的不同,处理失败的方法也会有所不同。在我的Web应用程序上,我想将用户重定向到错误页面,而在IOS应用程序上,我想显示一个对话框。
由于仅应使用bloc并共享它来处理业务逻辑,并且错误处理部分与业务逻辑无关,因此,我们应该请UI部分照顾错误处理。
UI可以将错误回调发送到块,并且当发生错误时,块将运行它。我们还可以通过在不同平台上发送不同的回调,以特定于平台的方式处理错误。
接下来是我的两个问题:
在flutter中,我们只能在initState生命周期方法之后访问bloc (因为我们是从builder上下文中获取bloc的,而后者仅在之后initState)。然后,我们只能在build方法中发送回调。
这样,每次重建时,我们都会重复向回调发送bloc(这些重复没有意义)。使用react,可以在生命周期(例如)中完成一次一次性初始化componentDidMount。在颤振中,我们如何达到只运行一次初始化的目标?
让我们假设控制器的这两种情况产生一些延迟的随机数:
1)Reactive Spring 5反应性应用:
@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
return generateRandomNumbers(10, 500);
}
/**
* Non-blocking randon number generator
* @param amount - # of numbers to generate
* @param delay - delay between each number generation in milliseconds
* @return
*/
public Flux<Double> generateRandomNumbers(int amount, int delay){
return Flux.range(1, amount)
.delayMillis(delay)
.map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)
2)传统的Spring MVC DeferredResult:
@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
DeferredResult<Double[]> dr = new DeferredResult<Double[]>();
CompletableFuture.supplyAsync(() -> {
return generateRandomNumbers(10, 500);
}).whenCompleteAsync((p1, p2) -> { …Run Code Online (Sandbox Code Playgroud) 我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)
现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?
这是我理解它的方式,从可观察的角度来看:
有人订阅了我,我应该开始发出物品
[订阅者:1] [项目到EMIT:1,2,3]
向订户发出项目"1"
[订阅者:1] [项目到EMIT:2,3]
有人订阅了我,当我完成后我将再次发出1,2,3
[订阅者:1&2] [项目到EMIT:2,3,1,2,3]
向订户发出项目'2'
[订阅者:1&2] [项目到EMIT:3,1,2,3]
向订户发出项目'3'
[订阅者:1&2] …
rx-java ×4
spring ×2
asynchronous ×1
backpressure ×1
caching ×1
flutter ×1
future ×1
haskell ×1
java ×1
javascript ×1
netty ×1
reactiveui ×1
rxjs ×1
scala ×1
webflux ×1