BKS*_*BKS 2 .net c# multithreading task-parallel-library async-await
我有一个类负责通过调用遗留类来检索产品可用性.此遗留类本身通过进行BLOCKING网络调用来内部收集产品数据.请注意,我无法修改旧版API的代码.由于所有产品彼此独立,我想并行收集信息而不创建任何不必要的线程,也不阻止在调用此遗留API时被阻止的线程.有了这个背景,这里是我的基本课程.
class Product
{
public int ID { get; set; }
public int VendorID { get; set; }
public string Name { get; set; }
}
class ProductSearchResult
{
public int ID { get; set; }
public int AvailableQuantity { get; set; }
public DateTime ShipDate { get; set; }
public bool Success { get; set; }
public string Error { get; set; }
}
class ProductProcessor
{
List<Product> products;
private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
CancellationTokenSource cts = new CancellationTokenSource();
public ProductProcessor()
{
products = new List<Product>()
{
new Product() { ID = 1, VendorID = 100, Name = "PC" },
new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
new Product() { ID = 4, VendorID = 102, Name = "GPS" },
new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
};
}
public async void Start()
{
Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
Parallel.For(0, products.Count(), async i =>
{
tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);
});
Task<ProductSearchResult> results = await Task.WhenAny(tasks);
// Logic for waiting on indiviaul tasks and reporting results
}
private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
ProductSearchResult result = new ProductSearchResult();
result.ID = productId;
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
try
{
await mutex.WaitAsync();
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
LegacyApp app = new LegacyApp();
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
if (success)
{
result.Success = success;
result.AvailableQuantity = app.AvailableQuantity;
result.ShipDate = app.ShipDate;
}
else
{
result.Success = false;
result.Error = app.Error;
}
}
finally
{
mutex.Release();
}
return result;
}
}
Run Code Online (Sandbox Code Playgroud)
鉴于我试图在同步API上包装异步,我有两个问题.
编译器会给你关于asynclambda的警告.仔细阅读; 它告诉你它不是异步的.在那里使用async没有意义.另外,不要使用async void.
由于你的底层API是阻塞的 - 并且没有办法改变它 - 异步代码不是一个选项.我建议要么使用几种Task.Run来电或 Parallel.For,但不能同时使用.所以让我们使用并行.实际上,让我们使用Parallel LINQ,因为你正在转换一个序列.
制作RetrieveProductAvailablity异步是没有意义的; 除了限制之外,它只进行阻塞工作,并行方法具有更自然的限制支持.这使您的方法看起来像:
private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
... // no mutex code
LegacyApp app = new LegacyApp();
bool success = app.RetrieveProductAvailability(productId);
... // no mutex code
}
Run Code Online (Sandbox Code Playgroud)
然后,您可以执行以下并行处理:
public void Start()
{
ProductSearchResult[] results = products.AsParallel().AsOrdered()
.WithCancellation(cts.Token).WithDegreeOfParallelism(2)
.Select(product => RetrieveProductAvailability(product.ID, cts.Token))
.ToArray();
// Logic for waiting on indiviaul tasks and reporting results
}
Run Code Online (Sandbox Code Playgroud)
从您的UI线程,您可以使用以下方法调用该方法Task.Run:
async void MyUiEventHandler(...)
{
await Task.Run(() => processor.Start());
}
Run Code Online (Sandbox Code Playgroud)
这使您的业务逻辑保持干净(仅限同步/并行代码),并且将此工作从UI线程(使用Task.Run)移出的责任属于UI层.
更新:我添加了一个调用,AsOrdered以确保结果数组与产品序列具有相同的顺序.这可能是必要的,也可能不是必需的,但由于原始代码保留了顺序,此代码现在也可以.
更新:由于您需要在每次检索后更新UI,因此您应该使用Task.Run每个UI 而不是AsParallel:
public async Task Start()
{
var tasks = products.Select(product =>
ProcessAvailabilityAsync(product.ID, cts.Token));
await Task.WhenAll(tasks);
}
private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
await mutex.WaitAsync();
try
{
var result = await RetrieveProductAvailability(id, token);
// Logic for reporting results
}
finally
{
mutex.Release();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1163 次 |
| 最近记录: |