jb9*_*b99 5 azure orchestration azure-durable-functions
我有一个耐用的功能扇出和输入模式,但似乎工作不可靠。每 10 分钟从 Timer 函数调用一次 Orchestration,但此后已将其增加到 20 分钟。使用 context.CallActivityAsync 调用 Activity 函数并返回一个整数(已处理的行数)。目前,传入的 workItems 应该只包含 2 个要处理的项目。第一项处理所有行并在日志中显示完整的行。第二项有时显示正在处理的行,但在某些时候它只是停止...没有其他活动被识别,并且“完成”永远不会显示在日志中。另外,第二个活动有时会显示它同时运行多次...我已经在我的开发机器上使用相同的数据尝试了这个确切的代码,并且它处理完成的时间不超过 5 分钟。我还将hosts.json 文件设置为
{
"version": "2.0",
"functionTimeout": "00:10:00",
"extensions": {
"queues": {
"maxPollingInterval": "00:00:05"
}
}
}
Run Code Online (Sandbox Code Playgroud)
编排:
public static async void RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
log.LogInformation($"************** Fanning out ********************");
var parallelTasks = new List<Task<int>>();
//object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
object[] workBatch = GETVendors(log); //work batch only has 2 items
for (int i = 0; i<workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
parallelTasks.Add(task);
}
log.LogInformation($"************** 'Waiting' for parallel results ********************");
await Task.WhenAll(parallelTasks);
log.LogInformation($"************** All activity functions complete ********************");
log.LogInformation($"************** fanning in ********************");
int cnt = 0;
foreach (var completedParallelActivity in parallelTasks)
{
cnt += completedParallelActivity.Result;
}
log.LogInformation($"Total Records Converted across all tasks = {cnt}");
//return outputs;
}
Run Code Online (Sandbox Code Playgroud)
活动功能
public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
{
log.LogInformation($"SynchVendor {vendor}");
string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
int totalCount = 0;
using (SqlConnection conn = new SqlConnection(sqlStr))
{
conn.Open();
// lets synch the vendor
Int32 limit = 200;
bool initialLoad = false;
int offset = 0;
bool done = false;
do
{
//synch logic...
// if there are rows found to have changed then send them to a queue for further processing
} while (!done);
// we are done syncing a vendor write out the vendorinfo
conn.Close();
}
log.LogInformation($"SynchVendor {vendor} Complete");
return totalCount;
Run Code Online (Sandbox Code Playgroud)
对于额外的日志记录,您需要将其添加到 log.Log*** 操作之前:
if (!context.IsReplaying)
Run Code Online (Sandbox Code Playgroud)
对于永远看不到完成的问题,您可以执行以下操作:
| 归档时间: |
|
| 查看次数: |
3873 次 |
| 最近记录: |