Rx框架中的内容允许我在创建过程中等待其他方法时返回IObservable <T>?

cas*_*One 6 .net task-parallel-library system.reactive c#-5.0 .net-4.5

我一直在IObservable<T>使用Reactive Extensions for Twitter的流API创建实现.

从高级别发送HTTP请求并保持连接打开.以长度为前缀的项目被发送到消费.

基本上,它是循环调用Stream.ReadAsync使用await关键字.一个IObserver<T>接口实现(从Observable.Create或从一个块数据流库,没关系,这是一个实现细节)被传送到这个循环,然后调用上的metods IObserver<T>实施,产生可观察到的.

但是,此循环开始处理之前必须完成许多事情,这需要调用Task<T>-reurning方法,所有这些都可以使用await关键字在C#5.0中更容易地调用.像这样的东西:

public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}
Run Code Online (Sandbox Code Playgroud)

也就是说,我现在正在Task<IObservable<T>>从我的方法中返回一个,但是我觉得我在Reactive Extensions中遗漏了一些东西,它可以让我await用来促进我需要做的调用,但也返回一个IObservable<T>而不是一个Task<IObservable<T>>.

Reactive Extensions中的哪个方法允许我创建一个observable,它需要在从创建方法返回之前等待方法?

我发现的最接近的是Observable.DeferAsync.假设对我的方法的调用和observable的使用是这样的:

public async Task Observe()
{
    // NOT the real name of the interface, but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));

    // Wait.
    await observable;
}
Run Code Online (Sandbox Code Playgroud)

使用DeferAsync将不起作用,因为调用Subscribe将发送第一个请求,然后读取,然后调用awaiton observable将创建第二个订阅,但对不同的 observable.

或者,最终是Task<IObservable<T>>在Reactive Framework中返回适当的方法吗?

随后,由于该方法返回a Task<T>,最好通过a CancellationToken来取消操作.也就是说,我可以理解CancellationToken用于取消observable 的创建,但它是否也应该用于取消实际的 observable(因为它可以被传递下去以读取流等).

我的直觉说不,因为这里存在违反问题分离以及取消的DRY原则:

  • 取消创建和取消观察是两个不同的事情.
  • 调用Subscribe将返回IDisposable将取消订阅的实现.

Bra*_*don 8

我不会回来的Task<IObservable<T>>.在公共API中混合任务和Observable最终会让人感到困惑.请记住,任务可以被视为产生单个值的可观察对象.这也意味着不要将CancellationTokens与公共API中的observable混合使用.您可以通过订阅和取消订阅来控制可观察对象.

这并不意味着你不能混合幕后的概念.这是如何做你想要的Observable.Using,Task.ToObservableCancellationDisposable

首先,修改您的方法以返回Task<ISourceBlock<string>>:

public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     return block;
}
Run Code Online (Sandbox Code Playgroud)

现在,这是使用上述方法的新Create方法:

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new CancellationDisposable(),
           cd => CreateBlock(parameter, cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}
Run Code Online (Sandbox Code Playgroud)

编辑:我发现Rx已经实现了这种模式FromAsync:

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
                     .SelectMany(block => block.AsObservable());
}
Run Code Online (Sandbox Code Playgroud)

而且DeferAsync,这更合适,因为你Task实际上正在创建你真正想要观察的Observable(例如你的块):

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
Run Code Online (Sandbox Code Playgroud)