bit*_*onk 3 .net multithreading
在.NET中的线程之间传递数据的方法是什么?我目前可以想到两件事:
.NET Framework有哪些解决方案来解决这个问题.也许.NET已经实现了通用的生产者 - 消费者模式?也许我可以以某种方式使用Thread.GetData和Thread.SetData?
作为Ash解决方案的替代方案,请考虑以下示例.
假设您有两个线程 - 一个用于接收来自套接字的数据包,另一个用于处理这些数据包.显然,当数据包可用于处理时,Receiver线程需要通知处理器线程,因此需要以某种方式在线程之间共享数据包.我通常使用共享数据队列执行此操作.
同时,我们不一定要将线程紧密耦合在一起.例如,Receiver线程甚至不应该知道处理器线程存在.接收方需要关注的是从网络接收数据包,然后通知任何感兴趣的用户数据包可用于处理.事件是在.NET中实现这一目标的完美方式.
所以这里有一些代码.
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Packet
{
public byte[] Buffer { get; private set; }
public Packet(byte[] buffer)
{
Buffer = buffer;
}
}
public class PacketEventArgs : EventArgs
{
public Packet Packet { get; set; }
}
public class UdpState
{
public UdpClient Client{get;set;}
public IPEndPoint EndPoint{get;set;}
}
public class Receiver
{
public event EventHandler<PacketEventArgs> PacketReceived;
private Thread _thread;
private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
public void Start() { _thread.Start(); }
public void Stop() { _shutdownThread.Set(); }
public Receiver()
{
_thread = new Thread(
delegate() {
// Create the client UDP socket.
IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 5006);
UdpClient client = new UdpClient( endPoint );
// Receive the packets asynchronously.
client.BeginReceive(
new AsyncCallback(OnPacketReceived),
new UdpState() { Client = client, EndPoint = endpoint });
// Wait for the thread to end.
_shutdownThread.WaitOne();
}
);
}
private void OnPacketReceived(IAsyncResult ar)
{
UdpState state = (UdpState)ar.AsyncState;
IPEndPoint endPoint = state.EndPoint;
byte[] bytes = state.Client.EndReceive(ar, ref endPoint);
// Create the packet.
Packet packet = new Packet(bytes);
// Notify any listeners.
EventHandler<PacketEventArgs> handler = PacketReceived;
if (handler != null) {
handler(this, new PacketEventArgs() { Packet = packet });
}
// Read next packet.
if (!_shutdownThread.WaitOne(0)) {
state.Client.BeginReceive(
new AsyncCallback(OnPacketReceived),
state);
}
}
}
public class Processor
{
private Thread _thread;
private object _sync = new object();
private ManualResetEvent _packetReceived = new ManualResetEvent(false);
private ManualResetEvent _shutdownThread = new ManualResetEvent(false);
private Queue<Packet> _packetQueue = new Queue<Packet>(); // shared data
public void Start() { _thread.Start(); }
public void Stop() { _shutdownThread.Set(); }
public Processor()
{
_thread = new Thread(
delegate() {
WaitHandle[] handles = new WaitHandle[] {
_shutdownThread,
_packetReceived
};
while (!_shutdownThread.WaitOne(0)) {
switch (WaitHandle.WaitAny(handles)) {
case 0: // Shutdown Thread Event
break;
case 1: // Packet Received Event
_packetReceived.Reset();
ProcessPackets();
break;
default:
Stop();
break;
}
}
}
);
}
private void ProcessPackets()
{
Queue<Packet> localPacketQueue = null;
Queue<Packet> newPacketQueue = new Queue<Packet>();
lock (_sync) {
// Swap out the populated queue with the empty queue.
localPacketQueue = _packetQueue;
_packetQueue = newPacketQueue;
}
foreach (Packet packet in localPacketQueue) {
Console.WriteLine(
"Packet received with {0} bytes",
packet.Buffer.Length );
}
}
public void OnPacketReceived(object sender, PacketEventArgs e)
{
// NOTE: This function executes on the Receiver thread.
lock (_sync) {
// Enqueue the packet.
_packetQueue.Enqueue(e.Packet);
}
// Notify the Processor thread that a packet is available.
_packetReceived.Set();
}
}
static void Main()
{
Receiver receiver = new Receiver();
Processor processor = new Processor();
receiver.PacketReceived += processor.OnPacketReceived;
processor.Start();
receiver.Start();
Thread.Sleep(5000);
receiver.Stop();
processor.Stop();
}
Run Code Online (Sandbox Code Playgroud)
我知道那里要消化很多.该程序应该在.NET 3.5中工作,只要您在端口5006上有UDP流量.
就线程之间的数据共享而言,兴趣点是Processor类的ProcessPackets()和OnPacketReceived()方法.请注意,OnPacketReceived()方法在Receiver线程上发生,即使该方法是Processor类的一部分,并且使用同步对象同步队列.
| 归档时间: |
|
| 查看次数: |
8162 次 |
| 最近记录: |