如何处理异常并使用 Rx 重复

Ber*_*ian 1 c# system.reactive observable

我正在使用执行IO操作Observable.FromAsync。我想永远重复这个操作。我不明白的是如何处理异常,对它们做一些事情,然后返回我的循环:

我尝试过的:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
Run Code Online (Sandbox Code Playgroud)

现在我不明白为什么我的可观察在第一个异常之后结束。我不告诉它继续吗?

我想做的事:

  • 执行IO操作
  • 如果抛出返回一些自定义值
  • 重复循环

如果我的可观察量是可枚举的,我会想要这种行为:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}
Run Code Online (Sandbox Code Playgroud)

Repeat即使OnError被触发,我如何继续执行?

应该Observable.Catch和如何Observable.Throw结合Observable.Repeat

Eni*_*ity 5

当你有一个IObservable<string> ioObs = Observable.FromAsync<string>(Something);可观察对象时,你就拥有了一个可以返回值然后完成 ({OnNext}{OnCompleted}) 的可观察对象,或者你有一个会抛出异常 ({OnError}) 的可观察对象。

让源在返回值或错误后重复执行非常简单。

IObservable<string> query = ioObs.Retry().Repeat();
Run Code Online (Sandbox Code Playgroud)

.Retry()说“如果出现错误,请再去一次”。.Repeat()说“如果可观察完成则再次订阅”。

现在这有点危险,因为您已经生成了一个将连续执行的可观察对象。你需要找到一些方法来阻止它。

您的选择是:

  • 处理订阅
  • 取一定数量的值(即.Take(n)
  • 设置一个.Timeout.
  • 或者使用TakeUntil

ioObs当原始文件在完成时返回 null 或空字符串时,最后一个是好的。

你可以这样做:

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x == null);
        
Run Code Online (Sandbox Code Playgroud)

这是一段测试代码,您可以尝试一下:

private int __counter = 0;
Task<string> Something()
{
    return Task.Run(() =>
    {
        if (Interlocked.Increment(ref __counter) % 7 == 0)
        {
            throw new Exception("Blam!");
        }
        return $"Hello World {__counter}";
    });
}
Run Code Online (Sandbox Code Playgroud)

然后这样做:

IObservable<string> ioObs = Observable.FromAsync<string>(Something);

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x.EndsWith("19"));
        
Run Code Online (Sandbox Code Playgroud)

当我订阅时我得到:

你好世界1
你好世界2
你好世界3
你好世界4
你好世界5
你好世界6
你好世界8
你好世界9
你好世界10
你好世界11
你好世界12
你好世界13
你好世界15
你好世界16
你好世界17
你好世界18
你好世界19

请注意,缺少7和 ,14因为那是引发异常的时间。