并行事件订阅者.net 4.5

cs0*_*815 6 .net c# asynchronous .net-4.5

我正在尝试创建并行事件订阅者.这是我的第一次尝试:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;

namespace Sandbox
{
    public class SomeEventSubscriber
    {
        private Position? _latestPosition;
        private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
        private IEventStoreConnection _connection;

        public Dictionary<Type, Action<object>> EventHandlerMapping
        {
            get { return _eventHandlerMapping; }
        }

        public SomeEventSubscriber()
        {
            _eventHandlerMapping = CreateEventHandlerMapping();
            _latestPosition = Position.Start;
        }

        public void Start()
        {
            ConnectToEventstore();
        }

        private void ConnectToEventstore()
        {
            _connection = EventStoreConnectionWrapper.Connect();
            _connection.Connected +=
            (sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
        }

        private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
        {
            return new Dictionary<Type, Action<object>>
            {
                {typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
                {typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
            };
        }

        private async Task Handle(FakeEvent1 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        private async Task Handle(FakeEvent2 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        private async Task SomethingLongRunning(BaseFakeEvent eventToHandle)
        {
            Console.WriteLine("Start Handling: " + eventToHandle.GetType());
            var task = Task.Delay(10000);
            await task;
            Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
        }

        private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
            ResolvedEvent resolvedEvent)
        {
            if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") || resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))
                return;

            var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
            if (@event != null)
            {
                var eventType = @event.GetType();
                if (_eventHandlerMapping.ContainsKey(eventType))
                {
                    var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](event));
                    Console.WriteLine("The task is running asynchronously...");
                }
            }
            if (resolvedEvent.OriginalPosition != null) _latestPosition = resolvedEvent.OriginalPosition.Value;
        }

        private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
        {
            if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
            {
                //TODO: Wait and reconnect probably with back off
            }

            if (dropReason == SubscriptionDropReason.UserInitiated)
                return;

            if (SubscriptionDropMayBeRecoverable(dropReason))
            {
                Start();
            }
        }

        private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
        {
            return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
                   dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
        }

        private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
        {

        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在您的专家意见中,这是一种有效的方法吗?你能否提出任何改进建议?

PS:

也许:

Task.Run(() => _eventHandlerMapping[eventType](@event));
Run Code Online (Sandbox Code Playgroud)

会更好?

Vig*_*h.N 1

您有一个EventOccured委托,您将在其中收到有关第一个事件中发生的所有事件的通知,请考虑在与触发事件的调度程序不同的调度程序中EventStore
运行预代码。 其次,您是否可以将其更改为with 实现,然后扩展它并为每种类型创建单独的实例。那将是更干净的解决方案。第三,考虑定制排队和运行这些任务 的习惯。 http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspxEventOccured
abstract classFakeEventBaseFakeEvent
ThreadSchedulerHandle

编辑:
我会有一个像下面这样的广播器类,它知道操作何时完成并引发完成的事件。

public class EventBroadcaster
{
    public event EventHandler SomeEventOccured;

    public async void DoLongRunningOperationAndRaiseFinishedEvent()
    {
        var waitingTask = Task.Delay(TimeSpan.FromSeconds(2));

        await waitingTask.ContinueWith(t => RaiseSomeEventOccured(), CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current);
    }

    private void RaiseSomeEventOccured()
    {
        EventHandler handler = SomeEventOccured;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后是一个事件监听器

public class EventListner
{
    private readonly string _id;

    public EventListner(string id)
    {
        _id = id;
    }

    public void ListenTo(EventBroadcaster broadcaster)
    {
        broadcaster.SomeEventOccured += OnSomeEventOccured;
    }

    private async void OnSomeEventOccured(object sender, EventArgs eventArgs)
    {
        var currentTime = DateTime.Now;
        Console.WriteLine("EventListner {0} received at {1}", _id,
            currentTime.ToString("dd-MM-yyyy HH:mm:ss.fffffff"));

        //Not required just to show this does not affect other instances.
        //await Task.Delay(TimeSpan.FromSeconds(5));
    }
}
Run Code Online (Sandbox Code Playgroud)

那么这将是用于测试的 Program.cs

public static class Program
{
    public static void Main(string[] args)
    {
        var broadcaster = new EventBroadcaster();

        var listners = new List<EventListner>();

        for (int i = 1; i < 10; i++)
        {
            var listner = new EventListner(i.ToString(CultureInfo.InvariantCulture));
            listner.ListenTo(broadcaster);
            listners.Add(listner);
        }

        broadcaster.DoLongRunningOperationAndRaiseFinishedEvent();

        Console.WriteLine("Waiting for operation to complete");

        Console.ReadLine();

    }
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,处理程序委托按照其订阅的顺序一一触发。

现在将广播器中的代码修改为如下所示注意:为了便于编码,我已将方法签名从 更改EventHandlerAction

    private void RaiseSomeEventOccured()
    {
        Action handler = SomeEventOccured;
        if (handler != null)
        {
            var parallelOption = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
            Parallel.Invoke(parallelOption, Array.ConvertAll(handler.GetInvocationList(), ConvertToAction));
            handler();
        }
    }

    private Action ConvertToAction(Delegate del)
    {
        return (Action)del;
    }
Run Code Online (Sandbox Code Playgroud)

现在您将看到事件以随机顺序触发。
我使用选项 1 获得了更好的性能。
注意:在进行编程之前,您始终TPL需要Parallel确保有好处。

  • 抱歉,您的回答没有意义。为什么 SomeEventSubscriber 应该实现 FakeEventBase?也不知道为什么我应该在这样一个简单的场景中实现一个量身定制的 ThreadScheduler ... (2认同)
  • 如果您有多个侦听器,那么您希望并行处理事件,则可以并行处理事件。想象一个交通信号灯,有多辆汽车在信号灯中等待“绿灯”,然后所有汽车根据前面的交通情况做出移动的决定;在您的情况下,“EventOccured”不会同时广播到所有处理程序,而是“EventOccured”将它们一一分派,尽管异步希望您能得到差异。 (2认同)