C# - 从TCP连接读取时丢弃某些数据包

Tai*_*ary 4 .net c# sockets tcp

我正在使用C#通过TCP连接读取连续的数据流(ITCH数据,即外汇价格),但在运行应用程序较长时间后,应用程序有时会丢弃数据包并且信息丢失.

下面是我用来读取数据的代码片段:

private void ReaderThreadStarter()
    {
        StreamReader streamReader = new StreamReader(this._networkStream);
        while (!_stopping)
        {
            try
            {
                if (this._networkStream.DataAvailable)
                {
                    while ((line = streamReader.ReadLine()) != null)
                    {
                        lock (_queue.ConcurrentQueue)
                        {
                            byte[] data = System.Text.Encoding.ASCII.GetBytes(line);
                            Log.Info("Data Added in Queue: " + Encoding.ASCII.GetString(data, 0, data.Length));
                            _queue.WriteToQueue(data);
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                Log.Error(exception);
            }
            finally
            {
                SetStopped();
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

上面的代码所做的是它从TCP连接读取数据并将其写入并发队列,然后另一个线程使用队列中的数据进行处理.所以基本上只是生产者 - 消费者问题.

生产者 - 消费者部分似乎工作正常,因为我在队列中写的内容被消费者使用.

一种选择是使用嗅探器并确认应用程序正在丢弃数据包,但我正在一个我无法使用嗅探器的环境中工作.我认为有丢包的原因是因为对于我的一些外汇订单,我从未得到取消和我的价格下降,数据提供商告诉我,价格在那里是正确的.

我也记录了我在保存到队列之前从TCP端口读取的数据,因此从日志中我假设数据在从连接读取时丢失.

有人可以告诉我这里我可能做错了什么或者丢弃数据包的原因是什么.

以下是我的消费者代码的代码片段:

public void ReadQueue()
    {
        try
        {
            while (true)
            {
                {
                    byte[] data = _queue.ReadFromQueue();

                    Parse(data);
                }
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }
    }

public byte[] ReadFromQueue()
    {
        try
        {
            byte[] data;
            lock (this) // Enter synchronization block
            {
                ConcurrentQueue.TryDequeue(out data);
            } 
            return data;
        }
        catch (Exception exception)
        {
            Log.Error(exception);
            return null;
        }
    }
Run Code Online (Sandbox Code Playgroud)

Mar*_*ell 7

有两件事让我眼前一亮; 首先是你的使用DataAvailable.使用它几乎不是正确的事情.这个有用的主要时间是在同步和异步方法之间进行选择.它并不能告诉你更多的数据是否是入站,例如,并且会给人一种"假阳性"(因为你正在使用的东西,这并不意味着)造成你的循环过早退出.DataAvailable只告诉你数据当前是否在本地缓冲区中可用,这就是它告诉你的全部内容.

我感兴趣的第二件事data是二元还是文字.您正在使用的事实StreamReader建议文本,但那么......为什么要重新编码它byte[]?如果它任意二进制文件,那么你不能将它作为文本处理 - 这是行不通的.当你通过它获取它时,StreamReader已经破坏了内容.如果它是基于文本的协议,请不要重新编码它:使用字符串队列(或类似的).

在一个不相关的注释...如果队列真正并发,您可能不需要同步访问.