将异步 - 等待C#代码转换为与调度程序相关的F#

Vek*_*ksi 15 c# f# task-parallel-library c#-to-f# orleans

我想知道这是否是一个太广泛的问题,但最近我让自己遇到了一段我想确定如何从C#转换为适当的F#的代码.旅程从这里开始(1)(TPL-F#交互的原始问题),并继续这里(2)(我正在考虑转换为F#的一些示例代码).

示例代码在这里重现的时间太长,但有趣的功能是ActivateAsync,RefreshHubsAddHub.特别有趣的是

  1. AddHub有签名private async Task AddHub(string address).
  2. RefreshHubsAddHub在循环中调用并收集一个列表tasks,然后它在最后等待await Task.WhenAll(tasks),因此返回值与其签名匹配private async Task RefreshHubs(object _).
  3. RefreshHubs被称为ActivateAsync一样await RefreshHubs(null),然后在年底有一个叫await base.ActivateAsync()匹配函数签名public override async Task ActivateAsync().

题:

将这些函数签名正确转换为F#仍然保持接口和功能并尊重默认的自定义调度程序是什么?而且我也不太确定这个"在F#中异步/等待".至于如何"机械地"做到这一点.:)

原因是在链接"here(1)"中似乎存在问题(我没有验证这一点),因为F#异步操作不遵循(Orleans)运行时设置的自定义协作调度程序.此外,这里声明TPL操作逃脱调度程序并进入任务池,因此禁止使用它们.

我能想到解决这个问题的一种方法是使用F#函数,如下所示

//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
    this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore

    if RoleEnvironment.IsAvailable then
        this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
    else
        this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously

    //Return value comes from here.
    base.ActivateAsync()

member private this.RefreshHubs(_) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //The return value is Task.
    //In the C# version the AddHub provided tasks are collected and then the
    //on the last line there is return await Task.WhenAll(newHubAdditionTasks) 
    newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

member private this.AddHub(address) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //In the C# version:
    //...
    //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
    //} 
    //so this is "void" and could perhaps be Async<void> in F#... 
    //The return value is Task.
    hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
    TaskDone.Done
Run Code Online (Sandbox Code Playgroud)

这个startAsPlainTask功能是由Sacha Barber这里开始的.另一个有趣的选择可能是在这里

module Async =
    let AwaitTaskVoid : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore
Run Code Online (Sandbox Code Playgroud)

<编辑:我刚刚注意到Task.WhenAll还需要等待.但是什么是正确的方法?呃,是时候睡觉了(一个不好的双关语)......

<编辑2:这里(1)(TPL-F#交互的原始问题)在Codeplex中提到F#使用同步上下文将工作推送到线程,而TPL没有.现在,这是一个看似合理的解释,我觉得(尽管无论自定义调度程序如何,我仍然无法正确翻译这些代码段).可以从中获得一些有趣的附加信息

我认为我需要在这种背景下提及Hopac,作为一个有趣的切线,并且还提到我在接下来的50个小时左右是不可及的,以防我的所有交叉发布都失控.

<编辑3:Danielsvick在评论中提供了很好的建议,以便使用自定义任务构建器.Daniel提供了一个已经在FSharpx中定义的链接.

查看源代码,我看到带参数的接口定义为

type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
    let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
    let scheduler = defaultArg scheduler TaskScheduler.Default
    let cancellationToken = defaultArg cancellationToken CancellationToken.None
Run Code Online (Sandbox Code Playgroud)

如果要在奥尔良使用它,看起来TaskScheduler应该TaskScheduler.Current按照这里的文档

Orleans拥有自己的任务调度程序,它提供了在grain中使用的单线程执行模型.在运行任务时使用Orleans调度程序非常重要,而不是.NET线程池.

如果您的谷物代码需要创建子任务,您应该使用Task.Factory.StartNew:

等待Task.Factory.StartNew(()=> {/*logic*/});

此技术将使用当前任务调度程序,它将是Orleans调度程序.

您应该避免使用始终使用.NET线程池的Task.Run,​​因此不会在单线程执行模型中运行.

它看起来在TaskScheduler.CurrentTaskScheduler.Default之间存在细微差别.也许这可以保证一个问题,在哪种情况下会出现意想不到的差异.由于新奥尔良的文件指出,不使用Task.Run,而是引导到Task.Factory.StartNew,我不知道如果一个人应该定义TaskCreationOptions.DenyAttachChild如通过这类机构作为建议斯蒂芬ToubTask.Run VS Task.Factory.StartNew斯蒂芬·克利里StartNew是危险的.嗯,除非我弄错了,否则看起来.Default会是这样.DenyAttachChilld.

此外,因为有一个问题Task.RunTask.Factory.CreateNew关于定制调度,我不知道这个特殊的问题可以通过使用自定义被删除TaskFactory在解释任务计划程序(Task.Factory)和控制线程的数目如何创建一个限制并发的任务计划程序.

嗯,这已经成为一个漫长的"思考".我想知道我该如何关闭它?也许如果斯维克丹尼尔可以将他们的评论作为答案,我会同时投票并接受svick的

Dan*_*iel 1

您可以TaskBuilder在 FSharpx 中使用 use 并传入TaskScheduler.Current. 这是我的翻译尝试RefreshHubs。请注意,Task<unit>使用 代替Task

let RefreshHubs _ =
    let task = TaskBuilder(scheduler = TaskScheduler.Current)
    task {
        let addresses = 
            RoleEnvironment.Roles.["GPSTracker.Web"].Instances
            |> Seq.map (fun instance ->
                let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                sprintf "http://%O" endpoint.IPEndpoint
            )
            |> Seq.toList

        let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
        let deadHubs = hubs.Keys |> Seq.filter (fun x -> 
            not (List.exists ((=) x) addresses))

        // remove dead hubs
        deadHubs |> Seq.iter (hubs.Remove >> ignore)

        // add new hubs
        let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
        return ()
    }
Run Code Online (Sandbox Code Playgroud)