如何在纯 C# 中制作排队消息代理

bad*_*ror 5 c# publish-subscribe unity-game-engine

背景

我需要一个排队的消息代理以分布式(在连续帧上)的方式调度消息。在下面显示的示例中,它将处理不超过 10 个订阅者,然后在进一步处理之前等待下一帧。

(为了让那些不熟悉 Unity3D 的人澄清,Process()方法是使用 Unity 的内置方法运行的,StartCoroutine()并且 - 在这种情况下 - 将持续游戏的整个生命周期 - 从队列中等待或处理。)

所以我有一个相对简单的类:

public class MessageBus : IMessageBus
{
    private const int LIMIT = 10;
    private readonly WaitForSeconds Wait;

    private Queue<IMessage> Messages;
    private Dictionary<Type, List<Action<IMessage>>> Subscribers;

    public MessageBus()
    {
        Wait = new WaitForSeconds(2f);
        Messages = new Queue<IMessage>();
        Subscribers = new Dictionary<Type, List<Action<IMessage>>>();
    }

    public void Submit(IMessage message)
    {
        Messages.Enqueue(message);
    }

    public IEnumerator Process()
    {
        var processed = 0;

        while (true)
        {
            if (Messages.Count == 0)
            {
                yield return Wait;
            }
            else
            {
                while(Messages.Count > 0)
                {
                    var message = Messages.Dequeue();

                    foreach (var subscriber in Subscribers[message.GetType()])
                    {
                        if (processed >= LIMIT)
                        {
                            processed = 0;
                            yield return null;
                        }

                        processed++;
                        subscriber?.Invoke(message);
                    }
                }

                processed = 0;
            }
        }
    }

    public void Subscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            Subscribers[typeof(T)] = new List<Action<IMessage>>();
        }

        Subscribers[typeof(T)].Add(handler);
    }

    public void Unsubscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            return;
        }

        Subscribers[typeof(T)].Remove(handler);
    }
}
Run Code Online (Sandbox Code Playgroud)

它的工作和行为与预期的一样,但存在一个问题。

问题

我想像这样使用它(从订阅者的角度来看):

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(TestEvent message)
{
    message.SomeTestEventMethod();
}
Run Code Online (Sandbox Code Playgroud)

但这显然失败了,因为Action<IMessage>无法转换为Action<TestEvent>.

我可以使用它的唯一方法是这样的:

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(IMessage message)
{
    ((TestEvent)message).SomeTestEventMethod();
}
Run Code Online (Sandbox Code Playgroud)

但这感觉不优雅且非常浪费,因为每个订阅者都需要自己进行演员表。

我试过的

我正在尝试像这样的“铸造”动作:

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Action<IMessage>>();
    }

    Subscribers[typeof(T)].Add((IMessage a) => handler((T)a));
}
Run Code Online (Sandbox Code Playgroud)

这适用于subscribe部分,但显然不适用于unsubscribe。我可以将新创建​​的 handler-wrapper-lambdas 缓存在某个地方,以便在取消订阅时使用,但老实说,我认为这不是真正的解决方案。

问题

我怎样才能让它像我想的那样工作?如果可能的话,最好使用一些 C#“魔法”,但我知道它可能需要一种完全不同的方法。

因为这将在游戏中使用,并在其生命周期内运行,如果可能的话,我想要一个无垃圾的解决方案。

spe*_*der 2

所以问题是您试图将不同类型的列表存储为订户字典中的值。

解决这个问题的一种方法可能是存储 aList<Delegate>然后使用Delegate.DynamicInvoke.

下面是一些测试代码,总结了要点:

Dictionary<Type, List<Delegate>> Subscribers = new Dictionary<Type, List<Delegate>>();

void Main()
{
    Subscribe<Evt>(ev => Console.WriteLine($"hello {ev.Message}"));
    IMessage m = new Evt("spender");
    foreach (var subscriber in Subscribers[m.GetType()])
    {
        subscriber?.DynamicInvoke(m);
    }
}

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Delegate>();
    }
    Subscribers[typeof(T)].Add(handler);
}

public interface IMessage{}

public class Evt : IMessage
{
    public Evt(string message)
    {
        this.Message = message;
    }
    public string Message { get; }
}
Run Code Online (Sandbox Code Playgroud)