如何使用 System.IO.Pipelines 包创建响应 TCP 侦听器?

Ola*_*son 8 c# kestrel .net-core system.io.pipelines

我想使用 Kestrel 和 System.IO.Pipelines 包创建一个 TCP 侦听器。我收到的消息将始终是HL7 消息。一个示例消息可能是

MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5 EVN|||200605290901|2UA5^820909|4UA|20909|2UA|5 ||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST Drive^^BIRMINGHAM^AL^35209^^M~NICKELL'S PICKLESAVE00TH1000W^100 ^伯明翰^AL^35200^^O||||||0105I30001^^^99DEF^AN PV1||I|W^389^1^UABH^^^^3||||12345^摩根^REX^ J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^ MD^0010^UAMC^L||||||||||||||||||||||||200605290900 OBX|1|NM|^体高||1.80|m^米^ISO+|||||F OBX|2|NM|^体重||79|kg^公斤^ISO+|||||F AL1|1||^阿司匹林DG1|1||786.50^胸痛,未指定^I9|||A

唯一需要注意的重要事项是每个传入的 HL7 消息都以垂直制表符开头,因此您知道消息的开始位置。每个 HL7 消息都包含多个段,所以我想我必须遍历每个段。处理请求后,我想发回 HL7 消息作为响应。首先我想出了这个

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;

        await FillPipe(pipe.Output);
        await ReadPipe(pipe.Input);
    }

    private async Task FillPipe(PipeWriter pipeWriter)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
            
            try
            {
                int bytesRead = 32; // not sure what to do here
                
                if (bytesRead == 0)
                {
                    break;
                }
                
                pipeWriter.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // ... something failed ...

                break;
            }

            FlushResult result = await pipeWriter.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeWriter.Complete();
    }

    private async Task ReadPipe(PipeReader pipeReader)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position;

            do
            {
                position = buffer.PositionOf((byte)'\v');

                if (position != null)
                {
                    ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);

                    // ... Process the line ...

                    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                }
            }
            while (position != null);

            pipeReader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeReader.Complete();
    }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,我在一些事情上挣扎:

  • 部分int bytesRead = 32;,我如何知道已读取了多少字节?或者如何使用 writer 实例读取?
  • 目前调试器不会在// ... Process the line .... 基本上我必须提取整个 HL7 消息,以便我可以使用我的 HL7 解析器来转换消息字符串。
  • 我必须在哪里回应?打电话后await ReadPipe(pipe.Input);?通过使用await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);?

Wil*_*ill 6

你看过 David Fowler 的TcpEcho例子吗?我会说这是相当规范的,因为他是发布 devblogs System.IO.Pipelines 公告的人。

他的例子处理原始套接字。我已经将它改编为 ConnectionHandler API 和 HL7 消息(但是,我对 HL7 知之甚少):

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        while (true)
        {
            var result = await connection.Transport.Input.ReadAsync();
            var buffer = result.Buffer;

            while (TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
            {
                // Process the line.
                var response = ProcessMessage(hl7Message);
                await connection.Transport.Output.WriteAsync(response);
            }

            if (result.IsCompleted)
            {
                break;
            }

            connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
        }
    }

    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
    {
        var endOfMessage = buffer.PositionOf((byte)0x1C);

        if (endOfMessage == null || !TryMatchNextByte(ref buffer, endOfMessage.Value, 0x0D, out var lastBytePosition))
        {
            hl7Message = default;
            return false;
        }

        var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
        hl7Message = buffer.Slice(0, messageBounds);
        buffer = buffer.Slice(messageBounds); // remove message from buffer
        return true;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
    {
        nextPosition = buffer.Slice(currentPosition).PositionOf(value);
        if(nextPosition == null || !nextPosition.Value.Equals(buffer.GetPosition(1, currentPosition)))
        {
            nextPosition = null;
            return false;
        }
        return true;
    }

    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}
Run Code Online (Sandbox Code Playgroud)

更新:添加了有关 HL7 消息结构的最新信息。