标签: reactive-programming

反应式编程 - Node.js中的RxJS与EventEmitter

最近我开始研究RxJS和RxJava(来自Netflix)的图书馆,这些图书馆致力于反应式编程的概念.

Node.js基于事件循环工作,它为您提供了异步编程的所有工具,后续的节点库(如"cluster")可帮助您充分利用多核机器.Node.js还为您提供了EventEmitter功能,您可以在其中订阅事件并以异步方式对其进行操作.

另一方面,如果我理解正确RxJS(和一般的反应式编程)工作原理事件流,订阅事件流,异步转换事件流数据.

所以,问题是在Node.js中使用Rx包是什么意思.Node的事件循环,事件发射器和订阅Rx的流和订阅有多么不同.

javascript asynchronous reactive-programming node.js rxjs

49
推荐指数
2
解决办法
2万
查看次数

观察者模式和反应式编程之间有什么区别?

最近我听到了很多关于反应式编程的术语.但是当我搜索它时,我发现的只是观察者模式的一些相似之处.实际上,我发现他们之间没有任何不同.它们之间的概念差异以及为什么反应式编程这个术语会被嗡嗡声?

reactive-programming observer-pattern

45
推荐指数
2
解决办法
9904
查看次数

Knockout.js和Rx.js有什么区别?

有谁知道RxJsKnockout之间的区别?从表面上看,他们似乎试图解决同样的问题,构建一个事件驱动的UI.但是有过两种经验的人,他们有什么不同/他们有什么相似之处?你能描述一些关于它们的东西来帮助我选择吗?

reactive-programming rxjs knockout.js

44
推荐指数
2
解决办法
9501
查看次数

发生错误时,rxjs observable无法完成

当我从头创建一个observable并且有观察者错误然后完成时,永远不会调用订阅的完成部分.

var observer = Rx.Observable.create(function(observer){
    observer.onError(new Error('no!'));
    observer.onCompleted();
})

observer.subscribe(
    function(x) { console.log('succeeded with ' + x ) },
    function(x) { console.log('errored with ' + x ) },
    function() { console.log('completed') }
)
Run Code Online (Sandbox Code Playgroud)

输出是:

errored with Error: no!
Run Code Online (Sandbox Code Playgroud)

我希望它是:

errored with Error: no!
completed
Run Code Online (Sandbox Code Playgroud)

如果我更改代码以调用onNext而不是onError,则observable正确完成:

var observer = Rx.Observable.create(function(observer){
    observer.onNext('Hi!');
    observer.onCompleted();
})

observer.subscribe(
    function(x) { console.log('succeeded with ' + x ) },
    function(x) { console.log('errored with ' + x ) },
    function() { console.log('completed') }
)
Run Code Online (Sandbox Code Playgroud)

我得到了预期的输出:

succeeded with Hi! …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming rxjs angular

44
推荐指数
4
解决办法
2万
查看次数

订阅Vs订阅RxJava2(Android)?

何时调用subscribeWith方法而不是普通订阅?什么是用例?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));
Run Code Online (Sandbox Code Playgroud)

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() {
                    @Override public void onNext(News value) {
                        handleResponse(value);
                    }

                    @Override public void onError(Throwable e) {
                        handleError(e);
                    }

                    @Override public void onComplete() {
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    }
                }));
Run Code Online (Sandbox Code Playgroud)

谢谢


稍后添加

根据文档,两者都返回一次性SingleObserver实例:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable …
Run Code Online (Sandbox Code Playgroud)

android reactive-programming rx-java rx-java2

43
推荐指数
1
解决办法
1万
查看次数

推荐阅读/教程,了解反应性香蕉FRP库

我对FRP(功能反应编程)反应香蕉 haskell库感兴趣.你会推荐什么读新手才能理解反应性香蕉背后的理论?据我了解,这个领域已取得一些进展,不同的FRP图书馆使用不同的方法,所以我认为任何FRP论文都不会这样做.

haskell frp reactive-programming

42
推荐指数
3
解决办法
5844
查看次数

RxJava - 获取列表中的每个项目

我有一个返回an的方法Observable<ArrayList<Long>>,它是一些Items的id.我想通过这个列表并使用另一个返回的方法下载每个Item Observable<Item>.

我如何使用RxJava运算符执行此操作?

java monads reactive-programming rx-java

41
推荐指数
2
解决办法
4万
查看次数

RxJava观察调用/订阅线程

我有点麻烦了解subscribeOn/observeOn如何在RxJava中工作.我创建了一个带有observable的简单应用程序,可以发出太阳系行星名称,进行一些映射和过滤并打印结果.

据我所知,调度工作到后台线程是通过subscribeOn运算符完成的(它似乎工作正常).

观察后台线程也适用于observeOn运营商.

但是我在理解,如何观察调用线程(如果它是主线程或任何其他线程)时遇到了麻烦.它很容易在Android上运行AndroidSchedulers.mainThread(),但我不知道如何在纯java中实现这一点.

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < …
Run Code Online (Sandbox Code Playgroud)

java multithreading reactive-programming rx-java

39
推荐指数
2
解决办法
3万
查看次数

101 Rx示例

编辑:感谢wiki的链接,我认为自从它已经开始,它更容易去那里检查.不过这里的问题也很好,所以不在msdn论坛周围的人会了解wiki及其位置.

简短问题:

你有一个Rx代码示例,可以帮助人们更好地理解它吗?

漫长的漫无边际的问题:

现在已经发布了Rx框架,我认为我们中的许多人都有兴趣获取这些内容并尝试它们.可悲的是,实际上并没有很多例子(经过详尽的搜索后,我几乎确信Rx只是为了在wpf应用上轻松实现).

我不记得我读到或听过的确切位置(我一直在看很多博客和视频),Rx团队似乎对101系列感兴趣...当他们有足够的时间去做...对于那些想要了解它并且现在玩它的人来说非常糟糕(我的意思是,当一个像这样的新技术出现时,自我尊重的开发人员不会觉得自己是一个带有新玩具的孩子) .

我个人现在一直在尝试,但哇有一些疯狂的概念......只是让像MaterialiseZip这样的方法让我想起了TeleportersBack to the Future的东西.

所以,我认为这将是很好,如果有更多的了解的,帮助建立的范例集,阿拉101个LINQ的例子,从基本的使用变为更复杂的东西,几乎涵盖了所有的方法和他们的用途,一种实用的方法(也许还有一点理论,特别是因为这些概念可能需要它)

我认为MS开发人员花时间给我们这样的材料很棒,但我也认为这个社区足以开始构建我们自己的材料,不是吗?

.net c# reactive-programming system.reactive

38
推荐指数
4
解决办法
2万
查看次数

Rx中的IConnectableObservables

有人可以解释Observable和ConnectableObservable之间的区别吗?Rx Extensions文档非常稀疏,我不明白ConnectableObservable在什么情况下是有用的.

此类用于Replay/Prune方法.

.net reactive-programming system.reactive

38
推荐指数
1
解决办法
3098
查看次数