mar*_*ark 5 .net c# asynchronous task-parallel-library
我有以下代码:
innerExceptions = dbconnByServer
.AsParallel()
.WithDegreeOfParallelism(dbconnByServer.Count)
// A stream of groups of server connections proceeding in parallel per server
.Select(dbconns => dbconns.Select(dbconn => m_sqlUtilProvider.Get(dbconn)))
// A stream of groups of SqlUtil objects proceeding in parallel per server
.Select(sqlUtils => GetTaskException(sqlUtils
// Aggregate SqlUtil objects to form a single Task which runs the SQL asynchronously for the first SqlUtil, then upon completion
// for the next SqlUtil and so long until all the SqlUtil objects are processed asynchronously one after another.
.Aggregate<ISqlUtil, Task>(null, (res, sqlUtil) =>
{
if (res == null)
{
return sqlUtil.ExecuteSqlAsync(SQL, parameters);
}
return res.ContinueWith(_ => sqlUtil.ExecuteSqlAsync(SQL, parameters)).Unwrap();
})))
.Where(e => e != null)
.ToList();
Run Code Online (Sandbox Code Playgroud)
哪里:
private static Exception GetTaskException(Task t)
{
try
{
t.Wait();
return null;
}
catch (Exception exc)
{
return exc;
}
}
Run Code Online (Sandbox Code Playgroud)
此代码执行的操作是跨多个数据库连接执行某些SQL语句,其中某些连接可能属于一个数据库服务器,而其他连接可能属于另一个数据库服务器,依此类推.
代码确保两个条件成立:
给定每个数据库服务器的N db连接Task,在聚合结束时会有一个单独执行,具有以下效果:
我的问题是,除了第一个数据库连接之外,现在除了异常.我知道我应该检查_参数并以某种方式处理_.Exceptioncontinuation函数内的属性.我想知道是否有一种优雅的方式来做到这一点.
有任何想法吗?
这样,我执行此操作的代码将如下所示:
public Task<IEnumerable<Exception>> ExecuteOnServersAsync(
IList<IEnumerable<Connection>> dbConnByServer,
string sql, object parameters)
{
var tasks = new List<Task>();
var exceptions = new ConcurrentQueue<Exception>();
Action<Task> handleException = t =>
{
if (t.IsFaulted)
exceptions.Enqueue(t.Exception);
};
foreach (var dbConns in dbConnByServer)
{
Task task = null;
foreach (var dbConn in dbConns)
{
var sqlUtil = m_sqlUtilProvider.Get(dbConn);
if (task == null)
{
task = sqlUtil.ExecuteSqlAsync(sql, parameters);
}
else
{
task = task.ContinueWith(
t =>
{
handleException(t);
return sqlUtil.ExecuteSqlAsync(sql, parameters);
}).Unwrap();
}
}
if (task != null)
{
task = task.ContinueWith(handleException);
tasks.Add(task);
}
}
return Task.Factory.ContinueWhenAll(
tasks.ToArray(), _ => exceptions.AsEnumerable());
}
Run Code Online (Sandbox Code Playgroud)