嵌套的Observable在Wait()上挂起

Joh*_*Koz 4 c# console-application system.reactive observable async-await

在C#控制台应用程序中,使用System.Reactive.Linq,我正在尝试创建一个observable,其中每个项目是由另一个observable进行某些处理的字符串结果.我用字符串和字符创建了一个简单的repro.警告,此示例完全是CONTRIVED,重点是嵌套的.Wait()挂起.

class Program
{
    static void Main(string[] args)
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
        IObservable<string> files = fileNames.ToObservable();
        string[] extensions = files.Select(fn =>
        {
            var extension = fn.ToObservable()
            .TakeLast(4)
            .ToArray()
            .Wait(); // <<<<<<<<<<<<< HANG HERE
            return new string(extension);
        })
        .ToArray()
        .Wait();
    }
}
Run Code Online (Sandbox Code Playgroud)

同样,这不是我如何找到许多文件名的后缀.问题是我如何生成一个Observable of strings,其中字符串是从一个完整的observable计算出来的.

如果我拿出这个代码并单独运行它就可以了.

     var extension = fn.ToObservable()
        .TakeLast(4)
        .ToArray()
        .Wait();
Run Code Online (Sandbox Code Playgroud)

有一些关于异步方法的嵌套Wait(),我不明白.

我如何编写嵌套的异步observable,所以我可以生成一个简单的字符串数组?

谢谢

-约翰

Jam*_*rld 5

您的代码阻塞的原因是您在ToObservable()不指定调度程序的情况下使用.在这种情况下,它将使用CurrentThreadScheduler.

因此,filesobservable 使用当前线程发出它的第一个OnNext()[A](发送"file1.doxc").在OnNext()返回之前,它无法继续迭代.但是,内部fnobservable 使用ToObservable()Wait()块直到fn完成 - 它将第一个OnNext()(发送"f")排队到当前线程调度程序,但它永远不能发送它,因为现在第一个OnNext()[A]永远不会返回.

两个简单的修复:

要么files像这样更改observable:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);
Run Code Online (Sandbox Code Playgroud)

或者,避免使用内部Wait()带有SelectMany(这绝对是更惯用的Rx):

string[] extensions = files.SelectMany(fn =>
{
    return fn.ToObservable()
             .TakeLast(4)
             .ToArray()
             .Select(x => new string(x));
})
.ToArray()
.Wait();

// display results etc.
Run Code Online (Sandbox Code Playgroud)

每种方法都有完全不同的执行语义 - 第一种方法将像嵌套循环一样运行,每个内部observable在下一次外部迭代之前完成.第二个将更加交错,因为Wait()删除了阻塞行为.如果你使用我编写的Spy方法并在两次ToObservable()调用后附加它,你会很清楚地看到这种行为.

  • 谢谢詹姆斯,很好的答案,重点和信息.我没有充分的理由不使用SelectMany,事实上这是我现在的终极解决方案.我尝试添加NewThreadScheduler.Default仅用于测试,当然也是如此.我得到了观察者在OnNext上有效陷入僵局的概念,但我想我会和Spy玩一下并尝试让自己更清楚.再次感谢! (2认同)