通过增强OrderManager来解决以下同步问题的最佳方法是什么?OrderForm需要获取最新的订单和交易列表并订阅这些事件,而OrderManager通过另一个线程生成订单和交易.
public class OrderManager
{
public event EventHandler<OrderEventArgs> OrderAdded;
public event EventHandler<OrderEventArgs> OrderUpdated;
public event EventHandler<OrderEventArgs> OrderDeleted;
public event EventHandler<TradeEventArgs> TradeAdded;
public List<Order> Orders { get; private set; }
public List<Trade> Trades { get; private set; }
...
}
public class OrderForm
{
public OrderForm(OrderManager manager)
{
manager.OrderAdded += manager_OrderAdded;
manager.OrderUpdated += manager_OrderUpdated;
manager.OrderDeleted += manager_OrderDeleted;
manager.TradeAdded += manager_TradeAdded;
Populate(manager.Orders);
Populate(manager.Trades);
}
...
}
Run Code Online (Sandbox Code Playgroud)
我应该删除事件模式并实现这样吗?还有其他更好的办法?
public class OrderListener
{
public Action<Order> OrderAdded { get; set; }
public Action<Order> OrderUpdated { get; set; }
public Action<Order> OrderDeleted { get; set; }
public Action<Trade> TradeAdded { get; set; }
}
public class OrderManager
{
...
List<Order> orders;
List<Trade> trades;
List<OrderListener> listeners;
public IDisposable Subscribe(OrderListener listener)
{
lock (orderTradeLock)
{
listeners.Add(listener);
orders.ForEach(listener.OrderAdded);
trades.ForEach(listener.TradeAdded);
// Allow caller to dispose the return object to unsubscribe.
return Disposable.Create(() => { lock (orderTradeLock) { listeners.Remove(listener); } });
}
}
void OnOrderAdded(Order order)
{
lock (orderTradeLock)
{
orders.Add(order);
listeners.ForEach(x => x.OrderAdded(order));
}
}
void OnTradeAdded(Trade trade)
{
lock (orderTradeLock)
{
trades.Add(trade);
listeners.ForEach(x => x.TradeAdded(trade));
}
}
...
}
public class OrderForm
{
IDisposable subscriptionToken;
public OrderForm(OrderManager manager)
{
subscriptionToken = manager.Subscribe(new OrderListener
{
OrderAdded = manager_OrderAdded;
OrderUpdated = manager_OrderUpdated;
OrderDeleted = manager_OrderDeleted;
TradeAdded = manager_TradeAdded;
}
}
...
}
Run Code Online (Sandbox Code Playgroud)
对于您提到的场景,有几个选项。我将尝试通过它们:
避免在并发场景中使用事件,如果考虑多线程情况,.NET 事件没有完美的解决方案。
特别是如果您的消费者/订阅者之间的并发性很高(添加/删除委托的线程数量较多),您可以在本文中查看更多信息: http ://www.codeproject.com/Articles/37474/Threadsafe-活动
几天前我写了一篇文章,其中包含您需要了解的有关 .NET 事件的所有信息。另外还有一些解决问题/限制的解决方案,如何变得简单以及如何编写好的代码:http://www.codeproject.com/Articles/864690/Simplifying-Events-in-NET
您只需要对每个事件或所有事件使用相同的同步机制,可以是生产者(将引发事件)和消费者(将添加/删除委托)之间的a ManualResetEventSlim、 a Semaphore、 或 a 。lock可以保证您不会丢失信息。
该解决方案中最大的问题是引发事件。如果您在同步机制内设置委托的调用,则可能会发生死锁或性能不佳,因为责任将集中在订阅者/消费者委托将执行的操作上,这通常是一个糟糕的设计。
在这种情况下,使用 Rx,您可以使用相同的 Scheduler 来订阅并引发事件,这会引入同步,在这一部分中,更改执行中的 Scheduler 以避免出现我提到的使用手动同步机制的相同问题非常重要。例如:
static EventHandler _handler;
static EventLoopScheduler _eventLoopScheduler = new EventLoopScheduler(); // synchronization for the consumers/subscribers and producers
static ManualResetEventSlim _finish = new ManualResetEventSlim(); // just to check when this test end
static int samples = 500; // number of times that will raise the event
static int count = 0; // number of times that the event was received by the subscriber
static void Main()
{
var subscription = Observable.FromEventPattern(add => _handler += add, rem => _handler -= rem) // creating event composition
.SubscribeOn(_eventLoopScheduler) // used to introduce synchronization when someone call subscribe in the composition
//.ObserveOn(Scheduler.Default).Do(arg => DoSomething()) // Used to change the thread to let the _eventScheduler free for new consumers/subscribers
.SelectMany(arg => Observable.FromAsync(a => Task.Run(() => DoSomething()))) // Used to change the thread to let the _eventScheduler free.... Used to introduce concurrency in the method execution
.Subscribe(); // subscribe to receive the event
Parallel.For(0, samples, new ParallelOptions{ MaxDegreeOfParallelism = 4 }, a =>
{
Console.WriteLine(a.ToString("D4") + " | Trying to raise the event in thread:" + Thread.CurrentThread.ManagedThreadId);
_eventLoopScheduler.Schedule(() =>
{
Console.WriteLine(a.ToString("D4") + " | Raising event in thread:" + Thread.CurrentThread.ManagedThreadId);
_handler(null, EventArgs.Empty);
});
});
_finish.Wait();
subscription.Dispose();
_eventLoopScheduler.Dispose();
Console.WriteLine("Completed");
}
static void DoSomething()
{
//var current = count++; // use this code ONLY if you are not introducing concurrency (which is without wrap the execution in another thread)
var current = Interlocked.Increment(ref count); // to synchronize the count value, in this case it is necessary if you have the execution in multiple threads (such as using Task.Run or ThreadPool)
var random = new Random(current);
Console.WriteLine(current.ToString("D4") + " | Doing Something in thread:" + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(random.Next(0,500)); // Simulate some Process
if (current == samples)
{
_finish.Set();
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的示例中,我只使用了一名订户,但如果您使用多个订户,则完全没问题。我知道有时它并不那么容易理解,因为多线程是一件复杂的事情,但请随意添加信息或提出问题。
| 归档时间: |
|
| 查看次数: |
374 次 |
| 最近记录: |