0MQ:如何以线程安全的方式使用ZeroMQ?

tob*_*sen 17 .net c# multithreading thread-safety zeromq

我阅读了ZeroMq指南,我偶然发现了以下内容:

你不能在线程之间共享ØMQ套接字.ØMQ套接字不是线程安全的.从技术上讲,它可以做到这一点,但它需要信号量,锁或互斥量.这将使您的应用程序变得缓慢而脆弱.在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术.

后来:

切记:除了创建它们的线程外,不要使用或关闭套接字.

我也明白ZeroMQ Context是线程安全的.

如果一个类在.Net中注册另一个类的事件,则可以从与创建监听器的线程不同的线程调用此事件.

我认为只有两个选项可以从事件处理程序中通过ZeroMQ-Sockets调度:

  • 将eventhandler-invoking-thread同步到Socket创建ZeroMQ-的线程
  • 通过使用threadsafe ZeroMQ-为eventhandler中的线程创建一个新的ZeroMQ- Socket/获取现有Socket的ZeroMQ-Context

似乎0MQ-Guide不鼓励第一个,我不认为为每个线程创建一个新的ZeroMq-Socket是高性能/可行的方式.

我的问题:
在事件处理程序中通过0MQ发布消息的正确模式(它的意图是什么)是什么?

此外,该指南的作者在编写时还考虑到.Net的ZeroMQ-Binding:

在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术.?

这里有一些示例代码来强调我的问题/问题:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
Run Code Online (Sandbox Code Playgroud)

小智 26

您可以创建许多0MQ套接字,当然可以创建多个线程.如果在一个线程中创建套接字,并在另一个线程中使用它,则必须在两个操作之间执行完整的内存屏障.任何其他东西都会导致libzmq中出现奇怪的随机故障,因为套接字对象不是线程安全的.

有一些传统模式,但我不知道这些模式是如何专门针对.NET的:

  1. 在使用它们的线程中创建套接字,句点.在紧密绑定到一个进程的线程之间共享上下文,并在未严格绑定的线程中创建单独的内容.在高级C API(czmq)中,这些称为附加和分离线程.
  2. 在父线程中创建套接字,并在线程创建时将其传递给附加线程.线程创建调用将执行完整的内存屏障.从那时起,在子线程中使用套接字."use"表示recv,send,setsockopt,getsockopt和close.
  3. 在一个线程中创建一个套接字,并在另一个线程中使用,在每次使用之间执行自己的完整内存屏障.这是非常微妙的,如果你不知道"完全记忆障碍"是什么,你不应该这样做.

  • 在.NET中,有许多操作涉及线程池(例如Task),其中操作在下一个可用线程上运行; 你不知道它将是哪个线程. (3认同)
  • @Pieter,安息吧 (2认同)

Vad*_*kan 7

在.net framework v4及更高版本中,您可以使用并发集合来解决此问题.即生产者 - 消费者模式.多个线程(处理程序)可以将数据发送到线程安全队列,只有单个线程使用队列中的数据并使用套接字发送它.

这是一个想法:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
Run Code Online (Sandbox Code Playgroud)