Dan*_*ler 5 c# system.reactive reactive
有人可以解释为什么AsObservable即使到达流末尾,以下方法也会创建无限循环吗?
public static class StreamExt {
public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
return Observable
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
.SelectMany(bytes => bytes);
}
private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
var buf = new byte[bufferSize];
var bytesRead = await stream
.ReadAsync(buf, 0, bufferSize, cancel)
.ConfigureAwait(false);
if (bytesRead < 1) return null; // EndOfStream
var result_size = Math.Min(bytesRead, bufferSize);
Array.Resize(ref buf, result_size);
return buf;
}
}
Run Code Online (Sandbox Code Playgroud)
快速测试表明它会产生无限循环:
class Program {
static void Main(string[] args) {
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
var testResult = stream
.AsObservable(1024)
.ToEnumerable()
.ToArray();
Console.WriteLine(testResult.Length);
}
}
}
Run Code Online (Sandbox Code Playgroud)
当然,我可以添加一个.SubscribeOn(TaskPoolScheduler.Default),但是,无限循环保持活动状态(阻止任务池调度程序 + 无限读取Stream)。
[更新 2017-05-09]
Shlomo 发布了一个更好的例子来重现这个问题:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
.Repeat()
.TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
Run Code Online (Sandbox Code Playgroud)
对于最终来到这里并需要答案而不仅仅是解释的任何人:问题似乎是 的默认调度程序,如这个自我回答的问题FromAsync所示。如果您调整为“当前线程”调度程序的行为会更加可预测。例如(问题摘录):Repeat().TakeWhile(...)
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel),
System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1060 次 |
| 最近记录: |