小编jam*_*nor的帖子

TPL FromAsync与TaskScheduler和TaskFactory

我正在尝试与使用组合创建一个任务管道/有序调度程序TaskFactory.FromAsync.

我希望能够发起Web服务请求(FromAsync使用I/O完成端口),但维护它们的顺序,并且只能在任何时候执行一个.

目前我不使用,FromAsync所以我可以做TaskFactory.StartNew(()=>api.DoSyncWebServiceCall())并依赖于OrderedTaskScheduler使用,TaskFactory以确保只有一个请求是未完成的.

我假设在使用该FromAsync方法时这种行为会保留,但它不会:

TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
Run Code Online (Sandbox Code Playgroud)

所有这些beginGetStuff方法都在调用中被FromAsync调用(因此尽管它们按顺序被调度但是n同时发生了api调用).

FromAsync需要一个TaskScheduler 的重载:

public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)
Run Code Online (Sandbox Code Playgroud)

但文档说: …

.net c# parallel-processing task-parallel-library

5
推荐指数
1
解决办法
1012
查看次数

只有当值已经改变了一定的余量时,Rx IObservable才会产生值

我有一种感觉,这可能是一个非常简单的扩展方法,我已经错过但我看不到它...

我基本上想要一个产生流的流,其中值在每个新值上缓慢递增.我希望节流/采样,而不是时间,而是"容忍".例如

var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30
Run Code Online (Sandbox Code Playgroud)

当我有[1,4,20,33]等序列时,我希望在值变化超过最后一个的15时输出 - 这将导致:[1,20].如果价值变化为12将导致:[1,20,33]

这是否有内置的Rx扩展?理想情况下,它可以在所有数字类型上工作,而无需为每个类型编写重载

.net c# system.reactive c#-4.0

3
推荐指数
1
解决办法
1724
查看次数

有限并发TaskScheduler,可以交错要明确排序的任务

我正在寻找一个TaskScheduler:

  1. 允许我定义一些专用线程(例如8) - 标准LimitedConcurrencyLevelTaskScheduler(使用线程池线程)或WorkStealingTaskScheduler执行此操作.
  2. 允许我创建完全排序的子TaskScheduler,但是在父调度程序的专用线程上调度任务.

目前我们TaskScheduler.Default用于通用池(受线程池增长算法等的支配)以及new OrderedTaskScheduler()每当我们想要订购任务时.我想保持这种行为,但将这两个要求限制在我自己的专用线程池中.

QueuedTaskScheduler似乎非常接近.我认为QueuedTaskScheduler.ActivateNewQueue()返回子TaskScheduler的方法会在父级的工作池上执行IN ORDER任务,但似乎并非如此.子TaskSchedulers似乎具有与父级相同的并行化级别.

我不一定希望子任务调度程序任务优先于父任务调度程序任务(尽管它将来可能是一个很好的功能).

我在这里看到了一个相关的问题:有限的并发级别任务调度程序(具有任务优先级)处理包装的任务,但我的要求不需要处理异步任务(我的所有排队任务从头到尾完全同步,没有延续).

c# multithreading task parallel-extensions task-parallel-library

3
推荐指数
1
解决办法
1637
查看次数

Rx IObservable - 返回第一个IObservable流以获取值

我正在创建两个(或更多)IObservable<T>所有相同的T.它们是从Task<IEnumerable<T>>哪个可以比其他人更快地回来生成的.我所关心的只是IObservable返回第一个值 - 这是我从那时开始使用的值.

我记得在剑桥参加过Jon Skeet的演讲,他用非常简洁的方式使用TPL完成了这个,但我记不起来了!理想情况下,我会得到一个像这样的方法:

IObservable<T> PickFastestObservable<T>(IEnumerable<IObservable<T>> slowObservables);
Run Code Online (Sandbox Code Playgroud)

但如果我必须直接在任务上完成,我可能会解决一些问题.

我很努力想让自己有点自信,我很自信.

干杯,

.net c# system.reactive

0
推荐指数
1
解决办法
142
查看次数