标签: reactive-programming

一旦观察者得到理想的结果,就可以安全/有效地处理活跃的Observable

我已经订阅了一个推送内容的频率很高的Observable,这些内容来自网络I/O,所以每次推送都来自不同的线程,然后我有一些观察者可能会尝试获取一些内容然后快速取消订阅以确保有没有其他内容传入,所以代码示例如下:

        IDisposable dsp = null;
        dsp = TargetObservable.Subscribe((incomingContent) =>
        {
            if (incomingContent == "something")
            {
                myList.Add(incomingContent);
                dsp.Dispose();
            }
            else
            {
                otherList.Add(incomingContent);
            }
        });
Run Code Online (Sandbox Code Playgroud)

现在,OnNext显然不是线程安全的,意味着当Observer在调用Dispose()之前得到"东西"时,其他内容可能仍然传入并添加到'otherList',即使我放了一个'lock(.. .)'为整个'onNext(...)'.
这不是我们想要的,所以任何想法都要避免这种情况?我可以考虑的一种方法是修改Observable逐个推送内容(通过使用'lock'),然后性能必须伤害很多.谢谢.

.net c# multithreading reactive-programming system.reactive

0
推荐指数
1
解决办法
2279
查看次数

使用TcpClient和Reactive Extensions从Stream读取连续字节流

请考虑以下代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length); …
Run Code Online (Sandbox Code Playgroud)

c# stream tcpclient reactive-programming system.reactive

0
推荐指数
1
解决办法
2410
查看次数

如何检测Observable中的变化?

假设我有IObservable并且我想要一个忽略原始数字的重复数字的可观察量,我该怎么做?我尝试了以下内容

我已经尝试了GroupBy(),但它是一个热门的观察者,它不会起作用.我需要比较的是前一个.

reactive-programming system.reactive observable

0
推荐指数
1
解决办法
71
查看次数

RxJs中switchLatest和flatmapLastest之间有什么区别

我很难看到RxJs中switchLatest和flatmapLatest之间的区别,其中一个区别是嵌套的可观察事件,就像scala中的事实一样,而另一个则相当于在返回新的flattened集合之前执行它?

我错过了什么.

javascript scala reactive-programming reactive-extensions-js rxjs

0
推荐指数
1
解决办法
1536
查看次数

如何使用React创建一个"类似"的剩余字符

我正在寻找一个带有"目标字符数"的反应计数器,就像Twitter一样,它会随着用户的输入而减少.

例如,在"元描述"字段中,目标字符数为160.因此,如果字段为空,则数字为160.当用户键入时,计数会随着每个字符添加到输入字段而减少,直到它达到零.

如果计数高于目标,则数字将以红色写入,前面带有减号(再次,就像推特一样).

一种方法是在textarea上监听onChange事件,并更新组件的状态(具有textarea和计数器),然后使用它来计算长度并渲染剩余的char计数器.

有更有效的方法吗?

javascript twitter reactive-programming twitter-bootstrap reactjs

0
推荐指数
1
解决办法
7194
查看次数

RxJS比较最后并发出

我对ReactiveJS相当陌生,我想实现以下目标:

仅在值(对象)与上一个不同的时间戳(updated_at)时才发出。

var checkJobsRx = new Rx.Subject();
checkJobsRx
.dosomethinghere()
.subscribe(function (data) {})
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming rxjs

0
推荐指数
1
解决办法
476
查看次数

RxJS + Node.js Http请求

Nodejs服务器端实现:如何使用https://www.npmjs.com/package/requesthttps://www.npmjs.com/package/rx一起向https://www.reddit.com/发出GET请求r/javascript.json

目标:无论何时我正在使用的网站api url都有数据更改,我都会尝试实现持续流式处理.

javascript httprequest reactive-programming node.js rxjs

0
推荐指数
1
解决办法
6799
查看次数

RxJava订阅不起作用

我有以下代码 -

package com.test.rxjava;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;

public class App1 {

    public static void main(String[] args) {

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
            }

            @Override
            public void onNext(Integer t) {
                System.out.printf("Entry %d\n", t);
            }

            @Override
            public void onError(Throwable t) {
                System.err.printf("Failed to process: %s\n", t);
            }

            @Override
            public void onComplete() {
                System.out.println("Done");
            }

        };
        Flowable.just(123).subscribe(subscriber);

    }

}
Run Code Online (Sandbox Code Playgroud)

我期待在onNext方法中执行代码.但是没有任何反应.但是如果我用下面的代码替换最后一行,我确实得到了输出.

Flowable.just(123).subscribe((t) -> System.out.println(t));
Run Code Online (Sandbox Code Playgroud)

我不确定这里缺少什么.但绝对是肯定的.我是Rx世界的新手,可以帮助找出问题所在.提前致谢!

java reactive-programming rx-java

0
推荐指数
1
解决办法
1857
查看次数

Rx.Observable.fromEvent(document,'click')vs.document.addEventListener('click',callback)

我刚开始学习反应式编程和使用RxJS库.

让我头脑发热的一件事就是为什么我会使用Rx而JavaScript中有事件的概念.

例如,Rx.Observable.fromEvent(document, 'click')和之间的差异是什么document.addEventListener('click', callback).两者都是click异步处理事件.

那么为什么我会在这种情况下使用observable?

javascript events reactive-programming rxjs

0
推荐指数
2
解决办法
3425
查看次数

iOS RxSwift如何检查Result == .success?

我只对.success来自查询的类型的结果感兴趣。

如何设置过滤器以仅通过Result<Value>枚举的.success结果?

public enum Result<Value> {
    case success(Value)
    case failure(Error)
}


query.filter{ (result: Result<Double>) in
                switch result {
                case .success:
                    return true
                case .failure:
                    return false
                }
            }
Run Code Online (Sandbox Code Playgroud)

我只想对成功进行速记​​检查,但是编译器不允许这样做。

.filter{result in
    return result == .success
}
Run Code Online (Sandbox Code Playgroud)

enums reactive-programming ios swift rx-swift

0
推荐指数
1
解决办法
404
查看次数