为什么TcpClient这么慢而且CPU很饿?

4 .net c# sockets proxy tcpclient

与我的其他问题相关,除了现在我尝试异步希望它能解决问题.它没有.

我正在尝试创建一个简单的SOCKS5服务器.我将浏览器(firefox)设置为使用此程序作为SOCKS5.这个想法是一个程序连接到代理服务器,为它提供所需的信息,服务器只是简单地从一个连接读取/写入数据到另一个连接.这个只是这样做,不记录也不过滤任何东西.这很简单,但由于CPU问题以及在点击几页后连接到站点需要几秒钟的事实使其完全无法使用.这怎么会占用这么多CPU?为什么连接到网站需要很长时间?异步和同步都受此影响

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace ProxyTest
{
    class Program
    {
        static ManualResetEvent tcpClientConnected =new ManualResetEvent(false);
        static void Main(string[] args)
        {
            var s2 = new TcpListener(9998);
            s2.Start();
            Task.Run(() =>
            {
                while (true)
                {
                    tcpClientConnected.Reset();
                    s2.BeginAcceptTcpClient(Blah, s2);
                    tcpClientConnected.WaitOne();
                }
            });
            while (true)
                System.Threading.Thread.Sleep(10000000);
        }

        static void Blah(IAsyncResult ar)
        {
            try
            {
                Console.WriteLine("Connection");
                TcpListener listener = (TcpListener)ar.AsyncState;
                using (var socketin = listener.EndAcceptTcpClient(ar))
                {
                    tcpClientConnected.Set();
                    var ns1 = socketin.GetStream();
                    var r1 = new BinaryReader(ns1);
                    var w1 = new BinaryWriter(ns1);

                    if (false)
                    {
                        var s3 = new TcpClient();
                        s3.Connect("127.0.0.1", 9150);
                        var ns3 = s3.GetStream();
                        var r3 = new BinaryReader(ns3);
                        var w3 = new BinaryWriter(ns3);
                        while (true)
                        {
                            while (ns1.DataAvailable)
                            {
                                var b = ns1.ReadByte();
                                w3.Write((byte)b);
                                //Console.WriteLine("1: {0}", b);
                            }
                            while (ns3.DataAvailable)
                            {
                                var b = ns3.ReadByte();
                                w1.Write((byte)b);
                                Console.WriteLine("2: {0}", b);
                            }
                        }
                    }

                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        var c = r1.ReadByte();
                        for (int i = 0; i < c; ++i)
                            r1.ReadByte();
                        w1.Write((byte)5);
                        w1.Write((byte)0);
                    }
                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        if (r1.ReadByte() != 0)
                            return;
                    }
                    byte[] ipAddr = null;
                    string hostname = null;
                    var type = r1.ReadByte();
                    switch (type)
                    {
                        case 1:
                            ipAddr = r1.ReadBytes(4);
                            break;
                        case 3:
                            hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                            break;
                        case 4:
                            throw new Exception();
                    }
                    var nhport = r1.ReadInt16();
                    var port = IPAddress.NetworkToHostOrder(nhport);

                    var socketout = new TcpClient();
                    if (hostname != null)
                        socketout.Connect(hostname, port);
                    else
                        socketout.Connect(new IPAddress(ipAddr), port);

                    w1.Write((byte)5);
                    w1.Write((byte)0);
                    w1.Write((byte)0);
                    w1.Write(type);
                    switch (type)
                    {
                        case 1:
                            w1.Write(ipAddr);
                            break;
                        case 2:
                            w1.Write((byte)hostname.Length);
                            w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                            break;
                    }
                    w1.Write(nhport);

                    var buf1 = new byte[4096];
                    var buf2 = new byte[4096];
                    var ns2 = socketout.GetStream();
                    var r2 = new BinaryReader(ns2);
                    var w2 = new BinaryWriter(ns2);
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re });
                            re.WaitOne();
                        }
                    });
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re });
                            re.WaitOne();
                        }
                    });
                    while (true)
                    {
                        if (socketin.Connected == false)
                            return;
                        Thread.Sleep(100);
                    }
                }
            }
            catch { }
        }
        class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;};
        static void ReadCallback(IAsyncResult ar)
        {
            try
            {
                var a = (A)ar.AsyncState;
                var ns1 = a.thisStream;
                var len = ns1.EndRead(ar);
                a.otherStream.Write(a.buf, 0, len);
                a.re.Set();
            }
            catch
            {
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

fel*_*ega 5

警告:因为我没有使用4.5,所以我必须稍微调整一下.

Task.Run() - > new Thread().Start()

您使用的线程太多了.只是尝试在stackoverflow中加载这个问题导致30多个线程产生,这再现了使用Task.Run()看到的行为.

将代码缩减为每个连接的单个线程,我的CPU使用率徘徊在0%左右.一切都很快加载.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace SOCKS5
{
    static class Program
    {
        static void Main()
        {
            var s2 = new TcpListener(9998);
            s2.Start();

            while (true)
            {
                if (s2.Pending())
                {
                    Thread test = new Thread(() =>
                    {
                        using (TcpClient client = s2.AcceptTcpClient())
                        {
                            Blah(client);
                        }
                    });

                    test.Start();
                }

                Thread.Sleep(10);
            }
        }

        static void Blah(TcpClient listener)
        {
            try
            {
                Console.WriteLine("Connection");
                //TcpListener listener = (TcpListener)ar.AsyncState;


                //tcpClientConnected.Set();
                var ns1 = listener.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (false)
                {
                    var s3 = new TcpClient();
                    s3.Connect("127.0.0.1", 9150);
                    var ns3 = s3.GetStream();
                    var r3 = new BinaryReader(ns3);
                    var w3 = new BinaryWriter(ns3);
                    while (true)
                    {
                        while (ns1.DataAvailable)
                        {
                            var b = ns1.ReadByte();
                            w3.Write((byte)b);
                            //Console.WriteLine("1: {0}", b);
                        }
                        while (ns3.DataAvailable)
                        {
                            var b = ns3.ReadByte();
                            w1.Write((byte)b);
                            Console.WriteLine("2: {0}", b);
                        }
                    }
                }

                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    var c = r1.ReadByte();
                    for (int i = 0; i < c; ++i)
                        r1.ReadByte();
                    w1.Write((byte)5);
                    w1.Write((byte)0);
                }
                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    if (r1.ReadByte() != 0)
                        return;
                }
                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);

                var socketout = new TcpClient();
                if (hostname != null)
                    socketout.Connect(hostname, port);
                else
                    socketout.Connect(new IPAddress(ipAddr), port);

                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 2:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);

                var buf1 = new byte[4096];
                var buf2 = new byte[4096];
                var ns2 = socketout.GetStream();

                DateTime last = DateTime.Now;

                while ((DateTime.Now - last).TotalMinutes < 5.0)
                {
                    if (ns1.DataAvailable)
                    {
                        int size = ns1.Read(buf1, 0, buf1.Length);
                        ns2.Write(buf1, 0, size);
                        last = DateTime.Now;
                    }
                    if (ns2.DataAvailable)
                    {
                        int size = ns2.Read(buf2, 0, buf2.Length);
                        ns1.Write(buf2, 0, size);
                        last = DateTime.Now;
                    }

                    Thread.Sleep(10);
                }
            }
            catch { }
            finally
            {
                try
                {
                    listener.Close();
                }
                catch (Exception) { }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:

这最终变得有点好玩.

在将Firefox流量路由这几个小时之后,进行了一些观察.

从未注意到常规模式以确定何时关闭连接.让线程在空闲5分钟后终止(没有rx/tx)使线程计数相当低.这是一个非常安全的界限,允许gmail聊天等服务继续运行.

出于某种原因,该程序偶尔会收不到浏览器的请求,这会报告超时.没有通知程序中遗漏的请求,没有.只有在浏览stackoverflow时才会注意到 仍然没有想到那一个.

  • @felega - 请注意 Task.Run() *不* 等同于 new Thread().Start()。Task.Run 只在某个线程池上调度任务,不生成和运行线程。因此,虽然 30 Thread().Start() 将导致 30 个线程,但 30 Task.Run() 将导致系统在当时认为合适的数量 (2认同)

Pet*_*art 5

这里有一些事情发生了!

异步调用都称为同步样式.就像在,启动操作的线程调用WaitOne - 这基本上只是使它等同于同步调用,没有区别.

睡眠循环很糟糕. sleep(1)循环将快速响应但使用一些CPU,sleep(1000)循环将缓慢响应但使用较少的CPU.在睡眠循环中使用十几个线程不会占用太多CPU,但如果线程数量不断增加,CPU使用率将变得非常重要.最好的方法是使用异步调用而不是轮询.

很多任务都在运行循环.如果没有保证退出路径,这些会导致线程数量猛增.

如果要将数据从套接字A转发到套接字B,则需要在任一套接字关闭时执行操作:停止转发,确保挂起的写入完成并关闭套接字.

如果关闭转发任务,则当前实现不能正确确保关闭转发任务,并且如果任务在设置事件之前获得异常,则启动任务然后阻止手动重置事件的技术可能会失败.两种情况都会使任务无限制地运行.

检查Socket.Connected似乎是一件显而易见的事情,但实际上这只是最后一次IO操作是否遇到断开连接的缓存.我更喜欢采取"零回收",这是你第一次断开连接的通知.

我通过NuGet使用PowerThreading来敲除原始同步例程的快速异步版本(这是在框架4.5之前执行异步例程的一种方式).这适用TcpListener于零CPU使用和非常少的线程.

这可以使用async/await在vanilla c#中完成...我只是不知道怎么样:)

using System;
using System.Collections.Generic;
using System.Text;

namespace AeProxy
{
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    // Need to install Wintellect.Threading via NuGet for this:
    using Wintellect.Threading.AsyncProgModel;

    class Program
    {
        static void Main(string[] args)
        {
            var ae = new AsyncEnumerator() {SyncContext = null};
            var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null);
            // block until main server is finished
            ae.EndExecute(mainOp);
        }

        static IEnumerator<int> ListenerFiber(AsyncEnumerator ae)
        {
            var listeningServer = new TcpListener(IPAddress.Loopback, 9998);
            listeningServer.Start();
            while (!ae.IsCanceled())
            {
                listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null);
                yield return 1;
                if (ae.IsCanceled()) yield break;
                var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult());
                var clientAe = new AsyncEnumerator() { SyncContext = null };
                clientAe.BeginExecute(
                    ClientFiber(clientAe, clientSocket),
                    ar =>
                        {
                            try
                            {
                                clientAe.EndExecute(ar);
                            }
                            catch { }
                    }, null);
            }
        }

        static long clients = 0;

        static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket)
        {
            Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients));
            try
            {
                // original code to do handshaking and connect to remote host
                var ns1 = clientSocket.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                var c = r1.ReadByte();
                for (int i = 0; i < c; ++i) r1.ReadByte();
                w1.Write((byte)5);
                w1.Write((byte)0);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                if (r1.ReadByte() != 0) yield break;

                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);
                var socketout = new TcpClient();
                if (hostname != null) socketout.Connect(hostname, port);
                else socketout.Connect(new IPAddress(ipAddr), port);
                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 3:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);
                using (var ns2 = socketout.GetStream())
                {
                    var forwardAe = new AsyncEnumerator() { SyncContext = null };
                    forwardAe.BeginExecute(
                        ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null);
                    yield return 1;
                    if (ae.IsCanceled()) yield break;
                    forwardAe.EndExecute(ae.DequeueAsyncResult());
                }
            }
            finally
            {
                Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients));
            }
        }

        private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite } 

        const int bufsize = 4096;

        static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream)
        {
            while (!ae.IsCanceled())
            {
                byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize];
                byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize];
                // start off output and input reads.
                // NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress.
                outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                var pendingops = 2;
                while (!ae.IsCanceled())
                {
                    // wait for the next operation to complete, the state object passed to each async
                    // call can be used to find out what completed.
                    if (pendingops == 0) yield break;
                    yield return 1;
                    if (!ae.IsCanceled())
                    {
                        int byteCount;
                        var latestEvent = ae.DequeueAsyncResult();
                        var currentOp = (Operation)latestEvent.AsyncState;
                        if (currentOp == Operation.InboundRead)
                        {
                            byteCount = inputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                outputStream.Close();
                                continue;
                            }
                            Array.Copy(inputRead, outputWrite, byteCount);
                            outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite);
                            inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                        }
                        else if (currentOp == Operation.OutboundRead)
                        {
                            byteCount = outputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                inputStream.Close();
                                continue;
                            }
                            Array.Copy(outputRead, inputWrite, byteCount);
                            inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite);
                            outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                        }
                        else if (currentOp == Operation.InboundWrite)
                        {
                            inputStream.EndWrite(latestEvent);
                        }
                        else if (currentOp == Operation.OutboundWrite)
                        {
                            outputStream.EndWrite(latestEvent);
                        }
                    }
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)