我已经整理了一个简单的应用程序,它监视文件创建事件,从文件内容创建一些对象,并进行一些处理.以下是示例代码:
class Program
{
private const string Folder = "C:\\Temp\\InputData";
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
foreach (var obj in Input(cts.Token))
Console.WriteLine(obj);
}
public static IEnumerable<object> Input(CancellationToken cancellationToken)
{
var fileList = new BlockingCollection<string>();
var watcher = new FileSystemWatcher(Folder);
watcher.Created += (source, e) =>
{
if (cancellationToken.IsCancellationRequested)
watcher.EnableRaisingEvents = false;
else if (Path.GetFileName(e.FullPath) == "STOP")
{
watcher.EnableRaisingEvents = false;
fileList.CompleteAdding();
File.Delete(e.FullPath);
}
else
fileList.Add(e.FullPath);
};
watcher.EnableRaisingEvents = true;
return from file in
fileList.GetConsumingEnumerable(cancellationToken)
//.AsParallel()
//.WithCancellation(cancellationToken)
//.WithDegreeOfParallelism(5) …Run Code Online (Sandbox Code Playgroud) 下面我为了简单起见将一个真实场景复制为LINQPad脚本:
var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;
var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};
var counter1 = 0;
var counter2 = 0;
var delay1 = 10;
var delay2 = …Run Code Online (Sandbox Code Playgroud)