cs0*_*815 6 .net c# asynchronous .net-4.5
我正在尝试创建并行事件订阅者.这是我的第一次尝试:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;
namespace Sandbox
{
/// <summary>
/// Subscribes to all events of an EventStore connection and dispatches every
/// deserialized event to the handler registered for its exact runtime type.
/// Each handler is started on the thread pool so the subscription callback
/// itself returns quickly.
/// </summary>
public class SomeEventSubscriber
{
    // Last position seen by EventOccured; passed to SubscribeToAllFrom on (re)subscription.
    private Position? _latestPosition;
    private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
    private IEventStoreConnection _connection;

    /// <summary>
    /// Handlers keyed by event type. The delegates are fire-and-forget: the
    /// <see cref="Task"/> produced by the underlying Handle method is discarded
    /// because the delegate type is <see cref="Action{T}"/>.
    /// </summary>
    public Dictionary<Type, Action<object>> EventHandlerMapping
    {
        get { return _eventHandlerMapping; }
    }

    /// <summary>Builds the handler table and starts reading from <c>Position.Start</c>.</summary>
    public SomeEventSubscriber()
    {
        _eventHandlerMapping = CreateEventHandlerMapping();
        _latestPosition = Position.Start;
    }

    /// <summary>Obtains a connection and subscribes once it reports Connected.</summary>
    public void Start()
    {
        ConnectToEventstore();
    }

    private void ConnectToEventstore()
    {
        // Capture the connection in a local so the Connected callback subscribes on the
        // connection that raised the event, even if _connection is replaced by a later Start().
        IEventStoreConnection connection = EventStoreConnectionWrapper.Connect();
        _connection = connection;

        // NOTE(review): the handler is attached after Connect() returns; if the wrapper
        // connects synchronously the Connected event could be missed - confirm against the wrapper.
        connection.Connected +=
            (sender, args) => connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
    }

    private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
    {
        return new Dictionary<Type, Action<object>>
        {
            {typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
            {typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
        };
    }

    // The handlers await the long-running work so the returned task completes (or faults)
    // together with it, instead of completing immediately and dropping the inner task.
    private async Task Handle(FakeEvent1 eventToHandle)
    {
        await SomethingLongRunning(eventToHandle);
    }

    private async Task Handle(FakeEvent2 eventToHandle)
    {
        await SomethingLongRunning(eventToHandle);
    }

    /// <summary>Simulates slow processing: logs, waits ten seconds without blocking a thread, logs again.</summary>
    private async Task SomethingLongRunning(BaseFakeEvent eventToHandle)
    {
        Console.WriteLine("Start Handling: " + eventToHandle.GetType());
        await Task.Delay(10000);
        Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
    }

    /// <summary>
    /// Subscription callback: skips events whose type or stream id starts with "$",
    /// dispatches the rest to the registered handler, and records the latest position.
    /// </summary>
    private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
        ResolvedEvent resolvedEvent)
    {
        // Ordinal comparison: "$" is a literal marker, not linguistic text.
        if (resolvedEvent.OriginalEvent.EventType.StartsWith("$", StringComparison.Ordinal) ||
            resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$", StringComparison.Ordinal))
        {
            return;
        }

        var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
        if (@event != null)
        {
            Action<object> handler;
            if (_eventHandlerMapping.TryGetValue(@event.GetType(), out handler))
            {
                // Task.Run always targets the thread pool, independent of any ambient scheduler.
                // The task is intentionally not awaited; handler failures are not observed here.
                Task.Run(() => handler(@event));
                Console.WriteLine("The task is running asynchronously...");
            }
        }

        if (resolvedEvent.OriginalPosition != null)
        {
            _latestPosition = resolvedEvent.OriginalPosition.Value;
        }
    }

    /// <summary>Restarts the subscriber when the drop reason may be recoverable.</summary>
    private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
    {
        if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
        {
            //TODO: Wait and reconnect probably with back off
        }

        if (dropReason == SubscriptionDropReason.UserInitiated)
        {
            return;
        }

        if (SubscriptionDropMayBeRecoverable(dropReason))
        {
            // NOTE(review): Start() obtains a new connection without closing the previous one - confirm
            // whether EventStoreConnectionWrapper reuses or disposes connections.
            Start();
        }
    }

    private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
    {
        return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
               dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
    }

    // Intentionally empty: nothing to do when the subscription switches to live processing.
    private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
    {
    }
}
}
Run Code Online (Sandbox Code Playgroud)
在您的专家意见中,这是一种有效的方法吗?你能否提出任何改进建议?
PS:
也许:
Task.Run(() => _eventHandlerMapping[eventType](@event));
Run Code Online (Sandbox Code Playgroud)
会更好?
您只有一个 EventOccured 委托，EventStore 中发生的所有事件都会通过它通知您。首先，请考虑在与触发事件的调度程序不同的调度程序中运行 EventOccured 内部的预处理代码。
其次，您是否可以将其更改为带有 FakeEventBase 实现的 abstract class，然后扩展它，并为每种 FakeEvent 类型创建单独的实例？那将是更干净的解决方案。第三，考虑使用自定义的 ThreadScheduler（参见下面链接中的 TaskScheduler）来排队和运行这些 Handle 任务。
http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx
编辑:
我会有一个像下面这样的广播器类,它知道操作何时完成并引发完成的事件。
/// <summary>
/// Performs a simulated long-running operation and raises
/// <see cref="SomeEventOccured"/> once it has run to completion.
/// </summary>
public class EventBroadcaster
{
    public event EventHandler SomeEventOccured;

