实现异步"从流中读取所有当前可用的数据"操作

Jon*_*Jon 7 c# console asynchronous stream filestream

我最近提供了这个问题的答案:C# - 实时控制台输出重定向.

正如经常发生的那样,解释一些东西(这里的"东西"就是我解决类似问题的方式)会让你更加了解和/或就像这里的"oops"时刻一样.我意识到我的解决方案已实施,但有一个错误.这个bug没什么实际意义,但作为一个开发人员,它对我来说非常重要:我知道我的代码有可能爆炸,我不能轻易放松.

压缩bug是这个问题的目的.我为长篇介绍道歉,让我们变得肮脏.

我想构建一个允许我从控制台的标准输出接收输入的类Stream.控制台输出流是类型FileStream; 如果需要,实现可以转换为该实现.StreamReader杠杆作用已经存在相关联.

我需要在这个类中实现一件事来实现我想要的功能:异步"读取此刻可用的所有数据"操作.读到流的末尾是不可行的,因为除非进程关闭控制台输出句柄,否则流不会结束,并且它不会这样做,因为它是交互式的并且在继续之前期望输入.

我将使用该假设的异步操作来实现基于事件的通知,这对我的调用者来说会更方便.

该类的公共接口是这样的:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}
Run Code Online (Sandbox Code Playgroud)

StartSendingEventsStopSendingEvents做他们做广告的事情; 出于本讨论的目的,我们可以假设事件总是在不失一般性的情况下发送.

该类在内部使用这两个字段:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];
Run Code Online (Sandbox Code Playgroud)

该类的功能在以下方法中实现.为了让球滚动:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }
Run Code Online (Sandbox Code Playgroud)

要从Stream无阻塞中读取数据,并且不需要回车符,BeginRead则称为:

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }
Run Code Online (Sandbox Code Playgroud)

挑战性的部分:

BeginRead需要使用缓冲区.这意味着当从流中读取时,可用于读取的字节("传入块")可能大于缓冲区.请记住,这里的目标是为每个块读取所有块和调用事件订阅者一次.

为此,如果缓冲区已满EndRead,我们不会立即将其内容发送给订阅者,而是将它们附加到a StringBuilder.StringBuilder只有在没有更多内容从流中读取时,才会发回内容.

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }
Run Code Online (Sandbox Code Playgroud)

在任何不足以填充缓冲区的读取之后(在这种情况下,我们知道在上次读取操作期间没有更多数据要读取),所有累积的数据都将发送给订户:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }
Run Code Online (Sandbox Code Playgroud)

(我知道只要没有订阅者,数据就会永远累积.这是一个深思熟虑的决定).

好的

这个方案几乎完美地运作:

  • 异步功能,不会产生任何线程
  • 调用代码非常方便(只需订阅一个事件)
  • 每次可以读取数据时,不得超过一个事件
  • 几乎与缓冲区大小无关

坏的

那最后几乎是一个非常大的.考虑当有一个长度恰好等于缓冲区大小的传入块时会发生什么.将读取和缓冲块,但不会触发该事件.这将是一个BeginRead期望找到更多属于当前块的数据,以便将其全部一个一个地发送回来,但是......流中将不再有数据.

实际上,只要数据以长度恰好等于缓冲区大小的块放入流中,数据就会被缓冲,并且永远不会触发事件.

这种情况在实践中可能不太可能发生,特别是因为我们可以为缓冲区大小选择任何数字,但问题在于此.

解?

不幸的是,检查可用的方法后FileStreamStreamReader,我找不到任何可以让我不期而遇的流,同时在其上使用也允许异步方法.

一个"解决方案" ManualResetEvent是在检测到"缓冲区填充"条件后让线程等待a .如果事件没有在短时间内(通过异步回调)发出信号,那么来自流的更多数据将不会出现,并且到目前为止累积的数据应该发送给订户.但是,这引入了对另一个线程的需求,需要线程同步,并且显然不优雅.

指定超时BeginRead也足够了(不时地回调我的代码,以便我可以检查是否有数据要发回;大部分时间都没有任何事情要做,所以我希望性能命中可以忽略不计).但看起来不支持超时FileStream.

因为我认为带有超时的异步调用裸Win32 中的一个选项,另一种方法可能是PInvoke问题的地狱.但这也是不可取的,因为它会引入复杂性并且只是代码的痛苦.

有没有一种优雅的方法来解决这个问题?

感谢您耐心等待阅读所有这些内容.

更新:

我绝对没有在我最初的写作中很好地传达这个场景.我已经修改了相当多的写作,但要更加确定:

问题是关于如何实现异步"读取此刻可用的所有数据"操作.

我向花时间阅读和回答的人道歉,没有我让我的意图足够明确.

Jas*_*ams 1

如果您按照所描述的方式从 FileStream 中读取数据,那么将读取底层文件的全部内容。因此,您将只有一个“块”数据,您将一小段地将其读入 StringBuilder(效率有些低)。您的实现中没有任何方法可以将数据分成更小的“块”,因为读取将继续填充缓冲区,直到文件耗尽。在这个抽象级别,只有客户端知道这些块的大小应该是多少,因此您必须将数据交给他们以解码为块。这违背了缓冲区的最初目的。

如果您有某种其他形式的流可以突发传输数据(例如控制台输出或通信数据包),那么您将获得突发数据,但您仍然无法保证读取结束时少于缓冲区满data 意味着您已经到达数据包的末尾,只是传输中有一个暂停。通常在这些情况下,您需要缓冲数据并对其进行处理(即了解数据格式)以确定何时接收到完整的块/数据包。在这种情况下,您的缓冲区中将始终有一个“未完成的块”等待,直到收到更多数据以终止该块或启动一个新块,并将其“推出”缓冲区。这可能是通信中的一个问题,因为下一个数据包可能很长时间不会到达。

因此,最终,您需要让读者了解如何将数据划分为块,这意味着您需要客户端进行解码,这就是为什么基本流类尚未以您的方式传递数据的原因正在努力实施。

那么,通过添加这个中级课程,您将获得什么?最好的情况是,它会给您的 I/O 增加一层额外的复杂性和开销(让我们面对现实,您试图从客户端代码中抽象出的只是几行代码)。最坏的情况是,它将无法按照您的要求将数据分解为块,因此根本没有用处。

当心"This scenario may be highly unlikely to occur in practice":当传输大量数据时,您可以放心,即使是“极不可能”的事件也会以相当大的规律性发生 - 当然通常足以让您不能假设它们永远不会发生。

[编辑-添加]

如果您不寻求概括您的解决方案,那么您可以向轻松处理问题的类添加逻辑。

两种可能的解决方案是:

  • 如果您知道将输出给您的控制台行的最大限制,您可以简单地使用足够大的缓冲区,以保证您的边缘情况永远不会发生。(例如,CreateProcess 命令限制为 32k,cmd.exe 将命令限制为 8k。您可能会发现类似的限制适用于您接收的数据“块”)

  • 如果您的块始终是行(以换行符终止的文本块),则只需检查缓冲区中的最后一个字符是否看起来像终止符(0x0a 或 0x0d)。如果没有,则还有更多数据需要读取。