从Socket读取连续消息

Ahm*_*ood 3 c# sockets

我的目标是从套接字读取消息,其中每个消息用ETX字符分隔.它是一个高频市场数据馈送,所以我不认为逐字节方法是有道理的,也是完整消息的大小是未知的.

有没有办法可以通过使用NetworkStream类来阅读此消息?我也尝试过Socket为此目的使用类,但不是从socket中逐个读取消息,而是从socket读取所有消息,随着系统速度变慢,这成为一个问题.

Mar*_*ell 5

开始了; 这是从诸如或之类的源读取标记分隔的消息列表的基本过程.棘手的一点是跟踪您在传入缓冲区中使用的内容,以及来自早期缓冲区的任何未使用数据的积压.请注意,在这之间更改此代码并且实质上是更改为- 除此之外,方法将是相同的.SocketStreamSocketStreamReceiveRead

以下应该基本上做你需要的.您可以使用ReadNext()API直到获得null(表示流的结尾),或者您可以使用ReadAll()它来为您提供IEnumerable<string>序列.编码和缓冲区大小可供您通过构造函数进行调整,但默认为合理的值.

foreach (var s in reader.ReadAll())
    Console.WriteLine(s);
Run Code Online (Sandbox Code Playgroud)

码:

class EtxReader : IDisposable
{
    public IEnumerable<string> ReadAll()
    {
        string s;
        while ((s = ReadNext()) != null) yield return s;
    }
    public void Dispose()
    {
        if (socket != null) socket.Dispose();
        socket = null;
        if (backlog != null) backlog.Dispose();
        backlog = null;
        buffer = null;
        encoding = null;
    }
    public EtxReader(Socket socket, Encoding encoding = null, int bufferSize = 4096)
    {
        this.socket = socket;
        this.encoding = encoding ?? Encoding.UTF8;
        this.buffer = new byte[bufferSize];
    }
    private Encoding encoding;
    private Socket socket;
    int index, count;
    byte[] buffer;
    private bool ReadMore()
    {
        index = count = 0;
        int bytes = socket.Receive(buffer);
        if (bytes > 0)
        {
            count = bytes;
            return true;
        }
        return false;
    }
    public const byte ETX = 3;
    private MemoryStream backlog = new MemoryStream();
    public string ReadNext()
    {
        string s;
        if (count == 0)
        {
            if (!ReadMore()) return null;
        }
        // at this point, we expect there to be *some* data;
        // this may or may not include the ETX terminator
        var etxIndex = Array.IndexOf(buffer, ETX, index);
        if (etxIndex >= 0)
        {
            // found another message in the existing buffer
            int len = etxIndex - index;
            s = encoding.GetString(buffer, index, len);
            index = etxIndex + 1;
            count -= (len + 1);
            return s;
        }
        // no ETX in the buffer, so we'll need to fetch more data;
        // buffer the unconsumed data that we have
        backlog.SetLength(0);
        backlog.Write(buffer, index, count);

        bool haveEtx;
        do
        {
            if (!ReadMore())
            {
                // we had unused data; this must signal an error
                throw new EndOfStreamException();
            }
            etxIndex = Array.IndexOf(buffer, ETX, index);
            haveEtx = etxIndex >= 0;
            if (!haveEtx)
            {
                // keep buffering
                backlog.Write(buffer, index, count);
            }

        } while (!haveEtx);

        // now we have some data in the backlog, and the ETX in the buffer;
        // for convenience, copy the rest of the next message into
        // the backlog
        backlog.Write(buffer, 0, etxIndex);
        s = encoding.GetString(backlog.GetBuffer(), 0, (int)backlog.Length);
        index = etxIndex + 1;
        count -= (etxIndex + 1);
        return s;
    }
}
Run Code Online (Sandbox Code Playgroud)