    /// <summary>
    /// Waits two seconds, then raises <see cref="SomeEventOccured"/> from a continuation
    /// scheduled on the current task scheduler. Returns void, so callers cannot await it.
    /// </summary>
    public async void DoLongRunningOperationAndRaiseFinishedEvent()
    {
        Task delay = Task.Delay(TimeSpan.FromSeconds(2));
        Task notification = delay.ContinueWith(
            completed => RaiseSomeEventOccured(),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Current);
        await notification;
    }

    // Invokes the subscribers through a local copy so a concurrent unsubscribe cannot null the delegate mid-call.
    private void RaiseSomeEventOccured()
    {
        EventHandler subscribers = SomeEventOccured;
        if (subscribers == null)
        {
            return;
        }

        subscribers(this, EventArgs.Empty);
    }
}
Run Code Online (Sandbox Code Playgroud)
然后是一个事件监听器
/// <summary>
/// Listener identified by a string id that logs the time at which it
/// receives <c>SomeEventOccured</c> from a broadcaster.
/// </summary>
public class EventListner
{
    private readonly string _id;

    public EventListner(string id)
    {
        _id = id;
    }

    /// <summary>Subscribes this listener to the broadcaster's <c>SomeEventOccured</c> event.</summary>
    public void ListenTo(EventBroadcaster broadcaster)
    {
        broadcaster.SomeEventOccured += OnSomeEventOccured;
    }

    // Event handler: prints this listener's id and the local receive time.
    private async void OnSomeEventOccured(object sender, EventArgs eventArgs)
    {
        string receivedAt = DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fffffff");
        Console.WriteLine("EventListner {0} received at {1}", _id, receivedAt);

        // Optional: uncomment to show that a slow listener does not affect other instances.
        //await Task.Delay(TimeSpan.FromSeconds(5));
    }
}
Run Code Online (Sandbox Code Playgroud)
那么这将是用于测试的 Program.cs
/// <summary>
/// Console driver: attaches nine listeners (ids "1" to "9") to one broadcaster,
/// starts the long-running operation and waits for a key press.
/// </summary>
public static class Program
{
    public static void Main(string[] args)
    {
        var broadcaster = new EventBroadcaster();
        var listners = new List<EventListner>();

        int id = 1;
        while (id < 10)
        {
            string listnerId = id.ToString(CultureInfo.InvariantCulture);
            var listner = new EventListner(listnerId);
            listner.ListenTo(broadcaster);
            listners.Add(listner);
            id++;
        }

        broadcaster.DoLongRunningOperationAndRaiseFinishedEvent();
        Console.WriteLine("Waiting for operation to complete");

        // Keep the process alive until the event has been raised and the user presses Enter.
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)
在此示例中,处理程序委托按照其订阅的顺序一一触发。
现在将广播器中的代码修改为如下所示。注意：为了便于编码，我已将事件的委托类型从 EventHandler 更改为 Action。
/// <summary>
/// Raises SomeEventOccured by invoking every subscriber in parallel, with at most
/// <see cref="Environment.ProcessorCount"/> running concurrently. Each subscriber
/// is invoked exactly once; the order of invocation is not defined.
/// </summary>
private void RaiseSomeEventOccured()
{
    // Work on a local copy so a concurrent unsubscribe cannot null the delegate mid-call.
    Action handler = SomeEventOccured;
    if (handler == null)
    {
        return;
    }

    var parallelOption = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
    Action[] subscribers = Array.ConvertAll(handler.GetInvocationList(), ConvertToAction);

    // Parallel.Invoke already runs every subscriber; calling handler() afterwards
    // would run each of them a second time, so no direct invocation follows.
    Parallel.Invoke(parallelOption, subscribers);
}
/// <summary>
/// Casts a general delegate to <see cref="Action"/>; throws
/// <see cref="InvalidCastException"/> when the delegate is of another type.
/// </summary>
private Action ConvertToAction(Delegate del)
{
    var action = (Action)del;
    return action;
}
Run Code Online (Sandbox Code Playgroud)
现在您将看到事件以随机顺序触发。
我使用选项 1 获得了更好的性能。
注意：在使用 TPL 进行 Parallel 编程之前，您始终需要先确认它确实能带来好处。
| 归档时间: |
|
| 查看次数: |
597 次 |
| 最近记录: |