Tho*_*dal 3 azure azureservicebus azure-functions
我有一系列 Azure Function 应用程序,所有应用程序都使用以下命令将消息输出到 Azure 服务总线IAsyncCollector<Message>:
public async Task Run([ServiceBus(...)] IAsyncCollector<Message> messages)
{
...
await messages.AddAsync(msg);
}
Run Code Online (Sandbox Code Playgroud)
我不时记录错误,如下所示:
Microsoft.Azure.WebJobs.Host.FunctionInvocationException: Exception while executing function: Function
---> Microsoft.Azure.ServiceBus.ServiceBusTimeoutException: The operation did not complete within the allocated time 00:00:59.9999536 for object message.Reference: ..., 7/13/2020 2:46:24 PM
---> System.TimeoutException: The operation did not complete within the allocated time 00:00:59.9999536 for object message.
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.SendingAmqpLink.EndSendMessage(IAsyncResult result)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
--- End of inner exception stack trace ---
at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList`1 messageList)
at Microsoft.Azure.WebJobs.ServiceBus.Bindings.MessageSenderExtensions.SendAndCreateEntityIfNotExists(MessageSender sender, Message message, Guid functionInstanceId, EntityType entityType, CancellationToken cancellationToken)
at My.Function.Run(String mySbMsg, IAsyncCollector`1 messages)
Run Code Online (Sandbox Code Playgroud)
我很难弄清楚这种情况何时在管道中发生。但我最近了解了这个FlushAsync方法:
await messages.AddAsync(msg);
await messages.FlushAsync();
Run Code Online (Sandbox Code Playgroud)
我的问题如下。为什么我永远不会FlushAsync在我的函数中包含调用?在我自己的代码中获取超时异常将使重试、更好地记录异常等成为可能。在功能代码中像这样手动刷新有什么缺点吗?
为什么我永远不会在我的函数中包含对 FlushAsync 的调用?在我自己的代码中获取超时异常将使重试、更好地记录异常等成为可能。
我将在这里更进一步说,在我获得了 Azure Functions 的一些经验后,我现在IAsyncCollector<T>完全避免了。一些实现发布于AddAsync; 其他实现可能在AddAsync和上发布FlushAsync。我怀疑服务总线实现实际上是在 上发布的AddAsync,在这种情况下FlushAsync可能是 noop。
好的部分IAsyncCollector<T>是它给了你一个“写这些东西”的抽象;你所要做的就是提供一个连接字符串,剩下的就是魔术。问题IAsyncCollector<T>在于它给了你一个抽象,因此你的控制权要少得多。
在幕后,进行了多少次重试?他们使用的是持续延迟还是呈指数增长?如果它永远不会成功,它的行为是什么?通常这些关键信息都没有记录。
尤其令人讨厌的是 AF 团队更改了抽象的语义。例如,对于某些输出绑定(CosmosDB 或存储,我不记得了),重试行为从函数 SDK 的一个版本更改为下一个版本。
所以,我倾向于避免输出绑定,尤其是IAsyncCollector<T>. 我通常希望以一分钟左右的上限进行紧密但呈指数增长的去相关抖动重试,但在函数运行时仅剩一分钟时中止,然后恢复行为更改为将消息写入错误队列(重试)。这比 anIAsyncCollector<T>所能提供的要复杂得多,但通过 Polly 直接调用 SDK 并不难。
在功能代码中像这样手动刷新有什么缺点吗?
否。默认情况下,IAsyncCollector<T>.FlushAsync在您的函数执行后由函数宿主调用。所以如果你自己打电话,你只是提前打电话。多次调用应该是安全的。
| 归档时间: |
|
| 查看次数: |
839 次 |
| 最近记录: |