Bar*_*lly 11 .net c# redirect pipe stream
当应用程序将其输出通过管道传送到另一个程序时,.NET Console类及其默认TextWriter实现(Console.Out在例如,可以隐含地显示Console.WriteLine())不会发出任何错误信号,而另一个程序在应用程序完成之前终止或关闭管道.这意味着应用程序可能会运行超过必要的时间,将输出写入黑洞.
如何检测重定向管道另一端的关闭?
更详细的解释如下:
以下是一对演示此问题的示例程序.Produce很慢地打印很多整数,以模拟计算的效果:
using System;
class Produce
{
static void Main()
{
for (int i = 0; i < 10000; ++i)
{
System.Threading.Thread.Sleep(100); // added for effect
Console.WriteLine(i);
}
}
}
Run Code Online (Sandbox Code Playgroud)
Consume 只读取前10行输入然后退出:
using System;
class Consume
{
static void Main()
{
for (int i = 0; i < 10; ++i)
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
如果编译了这两个程序,并将第一个管道输出到第二个,就像这样:
Produce | Consume
Run Code Online (Sandbox Code Playgroud)
...可以观察到,Produce在Consume终止后长时间保持运行.
实际上,我的Consume程序是Unix风格的head,我的Produce程序打印的数据计算成本很高.我想在管道的另一端关闭连接时终止输出.
我怎么能在.NET中这样做?
(我知道一个显而易见的选择是传递一个命令行参数来限制输出,这确实是我现在正在做的事情,但我仍然想知道如何做到这一点,因为我希望能够做更多关于何时终止阅读的可配置判断;例如grep之前通过管道head.)
更新:看起来非常像System.IO.__ConsoleStream.NET中的实现是硬编码忽略错误0x6D(ERROR_BROKEN_PIPE)和0xE8(ERROR_NO_DATA).这可能意味着我需要重新实现控制台流.叹...)
要解决这个问题,我必须在Win32文件句柄上编写自己的基本流实现.这并不是非常困难,因为我不需要实现异步支持,缓冲或搜索.
遗憾的是,需要使用不安全的代码,但对于将在本地运行且完全信任的控制台应用程序而言,这通常不是问题.
这是核心流:
class HandleStream : Stream
{
SafeHandle _handle;
FileAccess _access;
bool _eof;
public HandleStream(SafeHandle handle, FileAccess access)
{
_handle = handle;
_access = access;
}
public override bool CanRead
{
get { return (_access & FileAccess.Read) != 0; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return (_access & FileAccess.Write) != 0; }
}
public override void Flush()
{
// use external buffering if you need it.
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
static void CheckRange(byte[] buffer, int offset, int count)
{
if (offset < 0 || count < 0 || (offset + count) < 0
|| (offset + count) > buffer.Length)
throw new ArgumentOutOfRangeException();
}
public bool EndOfStream
{
get { return _eof; }
}
public override int Read(byte[] buffer, int offset, int count)
{
CheckRange(buffer, offset, count);
int result = ReadFileNative(_handle, buffer, offset, count);
_eof |= result == 0;
return result;
}
public override void Write(byte[] buffer, int offset, int count)
{
int notUsed;
Write(buffer, offset, count, out notUsed);
}
public void Write(byte[] buffer, int offset, int count, out int written)
{
CheckRange(buffer, offset, count);
int result = WriteFileNative(_handle, buffer, offset, count);
_eof |= result == 0;
written = result;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
[return: MarshalAs(UnmanagedType.Bool)]
[DllImport("kernel32", SetLastError=true)]
static extern unsafe bool ReadFile(
SafeHandle hFile, byte* lpBuffer, int nNumberOfBytesToRead,
out int lpNumberOfBytesRead, IntPtr lpOverlapped);
[return: MarshalAs(UnmanagedType.Bool)]
[DllImport("kernel32.dll", SetLastError=true)]
static extern unsafe bool WriteFile(
SafeHandle hFile, byte* lpBuffer, int nNumberOfBytesToWrite,
out int lpNumberOfBytesWritten, IntPtr lpOverlapped);
unsafe static int WriteFileNative(SafeHandle hFile, byte[] buffer, int offset, int count)
{
if (buffer.Length == 0)
return 0;
fixed (byte* bufAddr = &buffer[0])
{
int result;
if (!WriteFile(hFile, bufAddr + offset, count, out result, IntPtr.Zero))
{
// Using Win32Exception just to get message resource from OS.
Win32Exception ex = new Win32Exception(Marshal.GetLastWin32Error());
int hr = ex.NativeErrorCode | unchecked((int) 0x80000000);
throw new IOException(ex.Message, hr);
}
return result;
}
}
unsafe static int ReadFileNative(SafeHandle hFile, byte[] buffer, int offset, int count)
{
if (buffer.Length == 0)
return 0;
fixed (byte* bufAddr = &buffer[0])
{
int result;
if (!ReadFile(hFile, bufAddr + offset, count, out result, IntPtr.Zero))
{
Win32Exception ex = new Win32Exception(Marshal.GetLastWin32Error());
int hr = ex.NativeErrorCode | unchecked((int) 0x80000000);
throw new IOException(ex.Message, hr);
}
return result;
}
}
}
Run Code Online (Sandbox Code Playgroud)
BufferedStream如果需要,可以将它包裹起来进行缓冲,但是对于控制台输出,TextWriter无论如何都将进行字符级缓冲,并且只在新行上刷新.
流滥用Win32Exception以提取错误消息,而不是调用FormatMessage自身.
在这个流的基础上,我能够为控制台I/O编写一个简单的包装器:
static class ConsoleStreams
{
enum StdHandle
{
Input = -10,
Output = -11,
Error = -12,
}
[DllImport("kernel32.dll", SetLastError = true)]
static extern IntPtr GetStdHandle(int nStdHandle);
static SafeHandle GetStdHandle(StdHandle h)
{
return new SafeFileHandle(GetStdHandle((int) h), true);
}
public static HandleStream OpenStandardInput()
{
return new HandleStream(GetStdHandle(StdHandle.Input), FileAccess.Read);
}
public static HandleStream OpenStandardOutput()
{
return new HandleStream(GetStdHandle(StdHandle.Output), FileAccess.Write);
}
public static HandleStream OpenStandardError()
{
return new HandleStream(GetStdHandle(StdHandle.Error), FileAccess.Write);
}
static TextReader _in;
static StreamWriter _out;
static StreamWriter _error;
public static TextWriter Out
{
get
{
if (_out == null)
{
_out = new StreamWriter(OpenStandardOutput());
_out.AutoFlush = true;
}
return _out;
}
}
public static TextWriter Error
{
get
{
if (_error == null)
{
_error = new StreamWriter(OpenStandardError());
_error.AutoFlush = true;
}
return _error;
}
}
public static TextReader In
{
get
{
if (_in == null)
_in = new StreamReader(OpenStandardInput());
return _in;
}
}
}
Run Code Online (Sandbox Code Playgroud)
最后的结果是在管道另一端终止连接后写入控制台输出,导致消息出现一个很好的异常:
管道正在关闭
通过捕捉和忽略IOException最外层,看起来我很高兴.