强制任务继续当前线程?

Rog*_*son 10 c# system.reactive akka async-await tpl-dataflow

我正在为.NET建立一个AKKA框架的端口(现在不要太认真了,现在这是一个周末黑客的演员部分)

我对其中的"未来"支持存在一些问题.在Java/Scala Akka中,期货将与Await电话同步等待.很像.NET Task.Wait()

我的目标是支持真正的异步等待.它现在可以工作,但是在我当前的解决方案中,继续在错误的线程上执行.

这是将消息传递给我的一个包含未来await块的actor的结果.如您所见,actor总是在同一个线程上执行,而await块在随机线程池线程上执行.

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
Run Code Online (Sandbox Code Playgroud)

actor使用DataFlow获取消息BufferBlock<Message> 或者更确切地说,我在缓冲区块上使用RX来订阅消息.它配置如下:

var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.

但是,当我等待未来的结果时.像这样:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}",
          System.Threading.Thread.CurrentThread.GetHashCode());
        })
       .Default(_ => Console.WriteLine("Unknown message"));
     ...
Run Code Online (Sandbox Code Playgroud)

我知道这是异步等待的正常行为,但我必须确保只有一个线程可以访问我的actor.

我不希望未来同步运行,我想像正常一样运行异步,但我希望继续运行在与消息处理器/ actor相同的线程上.

我的未来支持代码如下所示:

public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}
Run Code Online (Sandbox Code Playgroud)

任何想法我可以做什么来强制继续在启动上述代码的同一个线程上运行?

Ste*_*ary 6

我正在为.NET建立AKKA框架的端口

甜.尽管从未接触过Java/Scala/Akka,我去了CodeMash '13的Akka演讲.我看到了.NET库/框架的很多潜力.微软正在研究类似的东西,我希望它最终会被普遍提供(目前它的预览有限).

我怀疑尽可能多地停留在Dataflow/Rx世界是更容易的方法; async当你有异步操作(每个操作只有一个启动和单个结果)时,这是最好的,而Dataflow和Rx在流和订阅(使用单个启动和多个结果)时效果更好.所以我的第一个直觉反应是将缓冲区块链接到ActionBlock具有特定调度程序的缓冲区块,或者用于ObserveOn将Rx通知移动到特定的调度程序,而不是尝试在async侧面执行此操作.当然我对Akka API设计并不是很熟悉,所以请一点儿吃.

无论如何,我的async介绍描述了调度await延续的唯一两个可靠选项:SynchronizationContext.CurrentTaskScheduler.Current.如果您的Akka端口更像是一个框架(代码执行托管,最终用户代码总是由您的代码执行),那么a SynchronizationContext可能是有意义的.如果您的端口更像是一个(最终用户代码执行托管并根据需要调用您的代码),那么a TaskScheduler会更有意义.

自定义的例子并不多SynchronizationContext,因为这很少见.我的AsyncEx库中有一个AsyncContextThread类型,它定义了该线程的a 和a .有几个自定义的例子,例如Parallel Extensions Extras,它有一个STA调度程序和一个"当前线程"调度程序.SynchronizationContextTaskSchedulerTaskScheduler