具有多线程轮询器取消的ZeroMQ PUB/SUB模式

Moo*_*ght 17 c# c++ multithreading zeromq netmq

我有两个应用程序,一个C++服务器和一个C#WPF UI.C++代码通过ZeroMQ消息传递[PUB/SUB]服务接收请求(来自任何地方/任何人).我使用我的C#代码进行反向测试并创建"反向测试"并执行它们.这些反向测试可以由许多"单元测试"组成,每个测试都从C++服务器发送/接收数千条消息.

目前,单独的背部测试工作良好,可以发送N个单元测试,每个测试具有数千个请求和捕获.我的问题是架构; 当我发出另一个回测试(在第一次之后)时,由于轮询线程没有被取消和处理,我得到第二次事件订阅的问题.这导致错误的输出.这可能看起来像一个微不足道的问题(可能是你们中的一些人),但在我当前的配置下取消这个轮询任务证明是麻烦的.一些代码......

我的消息代理类很简单,看起来像

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}
Run Code Online (Sandbox Code Playgroud)

回测"引擎"用于执行每个回测,首先构造一个Dictionary包含每个Test(单元测试)和消息,以便为每个测试分派到C++应用程序.

DispatchTests方法,在这里它是

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}
Run Code Online (Sandbox Code Playgroud)

最后的PING消息,告诉C++我们已经完成了.然后我们强制等待,以便在从C++代码收到所有返回之前不调度下一个[unit]测试 - 我们使用a执行此操作ManualResetEvent.

当C++收到PING消息时,它会直接发送消息.我们处理收到的消息OnMessageRecieved,PING告诉我们设置,ManualResetEvent.Set()以便我们继续进行单元测试; "下一个"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,broker.Dispose()在最后上面永远不会被击中.我很欣赏最终在后台线程上执行的块不能保证执行.

上面划掉的文字是由于我弄乱了代码; 在孩子完成之前我正在停止父线程.但是,仍有问题......

现在broker.Dispose()被正确调用,并被broker.Dispose()调用,在此方法中,我尝试取消轮询线程并Task正确处置以避免任何多个订阅.

要取消线程,我使用该CancelPolling()方法

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}
Run Code Online (Sandbox Code Playgroud)

但在StartPolling()方法中

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}
Run Code Online (Sandbox Code Playgroud)

ThrowIfCancellationRequested()永远不会被调用,线程永远不会被取消,因此从未妥善处理.该subSocket.Receive()方法阻止了轮询器线程.

现在,我不清楚如何实现我想要的东西,我需要调用除了用于轮询消息的线程之外的broker.Dispose()/ PollerCancel(),以及一些如何强制取消.线程中止不是我想要不惜任何代价进入的.

本质上,我想broker在执行下一个后台测试之前正确处理它,如何正确处理它,拆分轮询并在单独的应用程序域中运行它?

我已经尝试过,在OnMessageRecived处理程序内部进行处理,但这显然是在与poller相同的线程上执行的,并且不是这样做的方式,而不调用其他线程,它会阻塞.

什么是达到我想要的最佳方式,并有一个模式的这种情况下,我可以遵循?

谢谢你的时间.

use*_*197 1

对主题的更高层次的看法

你们致力于创建测试框架的专注和努力,表明你们的目标是开发一种严格的、专业级的方法,这让我首先向这种勇敢的事业致敬。

虽然测试是一项重要的活动,可以提供合理的定量证据,证明被测系统满足既定的期望,但测试的成功取决于测试环境与实际部署条件的接近程度。

人们可能会同意,在另一个不同的基础上进行测试并不能证明实际部署将在与测试环境主要不同的环境中按预期运行。


逐元素控制还是仅逐状态控制,这就是问题所在。

您的努力(至少在发布 OP 时)集中在代码架构上,该架构尝试将实例保持在适当的位置,并尝试在下一次测试电池开始之前重新设置 Poller 实例的内部状态。

在我看来,如果你追求专业测试,测试有几个需要遵循的原则:

  • 测试重复性原则(测试的重新运行应提供相同的结果,从而避免仅提供结果的准测试——“抽奖”)

  • 无干预测试原则(测试的重新运行不应受到“外部”干扰,不受测试场景控制)

话虽如此,让我带来一些受哈利·马科维茨启发的笔记,他因出色的定量投资组合优化研究而获得诺贝尔奖。

而是退后一步来控制元素的整个生命周期

CACI Simulations, Inc.(Harry Markowitz 的公司之一)在 90 年代初开发了他们的旗舰软件框架 COMET III - 一个极其强大的模拟引擎,用于大规模计算中运行的流程的大型、复杂的设计原型和性能模拟/网络/电信网络。

COMET III 给人最大的印象是它能够生成测试场景,包括可配置的测试前“预热”预加载,这使得被测元素进入类似于机械中“疲劳”含义的状态酷刑试验或氢扩散脆性对核电站冶金学家意味着什么。

是的,一旦您深入了解算法、节点缓冲区、内存分配、管道/负载平衡/网格处理架构选择、故障恢复开销、垃圾收集策略和有限资源共享算法的底层细节工作和影响(在实际使用的工作负载模式“压力”下)端到端性能/延迟,此功能是必不可少的。

这意味着,与单个实例相关的简单状态控制是不够的,因为它不提供测试可重复性和测试隔离/非干预行为的方法。简而言之,即使您找到一种“重置” Poller 实例的方法,这也不会让您进入实际测试,并保证测试可重复性和可能的​​预测试预热。

需要后退一步和更高的抽象层和测试场景控制。

这如何应用于 OP 问题?

  • 而不仅仅是国家控制
  • 创建多层架构/控制平面/单独的信令

支持这一目标的 ZeroMQ 方式

  • 创建超级结构作为重要的模式
  • 对测试场景中使用的实例进行完整的生命周期控制
  • 保持 ZeroMQ 准则:零共享、零阻塞……
  • 从多上下文中受益()