我目前正在研究一个涉及索引大量文件的研究项目(240k); 它们主要是html,xml,doc,xls,zip,rar,pdf和文本大小从几KB到超过100 MB的文本.
在提取了所有zip和rar文件后,我最终得到了一百万个文件.
我正在使用Visual Studio 2010,C#和.NET 4.0,支持TPL Dataflow和Async CTP V3.要从这些文件中提取文本,我使用Apache Tika(使用ikvm转换),并使用Lucene.net 2.9.4作为索引器.我想使用新的TPL数据流库和异步编程.
我有几个问题:
如果我使用TPL,我会获得性能优势吗?它主要是一个I/O过程,据我所知,当您大量使用I/O时,TPL并没有提供太多好处.
生产者/消费者方法是否是处理此类文件处理的最佳方式,还是有更好的其他模型?我正在考虑使用blockingcollections创建一个包含多个使用者的生产者.
TPL数据流库是否适用于此类流程?看来TPL Dataflow最适合用于某种消息传递系统......
在这种情况下,我应该使用异步编程还是坚持同步?
我试图使用AsyncTargetingPack异步将一些信息记录到针对.NET 4.0的MVC 4控制器操作中的SQL Server.我会直接跳到.NET 4.5,但我的应用程序存在于Azure中,我们仍在等待更新 ...
此代码按预期工作(一行写入我的数据库,没有抛出异常):
public class SystemActionLogger : ISystemActionLogger
{
private readonly ActionBlock<Tuple<SystemAction, object>> actionBlock;
public SystemActionLogger(ISystemActionLogEntryRepository repository)
{
actionBlock = new ActionBlock<Tuple<SystemAction, object>>(
entry => TaskEx.Run(async () =>
{
string data = await JsonConvert.SerializeObjectAsync(entry.Item2);
await repository.PersistAsync(new SystemActionLogEntry(entry.Item1, data));
}));
}
public void Log(SystemAction systemAction, object data)
{
actionBlock.Post(new Tuple<SystemAction, object>(systemAction, data));
}
}
Run Code Online (Sandbox Code Playgroud)
并且此代码抛出NullReferenceException:
public class SystemActionLogger : ISystemActionLogger
{
private readonly ActionBlock<Tuple<SystemAction, object>> actionBlock;
public SystemActionLogger(ISystemActionLogEntryRepository repository)
{
actionBlock = new ActionBlock<Tuple<SystemAction, object>>(async entry …Run Code Online (Sandbox Code Playgroud) 在使用数据流后,我遇到了一个新问题.我想限制所有块的输入队列.我的产品块(ActionBlock)正在快速创建5000个元素并将它们发布到广播块.因此,如果我将广播块的BoundedCapacity设置为100,则会抛出大量数据.但是我希望生产块等待我的缓冲区输入队列中的新槽.
有没有办法摆脱这个问题?
在C#TPL数据流库中,SingleProducerConstrained是ActionBlocks的优化选项,当只有一个线程为动作块提供时,您可以使用它:
如果一个块只曾经打算通过一次单一的生产商,同时意味着只有一个线程被用来将使用类似帖子,OfferMessage和方法的块上完成的,这个属性可以被设置为true通知阻止它不需要应用额外的同步.
如果有什么的ActionBlock使用具有单个TransformBlock喂MaxDegreeOfPArallelism> 1 - 将违反规则的ActionBlock SingleProcerContrained设置为true?或者是MaxDegreeOfPArallelism> 1的单个TransformBlock仍被视为"单一生产者"?
我有几个文件(每个近1GB)和数据.数据是一个字符串行.
我需要与数百名消费者一起处理这些文件.这些消费者中的每一个都做一些与其他消费者不同的处 消费者不会同时写任何地方.他们只需要输入字符串.处理后,他们更新本地缓冲区.消费者可以轻松地并行执行.
重要提示:对于一个特定文件,每个消费者必须以正确的顺序处理所有行(不跳过)(因为它们出现在文件中).处理不同文件的顺序无关紧要.
一个消费者对单行的处理速度相当快.我希望Corei5上的时间不到50微秒.
所以现在我正在寻找解决这个问题的好方法.这将成为.NET项目的一部分,所以请让我们坚持使用.NET(最好是C#).
我知道TPL和DataFlow.我猜最相关的是BroadcastBlock.但我认为这里的问题是,每一行我都要等待所有消费者完成才能发布新的消费者.我猜它效率不高.
我认为理想的情况是这样的:
我采用这种方法是对的吗?无论是否,我如何实施良好的解决方案?
当您创建具有有界容量的批处理块并调用triggerBatch时(并行)发布新项目 - 在触发批处理执行时间期间发布新项目将失败.
在输入数据流暂停或减速的情况下,调用触发批处理(每X次)以确保数据在块中不会延迟太长时间.
以下代码将输出一些"post failure"事件.例如:
public static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
batchBlock.LinkTo(actionBlock);
var producerTask = Task.Factory.StartNew(() =>
{
//Post 10K Items
for (int i = 0; i < 10000; i++)
{
var postResult = batchBlock.Post(i);
if (!postResult)
Console.WriteLine("Failed to Post");
}
});
var triggerBatchTask = Task.Factory.StartNew(() =>
{
//Trigger Batch..
for (int i …Run Code Online (Sandbox Code Playgroud) 我想实现优先顺序ActionBlock<T>.因此我可以TInput通过使用a 来有条件地优先考虑某些项目Predicate<T>.
我阅读了 Parallel Extensions Extras示例和实现自定义TPL数据流块的指南.
但仍然没有弄清楚如何实现这种情况.
---------------------------- EDIT --------------------- ------
有一些任务,其中5个可以同时运行.当用户按下按钮时,一些(取决于谓词函数)任务应该以最高优先级运行.
实际上我写了这段代码
TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...
if (predicate(customObject)) …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library .net-4.5 tpl-dataflow
我正在研究一个系统,它涉及通过TCP网络连接接受命令,然后在执行这些命令时发送响应.相当基本的东西,但我希望支持一些要求:
我想使用async/await干净地实现它,并且基于我所读到的,TPL Dataflow听起来像是一种干净地将处理分解为可以在线程池上运行而不是绑定线程的漂亮块的好方法对于不同的会话/命令,阻塞等待句柄.
这就是我的开始(一些部分被剥离以简化,例如异常处理的细节;我还省略了一个为网络I/O提供有效等待的包装器):
private readonly Task _serviceTask;
private readonly Task _commandsTask;
private readonly CancellationTokenSource _cancellation;
private readonly BufferBlock<Command> _pendingCommands;
public NetworkService(ICommandProcessor commandProcessor)
{
_commandProcessor = commandProcessor;
IsRunning = true;
_cancellation = new CancellationTokenSource();
_pendingCommands = new BufferBlock<Command>();
_serviceTask = Task.Run((Func<Task>)RunService);
_commandsTask = Task.Run((Func<Task>)RunCommands);
}
public bool IsRunning { get; private set; }
private async Task RunService()
{
_listener = new TcpListener(IPAddress.Any, ServicePort);
_listener.Start();
while (IsRunning)
{
Socket client = null;
try
{
client = await _listener.AcceptSocketAsync();
client.Blocking …Run Code Online (Sandbox Code Playgroud) 我编写了以下方法来批处理一个巨大的CSV文件.我们的想法是从文件中读取一大块行到内存中,然后将这些行分成固定大小的批量.获得分区后,将这些分区发送到服务器(同步或异步),这可能需要一段时间.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches …Run Code Online (Sandbox Code Playgroud) c# csv task-parallel-library blockingcollection tpl-dataflow
正如Stephen Toub在这篇文章中解释的那样,当您向ActionBlock提交消息时,可以在调用ActionBlock.Post之前先执行ExecutionContext.Capture,将包含消息和ExecutionContext的DTO传递到块中,然后在消息处理委托内部使用ExecutionContext.Run在捕获的上下文上运行委托:
public sealed class ContextFlowProcessor<T> {
private struct MessageState {
internal ExecutionContext Context;
internal T Value;
}
private readonly ITargetBlock<MessageState> m_block;
public ContextFlowProcessor(Action<T> action) {
m_block = new ActionBlock<MessageState>(ms =>
{
if (ms.Context != null)
using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
else
action(ms.Value);
});
}
public bool Post(T item) {
var ec = ExecutionContext.Capture();
var rv = m_block.Post(new MessageState { Context = ec, Value = item });
if (!rv) ec.Dispose();
return rv;
}
public void Done() …Run Code Online (Sandbox Code Playgroud) tpl-dataflow ×10
c# ×9
.net ×3
.net-4.0 ×1
.net-4.5 ×1
async-await ×1
async-ctp ×1
asynchronous ×1
csv ×1
file-io ×1