tob*_*sen 17 .net c# multithreading thread-safety zeromq
我阅读了ZeroMq指南,我偶然发现了以下内容:
你不能在线程之间共享ØMQ套接字.ØMQ套接字不是线程安全的.从技术上讲,它可以做到这一点,但它需要信号量,锁或互斥量.这将使您的应用程序变得缓慢而脆弱.在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术.
后来:
切记:除了创建它们的线程外,不要使用或关闭套接字.
我也明白ZeroMQ Context是线程安全的.
如果一个类在.Net中注册另一个类的事件,则可以从与创建监听器的线程不同的线程调用此事件.
我认为只有两个选项可以从事件处理程序中通过ZeroMQ-Sockets调度:
Socket创建ZeroMQ-的线程Socket/获取现有Socket的ZeroMQ-Context 似乎0MQ-Guide不鼓励第一个,我不认为为每个线程创建一个新的ZeroMq-Socket是高性能/可行的方式.
我的问题:
在事件处理程序中通过0MQ发布消息的正确模式(它的意图是什么)是什么?
此外,该指南的作者在编写时还考虑到.Net的ZeroMQ-Binding:
在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术.?
这里有一些示例代码来强调我的问题/问题:
public class ExampleClass
{
public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}
public class ByteEventArgs : EventArgs
{
public byte[] BytesToSend;
}
public class Dispatcher
{
ZMQ.Context ctx;
public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
{
this.ctx = mqcontext;
exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
}
void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
{
// this method might be called by a different thread. So I have to get a new socket etc?
using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
{
// init socket etc..... and finally:
socket.Send(e.BytesToSend);
}
// isn't that too much overhead?
}
}
Run Code Online (Sandbox Code Playgroud)
小智 26
您可以创建许多0MQ套接字,当然可以创建多个线程.如果在一个线程中创建套接字,并在另一个线程中使用它,则必须在两个操作之间执行完整的内存屏障.任何其他东西都会导致libzmq中出现奇怪的随机故障,因为套接字对象不是线程安全的.
有一些传统模式,但我不知道这些模式是如何专门针对.NET的:
在.net framework v4及更高版本中,您可以使用并发集合来解决此问题.即生产者 - 消费者模式.多个线程(处理程序)可以将数据发送到线程安全队列,只有单个线程使用队列中的数据并使用套接字发送它.
这是一个想法:
sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);
// start single-threaded data send loop
Task.Factory.StartNew(() => {
using(var socket = context.Socket()) {
MyStuff stuffToSend;
// this enumerable will be blocking until CompleteAdding is called
foreach(var stuff in sendQueue.GetConsumingEnumerable())
socket.Send(stuff.Serialize());
}
});
// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
14247 次 |
| 最近记录: |