Cha*_*ion 13 c# f# discriminated-union
我创建了一个名为Actor的新类,它处理传递给它的消息.我遇到的问题是弄清楚将相关但不同的消息传递给Actor的最优雅方式是什么.我的第一个想法是使用继承,但它似乎是如此膨胀,但它是强烈的类型,这是一个明确的要求.
有什么想法吗?
private abstract class QueueMessage { }
private class ClearMessage : QueueMessage
{
public static readonly ClearMessage Instance = new ClearMessage();
private ClearMessage() { }
}
private class TryDequeueMessage : QueueMessage
{
public static readonly TryDequeueMessage Instance = new TryDequeueMessage();
private TryDequeueMessage() { }
}
private class EnqueueMessage : QueueMessage
{
public TValue Item { get; private set; }
private EnqueueMessage(TValue item)
{
Item = item;
}
}
Run Code Online (Sandbox Code Playgroud)
/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);
/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
/// <summary>The default total number of threads to process messages.</summary>
private const Int32 DefaultThreadCount = 1;
/// <summary>Used to serialize access to the message queue.</summary>
private readonly Locker Locker;
/// <summary>Stores the messages until they can be processed.</summary>
private readonly System.Collections.Generic.Queue<Message> MessageQueue;
/// <summary>Signals the actor thread to process a new message.</summary>
private readonly ManualResetEvent PostEvent;
/// <summary>This tells the actor thread to stop reading from the queue.</summary>
private readonly ManualResetEvent DisposeEvent;
/// <summary>Processes the messages posted to the actor.</summary>
private readonly List<Thread> ActorThreads;
/// <summary>Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class.</summary>
public Actor() : this(DefaultThreadCount) { }
/// <summary>Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class.</summary>
/// <param name="thread_count"></param>
public Actor(Int32 thread_count)
{
if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");
Locker = new Locker();
MessageQueue = new System.Collections.Generic.Queue<Message>();
EnqueueEvent = new ManualResetEvent(true);
PostEvent = new ManualResetEvent(false);
DisposeEvent = new ManualResetEvent(true);
ActorThreads = new List<Thread>();
for (Int32 i = 0; i < thread_count; i++)
{
var thread = new Thread(ProcessMessages);
thread.IsBackground = true;
thread.Start();
ActorThreads.Add(thread);
}
}
/// <summary>Posts a message and waits for the reply.</summary>
/// <param name="value">The message to post to the actor.</param>
/// <returns>The reply from the actor.</returns>
public TReply PostWithReply(TMessage message)
{
using (var wrapper = new Message(message))
{
lock (Locker) MessageQueue.Enqueue(wrapper);
PostEvent.Set();
wrapper.Channel.CompleteEvent.WaitOne();
return wrapper.Channel.Value;
}
}
/// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
/// <param name="value">The message to post to the actor.</param>
/// <param name="callback">The callback that will be invoked once the replay is received.</param>
public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
{
if (callback == null) throw new ArgumentNullException("callback");
ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
if (DisposeEvent.WaitOne(10))
{
DisposeEvent.Reset();
PostEvent.Set();
foreach (var thread in ActorThreads)
{
thread.Join();
}
((IDisposable)PostEvent).Dispose();
((IDisposable)DisposeEvent).Dispose();
}
}
/// <summary>Processes a message posted to the actor.</summary>
/// <param name="message">The message to be processed.</param>
protected abstract void ProcessMessage(Message message);
/// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
private void ProcessMessages()
{
while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
{
var message = (Message)null;
while (true)
{
lock (Locker)
{
message = MessageQueue.Count > 0 ?
MessageQueue.Dequeue() :
null;
if (message == null)
{
PostEvent.Reset();
break;
}
}
try
{
ProcessMessage(message);
}
catch
{
}
}
}
}
/// <summary>Represents a message that is passed to an actor.</summary>
protected class Message : IDisposable
{
/// <summary>The actual value of this message.</summary>
public TMessage Value { get; private set; }
/// <summary>The channel used to give a reply to this message.</summary>
public Channel Channel { get; private set; }
/// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
/// <param name="value">The actual value of the message.</param>
public Message(TMessage value)
{
Value = value;
Channel = new Channel();
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
Channel.Dispose();
}
}
/// <summary>Represents a channel used by an actor to reply to a message.</summary>
protected class Channel : IDisposable
{
/// <summary>The value of the reply.</summary>
public TReply Value { get; private set; }
/// <summary>Signifies that the message has been replied to.</summary>
public ManualResetEvent CompleteEvent { get; private set; }
/// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
public Channel()
{
CompleteEvent = new ManualResetEvent(false);
}
/// <summary>Reply to the message received.</summary>
/// <param name="value">The value of the reply.</param>
public void Reply(TReply value)
{
Value = value;
CompleteEvent.Set();
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void Dispose()
{
((IDisposable)CompleteEvent).Dispose();
}
}
}
Run Code Online (Sandbox Code Playgroud)
kvb*_*kvb 11
Steve Gilham总结了编译器如何实际处理受歧视的联合.对于您自己的代码,您可以考虑它的简化版本.鉴于以下F#:
type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T
Run Code Online (Sandbox Code Playgroud)
这是在C#中模拟它的一种方法:
public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }
public abstract class QueueMessage<T>
{
// prevents unwanted subclassing
private QueueMessage() { }
public abstract MessageType MessageType { get; }
/// <summary>
/// Only applies to EnqueueMessages
/// </summary>
public abstract T Item { get; }
public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }
private sealed class ClearMessage : QueueMessage<T>
{
public ClearMessage() { }
public override MessageType MessageType
{
get { return MessageType.ClearMessage; }
}
/// <summary>
/// Not implemented by this subclass
/// </summary>
public override T Item
{
get { throw new NotImplementedException(); }
}
}
private sealed class TryDequeueMessage : QueueMessage<T>
{
public TryDequeueMessage() { }
public override MessageType MessageType
{
get { return MessageType.TryDequeueMessage; }
}
/// <summary>
/// Not implemented by this subclass
/// </summary>
public override T Item
{
get { throw new NotImplementedException(); }
}
}
private sealed class EnqueueMessage : QueueMessage<T>
{
private T item;
public EnqueueMessage(T item) { this.item = item; }
public override MessageType MessageType
{
get { return MessageType.EnqueueMessage; }
}
/// <summary>
/// Gets the item to be enqueued
/// </summary>
public override T Item { get { return item; } }
}
}
Run Code Online (Sandbox Code Playgroud)
现在,在给定a的代码中QueueMessage,您可以打开MessageType属性来代替模式匹配,并确保Item仅在EnqueueMessages 上访问该属性.
编辑
这是另一种基于朱丽叶代码的替代方案.我试图简化一些事情,以便它从C#获得更有用的界面.这比以前的版本更好,因为您无法获得MethodNotImplemented异常.
public abstract class QueueMessage<T>
{
// prevents unwanted subclassing
private QueueMessage() { }
public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);
public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }
private sealed class ClearMessage : QueueMessage<T>
{
public ClearMessage() { }
public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
{
return clearCase();
}
}
private sealed class TryDequeueMessage : QueueMessage<T>
{
public TryDequeueMessage() { }
public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
{
return tryDequeueCase();
}
}
private sealed class EnqueueMessage : QueueMessage<T>
{
private T item;
public EnqueueMessage(T item) { this.item = item; }
public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
{
return enqueueCase(item);
}
}
}
Run Code Online (Sandbox Code Playgroud)
您可以像这样使用此代码:
public class MessageUserTest
{
public void Use()
{
// your code to get a message here...
QueueMessage<string> msg = null;
// emulate pattern matching, but without constructor names
int i =
msg.Match(
clearCase: () => -1,
tryDequeueCase: () => -2,
enqueueCase: s => s.Length);
}
}
Run Code Online (Sandbox Code Playgroud)
联盟类型和模式匹配直接映射到访问者模式,我之前已经发布了几次:
因此,如果您想传递包含许多不同类型的邮件,那么您将无法实现访问者模式.
(警告,未经测试的代码,但应该让你知道它是如何完成的)
假设我们有这样的事情:
type msg =
| Add of int
| Sub of int
| Query of ReplyChannel<int>
let rec counts = function
| [] -> (0, 0, 0)
| Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
| Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
| Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)
Run Code Online (Sandbox Code Playgroud)
你最终得到了这个庞大的C#代码:
interface IMsgVisitor<T>
{
T Visit(Add msg);
T Visit(Sub msg);
T Visit(Query msg);
}
abstract class Msg
{
public abstract T Accept<T>(IMsgVistor<T> visitor)
}
class Add : Msg
{
public readonly int Value;
public Add(int value) { this.Value = value; }
public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}
class Sub : Msg
{
public readonly int Value;
public Add(int value) { this.Value = value; }
public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}
class Query : Msg
{
public readonly ReplyChannel<int> Value;
public Add(ReplyChannel<int> value) { this.Value = value; }
public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}
Run Code Online (Sandbox Code Playgroud)
现在,无论何时您想对消息执行某些操作,都需要实现访问者:
class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
public readonly Tuple<int, int, int> State;
public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }
public MsgTypeCounter Visit(Add msg)
{
Console.WriteLine("got Add of " + msg.Value);
return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
}
public MsgTypeCounter Visit(Sub msg)
{
Console.WriteLine("got Sub of " + msg.Value);
return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
}
public MsgTypeCounter Visit(Query msg)
{
Console.WriteLine("got Query of " + msg.Value);
return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
}
}
Run Code Online (Sandbox Code Playgroud)
最后你可以像这样使用它:
var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
(acc, x) => x.Accept(acc)).State;
Run Code Online (Sandbox Code Playgroud)
是的,它看起来很钝,但这就是你以类型安全的方式传递多个消息的方式,这也是我们不用C#实现联合的原因;)
在你的示例代码,您实现PostWithAsyncReply在以下方面PostWithReply.这并不理想,因为这意味着当你调用PostWithAsyncReply并且actor需要一段时间来处理它时,实际上有两个线程被绑定:一个执行actor和一个等待它完成的线程.让一个线程执行actor然后在异步情况下调用回调会更好.(显然在同步的情况下,没有避免捆绑两个线程).
更新:
更多关于上述内容:您构造一个带有参数的actor,告诉它要运行多少个线程.为简单起见,假设每个actor都运行一个线程(实际上是非常好的情况,因为actor可以具有内部状态而不会锁定它,因为只有一个线程直接访问它).
演员A调用演员B,等待回应.为了处理请求,actor B需要调用actor C.所以现在A和B的唯一线程正在等待,而C是唯一一个实际给CPU做任何工作的线程.这么多线程!但如果你一直等待答案,这就是你得到的.
好的,你可以增加你在每个actor中开始的线程数.但是你要开始他们所以他们可以无所事事地坐着.堆栈占用大量内存,上下文切换可能很昂贵.
因此,最好使用回调机制异步发送消息,以便您可以获取完成的结果.你的实现的问题是你从线程池中获取另一个线程,纯粹是坐下来等待.所以你基本上应用了增加线程数的解决方法.您将线程分配给永不运行的任务.
这将是更好地执行PostWithReply来讲PostWithAsyncReply,即以相反的方式轮.异步版本是低级别的.基于我的基于委托的示例(因为它涉及较少的代码输入!):
private bool InsertCoinImpl(int value)
{
// only accept dimes/10p/whatever it is in euros
return (value == 10);
}
public void InsertCoin(int value, Action<bool> accepted)
{
Submit(() => accepted(InsertCoinImpl(value)));
}
Run Code Online (Sandbox Code Playgroud)
所以私有实现返回一个bool.公共异步方法接受将接收返回值的操作; 私有实现和回调操作都在同一个线程上执行.
希望同步等待的需要将成为少数情况.但是当你需要它时,它可以由一个辅助方法提供,完全是通用的,不依赖于任何特定的actor或消息类型:
public static T Wait<T>(Action<Action<T>> activity)
{
T result = default(T);
var finished = new EventWaitHandle(false, EventResetMode.AutoReset);
activity(r =>
{
result = r;
finished.Set();
});
finished.WaitOne();
return result;
}
Run Code Online (Sandbox Code Playgroud)
所以现在在其他演员身上我们可以说:
bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));
Run Code Online (Sandbox Code Playgroud)
类型参数Wait可能是不必要的,还没有尝试编译任何这些.但Wait基本上是你的回调,所以你可以把它传递给一些异步方法,而在外面你只需要把回传给回调的东西作为你的返回值.请注意,传递给的lambda Wait实际上仍然在调用的同一个线程上执行Wait.
我们现在回到我们的常规课程......
至于您询问的实际问题,您向演员发送消息以使其做某事.代表们在这里很有帮助.它们让你有效地让编译器生成一个包含一些数据的类,一个你甚至不需要显式调用的构造函数以及一个方法.如果你不得不写一堆小班,请切换到代表.
abstract class Actor
{
Queue<Action> _messages = new Queue<Action>();
protected void Submit(Action action)
{
// take out a lock of course
_messages.Enqueue(action);
}
// also a "run" that reads and executes the
// message delegates on background threads
}
Run Code Online (Sandbox Code Playgroud)
现在,特定的派生actor遵循以下模式:
class ChocolateMachineActor : Actor
{
private void InsertCoinImpl(int value)
{
// whatever...
}
public void InsertCoin(int value)
{
Submit(() => InsertCoinImpl(value));
}
}
Run Code Online (Sandbox Code Playgroud)
因此,要向actor发送消息,只需调用public方法即可.私有Impl方法可以完成真正的工作.无需手动编写一堆消息类.
显然我已经省略了有关回复的内容,但这可以通过更多参数来完成.(见上面的更新).
| 归档时间: |
|
| 查看次数: |
3409 次 |
| 最近记录: |