如何查看我的响应式扩展查询的作用?

Jam*_*rld 17 c# debugging system.reactive

我正在编写一个包含大量运算符的复杂Reactive Extensions查询.我怎么能看到发生了什么?

我提出并回答这个问题,因为它很有用,而且可能是很好用的.

Jam*_*rld 41

您可以在开发它们时将这个函数自由地附加到Rx操作符以查看发生了什么:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId);

        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);

            try
            {
                var subscription = source
                    .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                opName,
                                                x,
                                                Thread.CurrentThread.ManagedThreadId),
                        ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                 opName,
                                                 ex,
                                                 Thread.CurrentThread.ManagedThreadId),
                        () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                 opName,
                                                 Thread.CurrentThread.ManagedThreadId)
                    )
                    .Subscribe(obs);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => Console.WriteLine(
                          "{0}: Cleaned up on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId)));
            }
            finally
            {
                Console.WriteLine("{0}: Subscription completed.", opName);
            }
        });
    }
Run Code Online (Sandbox Code Playgroud)

这是一个示例用法,显示了一个微妙的行为差异Range:

Observable.Range(0, 1).Spy("Range").Subscribe();
Run Code Online (Sandbox Code Playgroud)

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Run Code Online (Sandbox Code Playgroud)

但是这个:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();
Run Code Online (Sandbox Code Playgroud)

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7
Run Code Online (Sandbox Code Playgroud)

指出不同?

显然你可以改变它来写入日志或调试,或者使用预处理器指令在Release构建等上进行精简传递订阅......

您可以Spy在整个运营商链中申请.例如:

Observable.Range(0,3).Spy("Range")
          .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();
Run Code Online (Sandbox Code Playgroud)

给出输出:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7
Run Code Online (Sandbox Code Playgroud)

我相信你可以找到方法来丰富这个以满足你的目的.

  • 又回来说这种扩展方法仍然经常"拯救我的生命":) (3认同)

Ben*_*jol 8

又三年过去了,我还在沿用你的想法。我的版本现在演变如下:

  • 选择日志记录目的地的重载
  • 记录订阅数量
  • 记录不良订阅者的“下游”异常。

代码:

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    return Spy(source, opName, Console.WriteLine);
}

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName, 
                                                              Action<string> logger)
{
    opName = opName ?? "IObservable";
    logger($"{opName}: Observable obtained on Thread: {Thread.CurrentThread.ManagedThreadId}");

    var count = 0;
    return Observable.Create<T>(obs =>
    {
        logger($"{opName}: Subscribed to on Thread: {Thread.CurrentThread.ManagedThreadId}");
        try
        {
            var subscription = source
                .Do(x => logger($"{opName}: OnNext({x}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    ex => logger($"{opName}: OnError({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    () => logger($"{opName}: OnCompleted() on Thread: {Thread.CurrentThread.ManagedThreadId}")
                )
                .Subscribe(t =>
                {
                    try
                    {
                        obs.OnNext(t);
                    }
                    catch(Exception ex)
                    {
                        logger($"{opName}: Downstream exception ({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}");
                        throw;
                    }
                }, obs.OnError, obs.OnCompleted);

            return new CompositeDisposable(
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) on Thread: {Thread.CurrentThread.ManagedThreadId}")),
                    subscription,
                    Disposable.Create(() => Interlocked.Decrement(ref count)),
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) completed, {count} subscriptions"))
                );
        }
        finally
        {
            Interlocked.Increment(ref count);
            logger($"{opName}: Subscription completed, {count} subscriptions.");
        }
    });
}
Run Code Online (Sandbox Code Playgroud)