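I've written a sample test that reproduces the problem. This isn't my actual code; I tried to write a small repro. If I increase the bounded capacity to the iteration count, effectively making the collection unbounded, there is no deadlock, and if I set the max degree of parallelism to a small number (such as 1), there is no deadlock either.
Again, I know the code below isn't great, but the code I actually found this in is much larger and harder to follow. Essentially there is a blocking object pool of connections to a remote resource, and several blocks in the flow use that connection.
Any ideas on how to solve this? At first glance it looks like a problem with Dataflow. When I break in and look at the threads, I see many threads blocked on Add and 0 threads blocked on Take. There are several items in addBlock's outbound queue that have not yet propagated to takeBlock, so it is stuck or deadlocked.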
var blockingCollection = new BlockingCollection<int>(10000);
var takeBlock = new ActionBlock<int>((i) =>
{
    int j = blockingCollection.Take();
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 20,
    SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>((i) =>
{
    blockingCollection.Add(i);
    return i;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 20
});
addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
{
    PropagateCompletion = true
});
for (int i = 0; i < 100000; i++)
{
    addBlock.Post(i);
}
addBlock.Complete();
await addBlock.Completion; …
c# multithreading task-parallel-library blockingcollection tpl-dataflow
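One plausible explanation, offered as an assumption rather than a confirmed diagnosis: by default a TransformBlock releases its outputs in input order, so if the oldest in-flight item is blocked in Add on a full collection, results that finished after it cannot reach takeBlock and no Take ever runs. The sketch below tries that idea by setting EnsureOrdered = false, assuming a System.Threading.Tasks.Dataflow version that has this option. The class name UnorderedPipelineSketch, the taken counter and the method parameters are illustrative, SingleProducerConstrained is left out for simplicity, and the method waits on takeBlock rather than addBlock.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedPipelineSketch
{
    // Hypothetical helper: runs the repro with unordered output and
    // returns how many items takeBlock consumed.
    public static async Task<int> RunAsync(int iterations, int capacity)
    {
        var blockingCollection = new BlockingCollection<int>(capacity);
        int taken = 0;

        var takeBlock = new ActionBlock<int>(i =>
        {
            blockingCollection.Take();
            Interlocked.Increment(ref taken);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

        var addBlock = new TransformBlock<int, int>(i =>
        {
            blockingCollection.Add(i);
            return i;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20,
            // Assumption under test: let finished results leave the block
            // even while an older item is still blocked in Add.
            EnsureOrdered = false
        });

        addBlock.LinkTo(takeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < iterations; i++)
        {
            addBlock.Post(i);
        }
        addBlock.Complete();

        // Wait for the last block so every Take has finished.
        await takeBlock.Completion;
        return taken;
    }
}
```

Calling `await UnorderedPipelineSketch.RunAsync(100000, 10000)` mirrors the numbers in the question. Blocking calls inside block delegates still tie up thread-pool threads, so this is a way to test the hypothesis, not a recommendation to keep blocking collections inside a pipeline.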
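I have a list of about 1000 URLs, and I'm wondering whether there is a more efficient way to call multiple URLs from the same site (I have already changed ServicePointManager.DefaultConnectionLimit).
Also, is it better to reuse the same HttpClient for every call or to create a new one each time? The code below uses a single client rather than several.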
using (var client = new HttpClient { Timeout = new TimeSpan(0, 5, 0) })
{
    var tasks = urls.Select(async url =>
    {
        await client.GetStringAsync(url).ContinueWith(response =>
        {
            var resultHtml = response.Result;
            //process the html
        });
    }).ToList();
    Task.WaitAll(tasks.ToArray());
}
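On the question of reusing the client: HttpClient is designed to be shared across concurrent requests, so a single long-lived instance is the usual choice. The sketch below is one way to combine that with an explicit cap on in-flight requests using SemaphoreSlim. The names ThrottledDownloadSketch, DownloadAllAsync and FetchWithSharedClientAsync are illustrative, and the fetch delegate is a parameter only so the throttling can be exercised without a network.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledDownloadSketch
{
    // One shared client for the lifetime of the process.
    private static readonly HttpClient SharedClient =
        new HttpClient { Timeout = TimeSpan.FromMinutes(5) };

    // Default fetch function backed by the shared client.
    public static Task<string> FetchWithSharedClientAsync(string url) =>
        SharedClient.GetStringAsync(url);

    // Runs fetch for every URL with at most maxConcurrency calls in flight.
    // Results come back in the same order as the input URLs.
    public static async Task<string[]> DownloadAllAsync(
        IEnumerable<string> urls, int maxConcurrency, Func<string, Task<string>> fetch)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = urls.Select(async url =>
            {
                await gate.WaitAsync();
                try
                {
                    return await fetch(url);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            return await Task.WhenAll(tasks);
        }
    }
}
```

A call such as `await ThrottledDownloadSketch.DownloadAllAsync(urls, 20, ThrottledDownloadSketch.FetchWithSharedClientAsync)` would replace the Task.WaitAll version; the value 20 is an arbitrary starting point to tune against the target site.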
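As @cory suggested, here is the modified code using TPL, but I had to set MaxDegreeOfParallelism = 100 to reach roughly the same speed as the task-based version. Can the code below be improved?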
var downloader = new ActionBlock<string>(async url =>
{
    var client = new WebClient();
    var resultHtml = await client.DownloadStringTaskAsync(new Uri(url));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }); …
Consider this example:
class Program
{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();
    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);
        Mesh.Complete();
        await Mesh.Completion;
        Console.WriteLine();
        Console.WriteLine("Done");
    }
    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }
    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;
        return new Disposer(() => AsyncLocalContext.Value = null);
    } …
I'm using TPL blocks to run operations that the user may cancel. I've come up with two options. In the first, I cancel the whole block but not the operation inside it, like this:
_downloadCts = new CancellationTokenSource();
var processBlockV1 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct);
    var entities = properties
        .AsParallel()
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
}, new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token });
In the second, I cancel the inner operation but not the block itself:
var processBlockV2 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct);
    var entities = properties
        .AsParallel().WithCancellation(_downloadCts.Token)
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
});
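As I understand it, the first option cancels the whole block and thereby shuts down the entire pipeline. My question is whether it also cancels the inner operation and disposes all resources, if any (open StreamReaders and so on), or whether it is better to choose the second option, where I can make sure myself that everything is cancelled and cleaned up, and then use some approach (railway-oriented programming) to float the OperationCanceledException down the pipeline and handle it wherever I want?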
c# task-parallel-library cancellationtokensource tpl-dataflow
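The two options are not mutually exclusive. As far as I know, the token in ExecutionDataflowBlockOptions stops the block from taking on further messages and moves its Completion task to the canceled state, but it does not interrupt a delegate that is already running, so anything opened inside the delegate still has to be cleaned up by the delegate itself. A minimal sketch that passes the same token to both the block and the inner PLINQ query follows; GetProperties and DoSomethingWithData are hypothetical stand-ins for the methods in the question, and CancellationSketch is an illustrative name.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class CancellationSketch
{
    // Hypothetical stand-ins for GetPropertiesMethod / DoSometheningWithData.
    private static List<int> GetProperties(int construct) =>
        Enumerable.Range(0, construct).ToList();

    private static int DoSomethingWithData(int value) => value * 2;

    public static TransformBlock<int, List<int>> CreateProcessBlock(CancellationToken token)
    {
        return new TransformBlock<int, List<int>>(construct =>
        {
            List<int> properties = GetProperties(construct);
            return properties
                .AsParallel()
                .WithCancellation(token)      // cancels the inner query
                .Select(DoSomethingWithData)
                .ToList();
        }, new ExecutionDataflowBlockOptions
        {
            CancellationToken = token         // cancels the block itself
        });
    }
}
```

Resources such as StreamReaders opened inside the delegate would still belong in a using statement there, since neither token disposes them.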
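I'm trying out TPL Dataflow by porting some old socket code to use TPL Dataflow and the new async features. Although the API feels rock solid, my code still ends up messy. I wonder whether I'm missing something here.
My requirements are as follows: a socket class exposes Open, Close, Send and Receive methods. All of them return a Task and are therefore asynchronous. Open and Close are atomic. Send and Receive can run alongside each other, although each can only process one command at a time.
Logically, this leads me to the following internal control code: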
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;
// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new …
Using TPL.DataFlow blocks, is it possible to link two or more sources to a single ITargetBlock (e.g. an ActionBlock) and prioritize the sources?
For example:
BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...
//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);
As long as there are messages in b1, I want them fed to "a"; once b1 is empty, the b2 messages are pushed to "a".
Ideas?
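I'm not aware of a priority setting on LinkTo, so one possible approach is to skip the links and pull from the two buffers by hand, always trying b1 first. The sketch below does that; PrioritySketch and PumpAsync are illustrative names, and the pump completes the target once both sources are completed and drained. A message from the low-priority buffer can still slip in if the high-priority buffer happens to be empty at the moment of the check.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PrioritySketch
{
    // Forwards items to target, preferring 'high' whenever it has an item.
    public static async Task PumpAsync<T>(
        BufferBlock<T> high, BufferBlock<T> low, ITargetBlock<T> target)
    {
        while (true)
        {
            // Try the high-priority buffer first, then the low-priority one.
            if (high.TryReceive(out T item) || low.TryReceive(out item))
            {
                await target.SendAsync(item);
                continue;
            }

            bool highDone = high.Completion.IsCompleted;
            bool lowDone = low.Completion.IsCompleted;
            if (highDone && lowDone)
            {
                break; // both sources are completed and empty
            }

            // Wait until a source that is still open has output (or completes).
            var waits = new List<Task>();
            if (!highDone) waits.Add(high.OutputAvailableAsync());
            if (!lowDone) waits.Add(low.OutputAvailableAsync());
            await Task.WhenAny(waits);
        }

        target.Complete();
    }
}
```

Giving the target a BoundedCapacity of 1 keeps it from buffering low-priority items ahead of newly arrived high-priority ones, because SendAsync then waits until the target has room.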
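The company I work for runs a few hundred very dynamic websites. It has decided to build a search engine, and I've been tasked with writing the scraper. Some of the sites run on old hardware and can't take much punishment, while others can handle a large number of concurrent users.
I need to be able to say: use 5 parallel requests for site A, 2 for site B and 1 for site C.
I know I could achieve this with threads, mutexes, semaphores and so on, but it would be quite complicated. Is any higher-level framework, such as the TPL, async/await or TPL Dataflow, powerful enough to do this in a simpler way?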
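With TPL Dataflow, one way to express a per-site limit is to give each site its own ActionBlock and set MaxDegreeOfParallelism on it. The sketch below assumes the limits are known up front and keyed by host name; PerSiteScraperSketch, Enqueue, CompleteAsync and the scrape delegate are illustrative names, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class PerSiteScraperSketch
{
    private readonly Dictionary<string, ActionBlock<Uri>> _blocks =
        new Dictionary<string, ActionBlock<Uri>>(StringComparer.OrdinalIgnoreCase);

    // limits: host name -> maximum number of parallel requests for that host.
    // scrape: hypothetical delegate that downloads and processes one page.
    public PerSiteScraperSketch(IReadOnlyDictionary<string, int> limits, Func<Uri, Task> scrape)
    {
        foreach (var pair in limits)
        {
            _blocks[pair.Key] = new ActionBlock<Uri>(scrape,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = pair.Value });
        }
    }

    // Queues a page on the block of its host; returns false for unknown hosts.
    public bool Enqueue(Uri page) =>
        _blocks.TryGetValue(page.Host, out var block) && block.Post(page);

    // Signals that no more pages will arrive and waits for all blocks to drain.
    public Task CompleteAsync()
    {
        foreach (var block in _blocks.Values)
        {
            block.Complete();
        }
        return Task.WhenAll(_blocks.Values.Select(b => b.Completion));
    }
}
```

Constructing it with limits such as 5, 2 and 1 for three hosts gives each site its own queue and its own concurrency cap, while the scrape delegate itself can stay a plain async method.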
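I need a Dataflow block that delays forwarding a message to the next block based on the timestamp in the message (LogEntry).
This is what I came up with, but it doesn't feel right. Any suggestions for improvement?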
private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
{
    var buffer = new ConcurrentQueue<LogEntry>();
    var source = new BufferBlock<LogEntry>();
    var target = new ActionBlock<LogEntry>(item =>
    {
        buffer.Enqueue(item);
    });
    Task.Run(() =>
    {
        LogEntry entry;
        while (true)
        {
            entry = null;
            if (buffer.TryPeek(out entry))
            {
                if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                {
                    buffer.TryDequeue(out entry);
                    source.Post(entry);
                }
            }
        }
    });
    target.Completion.ContinueWith(delegate
    {
        LogEntry entry;
        while (buffer.TryDequeue(out entry))
        {
            source.Post(entry);
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
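The polling loop above keeps one thread spinning while it waits. A possible alternative, sketched here under the assumption that entries arrive with non-decreasing timestamps, is a single TransformBlock with an async delegate that awaits Task.Delay for whatever time remains. The LogEntry class below is a hypothetical minimal shape with only the UtcTimestamp property, and the delay is a parameter instead of the fixed five minutes.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical minimal shape of the message type from the question.
public sealed class LogEntry
{
    public DateTime UtcTimestamp { get; set; }
}

public static class DelaySketch
{
    // Forwards each entry once it is at least 'delay' old.
    public static IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock(TimeSpan delay)
    {
        // The default MaxDegreeOfParallelism of 1 keeps entries in arrival order.
        return new TransformBlock<LogEntry, LogEntry>(async entry =>
        {
            TimeSpan remaining = entry.UtcTimestamp + delay - DateTime.UtcNow;
            if (remaining > TimeSpan.Zero)
            {
                // Asynchronous wait: no thread is blocked while the entry ages.
                await Task.Delay(remaining);
            }
            return entry;
        });
    }
}
```

`DelaySketch.DelayedForwardBlock(TimeSpan.FromMinutes(5))` corresponds to the behavior in the question. Unlike the original, completing this block waits for buffered entries to age instead of flushing them immediately.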
Lucian discusses a pattern here (tip 3: wrap events in Task-returning APIs and await them).
I'm trying to implement it on a frequently called method that looks like the contrived code below:
public Task BlackBoxAsync()
{
    var tcs = new TaskCompletionSource<Object>(); // new'ed up every call
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            DoSomethingStuff();
            tcs.SetResult(null);
        }
        catch(Exception exc) { tcs.SetException(exc); }
    });
    return tcs.Task;
}
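I'm worried about performance, since a new TaskCompletionSource is created on every call (let's say I call this method every 100 ms).
I was thinking of using a BufferBlock<T> instead, on the assumption that it wouldn't be newed up on every call. So it would look like: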
private readonly BufferBlock<object> signalDone; // dummy class-level variable, new'ed up once in CTOR
public Task BlackBoxAsync()
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            DoSomethingStuff();
            signalDone.Post(null);
        }
        catch(Exception exc) { }
    });
    return signalDone.ReceiveAsync();
}
The calling object would call it like this:
for (var i=0; i<10000; i++) …
c# task-parallel-library async-await tpl-dataflow taskcompletionsource
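Two observations, both hedged. First, when the wrapped work is simply a synchronous method queued to the thread pool, Task.Run does the same job as the hand-written TaskCompletionSource wrapper and surfaces exceptions on the returned task. Second, in the BufferBlock variant a swallowed exception means nothing is posted for that call, so the task returned by ReceiveAsync may never complete. The sketch below shows the Task.Run form; BlackBoxSketch is an illustrative name and the Action passed to the constructor stands in for DoSomethingStuff.

```csharp
using System;
using System.Threading.Tasks;

public sealed class BlackBoxSketch
{
    private readonly Action _work;

    // 'work' is a stand-in for DoSomethingStuff from the question.
    public BlackBoxSketch(Action work)
    {
        _work = work;
    }

    // Queues the work to the thread pool; the returned task completes when
    // the work finishes and becomes faulted if the work throws.
    public Task BlackBoxAsync() => Task.Run(_work);
}
```

One small allocation every 100 ms is unlikely to show up in a profile, so measuring before replacing the task with a shared block seems worthwhile.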
Here is a short code sample that quickly introduces my problem:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }
                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => …