我目前正在研究.NET的Reactive Extensions框架,我正在研究我发现的各种介绍资源(主要是http://www.introtorx.com)
我们的应用程序涉及许多检测网络帧的硬件接口,这些接口将是我的IObservables,然后我有各种组件将使用这些帧或对数据执行某种方式的转换并生成新类型的帧.例如,还有其他组件需要显示每个第n帧.我确信Rx对我们的应用程序有用,但是我正在努力处理IObserver接口的实现细节.
我读过的大多数(如果不是全部)资源都说我不应该自己实现IObservable接口,而是使用提供的函数或类之一.从我的研究看来,创建一个Subject<IBaseFrame>将为我提供我需要的东西,我会让我的单线程从硬件接口读取数据,然后调用我的Subject<IBaseFrame>实例的OnNext函数.然后,不同的IObserver组件将从该Subject接收其通知.
我的困惑来自本教程附录中的建议,其中说:
避免使用主题类型.Rx实际上是一种函数式编程范例.使用主题意味着我们现在正在管理状态,这可能会发生变异.同时处理变异状态和异步编程很难做到.此外,许多运算符(扩展方法)都经过精心编写,以确保维护订阅和序列的正确和一致的生命周期; 当你介绍科目时,你可以打破这个.如果您明确使用主题,未来版本也可能会出现明显的性能下降.
我的应用程序对性能至关重要,我显然会在进入生产代码之前测试使用Rx模式的性能; 但是我担心我正在通过使用Subject类来做违背Rx框架精神的事情,并且该框架的未来版本将损害性能.
有没有更好的方式做我想要的?无论是否有任何观察者,硬件轮询线程都将连续运行(否则HW缓冲区将备份),因此这是一个非常热门的序列.然后我需要将收到的帧传递给多个观察者.
任何建议将不胜感激.
我有一个要解析的项目列表,但其中一个项目的解析可能会失败.
什么是"Rx-Way"来捕获错误但继续执行序列
代码示例:
var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
function(value){
if(value == 3){
throw new Error("Value cannot be 3");
}
return value;
});
observable.subscribe(
function(value){
console.log("onNext " + value);
},
function(error){
console.log("Error: " + error.message);
},
function(){
console.log("Completed!");
});Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>Run Code Online (Sandbox Code Playgroud)
我想以非Rx方式做什么:
var items = [0,1,2,3,4,5];
for (var item in items){
try{
if(item == 3){
throw new Error("Value cannot be 3");
}
console.log(item);
}catch(error){
console.log("Error: " + error.message);
}
}Run Code Online (Sandbox Code Playgroud)
提前致谢
有人能解释一下defer和create方法之间的区别Observable吗?我不知道什么时候应该使用defer,什么时候应该使用create..
参考文献:
推迟:http://reactivex.io/documentation/operators/defer.html
创建:http://reactivex.io/documentation/operators/create.html
谢谢
我正在阅读rxjs的官方文档,然后我意识到他们都在做同样的事情.
对我来说,他们似乎都完全相似.
请有人指出它们之间的区别(如果有的话)
我希望我的观察能够立即射击,并且每秒钟都会射击.interval不会立即开火.我发现这个问题建议使用startWith,哪个会立即触发,但我得到一个重复的第一个条目.
Rx.Observable.interval(1000).take(4).startWith(0).subscribe(onNext);
Run Code Online (Sandbox Code Playgroud)
https://plnkr.co/edit/Cl5DQ7znJRDe0VTv0Ux5?p=preview
如何立即点火,但不能复制第一个条目?
我正试图绕过黄金法则(如果有的话):
何时使用BehaviorSubject?
和
何时使用PublishSubject?
他们之间的区别非常明显
有很多种科目.对于这个特定的要求,PublishSubject运行良好,因为我们希望从它停止的位置继续序列.所以假设事件1,2,3在(B)中发出,在(A)连接之后我们只想看到4,5,6.如果我们使用ReplaySubject,我们会看到[1,2,3],4, 5,6; 或者如果我们使用了BehaviorSubject,我们会看到3,4,5,6等等(来源:如何考虑RxJava中的主题(第1部分))
我已经看到它Subject用于两个上下文(至少),UI上下文和监听器上下文.
例如这里一个BehaviorSubject被使用,并且他们为什么使用它显然Subject并没有Observable,但我已经改变了BehaviorSubject到PublishSubject,但应用程序的行为仍然是相同的.
他们为什么要创建项目领域BehaviorSubject而不是PublishSubject?
它有时被称为"功能反应式编程",但这是用词不当.ReactiveX可能是功能性的,它可能是反应性的,但"功能反应性编程" 是一种不同的动物.一个主要的不同点是功能性反应式编程对随时间连续变化的值进行操作,而ReactiveX对随时间发射的离散值进行操作.
同时,从Wikipedia的Functional Reactive Programming页面,ReactiveX列在"实现"部分:
实现[编辑]
- cellx,超快速实现javascript的反应性
- Elm,编译为HTML,CSS和JavaScript的FRP语言
- Ruby中的Frappuccino FRP实现
- Flapjax,JavaScript中的行为/事件FRP实现
- Reactive.jl,Julia中的FRP实现
- ReactiveX,多种语言的FRP实现,包括Java,JavaScript,Python,Swift等等
- Haskell中的reactive-banana FRP实现
- ReactiveCocoa FRP在Swift和Objective-C中实现
- ReactiveKit FRP在纯Swift中实现
- Haskell中的Reflex FRP实现
- Scala(和Scala.js)中的Scala.Rx FRP实现
- 在C#,C++,Haskell(不推荐使用[12]),Java,> Rust和Scala中使用Sodium,FRP实现
- Haskell中的Yampa FRP实现
我非常了解ReactiveX的作用,并对"反应式编程"和"功能反应式编程"进行了一些研究,但我仍无法区分它们之间的关系.
事实上,我有点认为维基百科页面用词不当或错误地列出了"实现"部分中的示例,因为我知道cellx和ReactiveX(在示例中都列出了它们)是为了解决完全不同的问题而构建的.
我的用例如下:我得到的事件有时会突然爆发.如果发生爆发,我只需要处理一次.去抖是这样做的.
然而,debounce只给了我一个爆发的最后一个元素,但我需要知道一个爆发中的所有元素聚合在它们上面(使用flatmap).
这可以通过定时窗口或缓冲区来完成,但是,这些是固定间隔,因此缓冲区/窗口超时可能发生在突发中间,因此将突发分成两部分而不是1.
所以我想要的是类似的东西
.
.
event: a
.
. -> a
.
.
.
.
.
.event: b
.event: c
.event: d
.
.-> b,c,d
.
.
.
.
.event : e
.
. -> e
.
Run Code Online (Sandbox Code Playgroud) 我想观察房产UITextfield.editing.我正在使用此代码:
self.money.rx_observe(Bool.self, "editing").subscribeNext { (value) in
print("")
}.addDisposableTo(disposeBag)
Run Code Online (Sandbox Code Playgroud)
但是在运行过程中,它只执行一次.请问我该如何解决这个问题
使用时出现 UndeliverableExceptioncompletable
public Completable createBucketWithStorageClassAndLocation() {
return Completable.complete()
.doFinally(() -> {
Bucket bucket =
storage.create(
BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
.setStorageClass(storageClass)
.setLocation(googleUploadObjectConfiguration.locationName())
.build());
}).doOnError(error -> LOG.error(error.getMessage()));
}
Run Code Online (Sandbox Code Playgroud)
异常是从 Google 存储中抛出的,这是正确的,但尝试处理doOnError方法
Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
Run Code Online (Sandbox Code Playgroud)
RXJava 异常
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. …Run Code Online (Sandbox Code Playgroud)