是否可以在Rx中的不同线程上调用订阅者的OnNexts?

ada*_*ada 3 c# system.reactive

我是Rx的新手.我想知道是否可以向不同的订阅者发送消息,以便它们在不同的线程上运行?IObserable如何控制呢?简单的Subject实现,据我所知,它在一个线程上一个接一个地调用订阅者.


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}
Run Code Online (Sandbox Code Playgroud)

如果我使用Subject作为'SomeClass',那么在sub1的OnNext()完成之前,不会调用sub2的OnNext().如果sub1花了很多时间,我不希望它延迟sub2的接收.有人能告诉我Rx如何为SomeClass提供这种实现.

Eni*_*ity 7

您编写的代码几乎可以并行运行observable.如果你把你的观察者写成:

public class Subscriber : IObserver<int>
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e)
    { }
    public void OnCompleted()
    { }
} 
Run Code Online (Sandbox Code Playgroud)

然后运行此代码:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
Run Code Online (Sandbox Code Playgroud)

将产生以下内容:

0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
Run Code Online (Sandbox Code Playgroud)

它已经在不同的线程上并行运行订阅.

我使用的重要的是.ObserveOn扩展方法 - 这是使这项工作的原因.

您应该记住,观察者通常不共享同一个可观察对象的实例.订阅观察者有效地将可观察操作符的唯一"链"从可观察源连接到观察者.这与GetEnumerator在枚举上调用两次非常相同,您将不会共享相同的枚举器实例,您将获得两个唯一的实例.

现在,我想描述链条的含义.我将从Observable.Generate&提供Reflector.NET提取的代码Observable.Where来说明这一点.

以此代码为例:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });
Run Code Online (Sandbox Code Playgroud)

引擎盖下都GenerateWhere每个创建内部的Rx类的新实例AnonymousObservable<T>.构造函数AnonymousObservable<T>接受Func<IObserver<T>, IDisposable>它在接收调用时使用的委托Subscribe.

Observable.Generate<T>(...)来自Reflector.NET 的略微清理的代码是:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =>
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

Action self参数是一个递归调用,它迭代输出值.您会注意到此代码中没有任何地方observer存储get或者值被粘贴到多个观察者.此代码对每个新观察者运行一次.

Observable.Where<T>(...)来自Reflector.NET 的略微清理的代码是:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return new AnonymousObservable<TSource>(observer =>
        source.Subscribe(x =>
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex => observer.OnError(ex), () => observer.OnCompleted));
}
Run Code Online (Sandbox Code Playgroud)

同样,此代码不会跟踪多个观察者.它调用Subscribe有效地将自己的代码作为观察者传递给底层的sourceobservable.

您应该看到,在上面的示例代码中,订阅Where创建了一个订阅Generate,因此这是一个可观察链.事实上,它在一系列AnonymousObservable对象上链接订阅调用.

如果您有两个订阅,则您有两个链.如果您有1,000个订阅,则您拥有1,000个链.

现在,作为旁注 - 即使有IObservable<T>IObserver<T>接口 - 你应该很少在你自己的类中实际实现它们.内置的类和操作符处理所有情况的99.99%.这有点像IEnumerable<T>- 你经常需要自己实现这个界面吗?

如果这有帮助,如果您需要任何进一步的解释,请告诉我.