我已经开始使用Azure中的Azure Service Bus了.通过Internet浏览了一些引用,似乎人们在Microsoft.ServiceBus.Messaging中使用BrokeredMessage类而不是在Microsoft.Azure.ServiceBus中使用Message类.
我可以将两种消息"类型"发送到Azure Service Bus,也可以通过Azure Service Bus使用它们.此外,两者都可用于异步操作.这两种类型的主要区别是什么?
[更新]本文介绍了在交换代理消息时的Azure Service Bus的最佳实践(https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements).我不确定它是否也被引用到Microsoft.Azure.ServiceBus中的Message.
我想运行一个具有" 心跳 " 的任务,该任务以特定的时间间隔继续运行,直到任务完成.
我在想这样的扩展方法效果很好:
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken)
Run Code Online (Sandbox Code Playgroud)
例如:
public class Program {
public static void Main() {
var cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken);
withHeartbeatTask.Wait();
Console.WriteLine("Long running task completed!");
Console.ReadLine()
}
private static void SomeLongRunningTask() {
Console.WriteLine("Starting long task");
Thread.Sleep(TimeSpan.FromSeconds(9.5));
}
private static int _heartbeatCount = 0;
private static void PerformHeartbeat(CancellationToken cancellationToken) …Run Code Online (Sandbox Code Playgroud) 有人可以提供有关使用Azure Service Bus OnMessageOptions.AutoRenewTimeout的更多指导 http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.onmessageoptions.autorenewtimeout.aspx
因为我没有找到关于此选项的大量文档,并且想知道这是否是更新消息锁的正确方法
我的用例:
1)消息处理队列的锁定持续时间为5分钟(允许的最大值)
2)消息处理器使用OnMessageAsync消息泵从队列中读取(使用ReceiveMode.PeekLock)长时间运行的处理可能需要10分钟才能处理消息,然后再手动调用msg.CompleteAsync
3)我希望消息处理器自动更新它的锁定,直到预期完成处理的时间(~10分钟).如果在此期间之后尚未完成,则应自动释放锁定.
谢谢
- 更新
我从来没有最终获得有关AutoRenewTimeout的更多指导.我最终使用自定义MessageLock类,根据计时器自动更新消息锁.
请参阅要点 - https://gist.github.com/Soopster/dd0fbd754a65fc5edfa9
我的场景:托管在云上的网站,每个实例创建一个服务总线主题的订阅,以便自己监听消息.
我的问题: 如何以编程方式创建订阅?
...
问题:我习惯使用较旧的WindowsAzure库.现在我使用的是不支持旧库的.NET Standard
解决方案:使用Microsoft.Azure.ServiceBus NuGet包
...
问题:它不支持管理功能(如创建订阅)
解决方案:还使用NuGet库Microsoft.Azure.Management.ServiceBus
...
问题:它不支持使用连接字符串或Azure门户提供给您的密钥
解决方案:https: //github.com/Azure-Samples/service-bus-dotnet-management提供了常用的编程模式.
...
问题:WTF是它正在谈论的{tenantId}?
解决方案:很明显,它位于"Azure Active Directory" - >"Properties"下,并且有助于标记为"Directory ID"而不是tenantId,正如人们所期望的那样
...
问题:WTF是{clientId}和{clientSecret}?
解决方案:更容易.从这些信息中可以明显看出,您必须:
...
好的,示例工作的第1步也是如此?AcquireTokenAsync返回一个访问令牌!YAYYYYYYYYYYYYY
result.AccessToken是步骤2中提到的"令牌",似乎.
第2,3步......没问题......继续执行第4步.除了使用sbClient.Subscriptions.CreateOrUpdate代替
{resourceGroupName}似乎很容易 - 从门户网站上的"资源组"复制
...
问题:什么是{namespaceName}?
是服务总线名称吗?喜欢{name} .servicebus.windows.net?或者命名空间是{name.servicebus.windows.net}的全部内容?或者它包括像{sb://name.servicebus.windows.net}这样的方案?
...
问题:无论我在这里尝试什么,我都会在CreateOrUpdate调用中以"Operation返回无效状态代码'NotFound'"结束.
那就是我被困住的地方.我哪里出错了?
在旁注中,snark反映了我的挫败感 - 我觉得好像我需要学习一大堆Azure技术,我真的不想知道任何事情只是为了做一些"应该简单的事情" ".就像我被一个兔子洞弄下来一样.
[编辑#1] - "SubscriptionId"原来是在门户的计费部分下找到的AZURE订阅ID,而不是指服务总线订阅.
[编辑#2] - "namespaceName"不包括.servicebus.windows.net - 只是名称本身
随着这些变化,经过无数个小时的研究,IT工作
我正在使用访问控制服务来授权访问特定服务标识的特定服务总线订阅.
当从订阅接收会话或消息时,服务标识被授权并且可以根据需要接收和完成或放弃消息.
但是,我没有看到UnauthorizedAccessException尝试访问服务标识无权访问的订阅时,也没有在尝试执行规则组未针对该服务标识和依赖项发出声明的操作时看到此异常派对(如发送消息或创建主题).
相反,我最终看到了一个TimeoutException - "The timeout elapsed upon attempting to obtain a token while accessing 'https://namespace-sb.accesscontrol.windows.net/WRAPv0.9/'".内部例外是一个SecurityTokenException - "The token provider was unable to provide a security token while accessing 'https://namespace-sb.accesscontrol.windows.net/WRAPv0.9/'. Token provider returned message: 'The operation has timed out'".这会导致RetryPolicy出现问题,因为Timeout Exception被认为是瞬态的.
但奇怪的是,我UnauthorizedAccessException在尝试接收订阅描述时收到了.根据服务总线操作所需的权利,应该可以在... myTopic/Subscriptions/mySubscription范围内使用Listen Claim提供服务标识.
我有以下设置:
我看到以下问题:
var manager …Run Code Online (Sandbox Code Playgroud) 我想确保,如果我的eventhub客户端崩溃(当前是一个控制台应用程序),它只会选择尚未从eventhub获取的事件.实现这一目标的一种方法是利用抵消.但是,这(根据我的理解)要求客户端存储最新的偏移量(除了事件似乎不一定会触及SequenceNumber排序的ProcessEventsAsync方法的foreach循环).
另一种方法是使用检查点.我认为它们是使用提供的存储帐户凭据通过服务器(eventhub)保留的.它是否正确?
这是我目前使用的一些初步代码:
public class SimpleEventProcessor : IEventProcessor
{
private Stopwatch _checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
// do something
}
//Call checkpoint every 5 minutes, so …Run Code Online (Sandbox Code Playgroud) 在Azure Service Bus中,您可以使用QueueClient和发送代理消息MessageFactory.我想知道你为什么要用一个而不是另一个.
MS最近推出了Microsoft.Azure.ServiceBus命名空间.
https://github.com/Azure/azure-service-bus/blob/master/samples/readme.md
它适用于新的.net标准框架(就好像MS没有足够的半冗余代码库)
我的问题是,在性能方面有多好?
我可以自信地说, Microsoft.ServiceBus.Messaging留下了许多需要,特别是在持久接收方面.
Microsoft.ServiceBus.Messaging的一个非常有用的功能是消息泵,它构建在OnMessage()方法之上.
新的库没有这个,并且需要在每个收据上重新绑定事件处理程序以保持抽水.绝对是退步.
寻找任何有过这方面经验的人的反馈,可以比较..
我通过.net SDK使用Azure ServiceBus队列.OnMessageHandler/OnMessageOptions上有一个名为"AutoRenewTimeout"的标志,但似乎对这个值的实际含义感到困惑.
在这里的官方文档https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.onmessageoptions.aspx它建议AutoRenewTimeout应该大于队列锁定.
获取或设置自动续订锁定的最长持续时间.该值应大于最长的消息锁定持续时间; 例如,LockDuration属性.
这似乎表明AutoRenewTimeout或多或少是处理消息所需的最长时间.例如,如果您的锁定持续时间为1分钟,并且自动续订时间为5分钟,则该消息将在放弃之前更新总共5次并再次在队列中显示.还有其他StackOverflow答案确认这种情况,例如/sf/answers/2523598591/
要处理长消息处理,您应该设置AutoRenewTimeout == 10分钟(在您的情况下).这意味着每次LockDuration过期时,锁定将在这10分钟内更新.
因此,例如,如果您的LockDuration为3分钟且AutoRenewTimeout为10分钟,则每3分钟锁定将自动续订(3分钟,6分钟和9分钟后),并且自消息消耗后12分钟后锁定将自动释放.
然而,在更多的研究中,我偶然发现了一条旧的推文(https://twitter.com/clemensv/status/649940087267041284),看起来是微软消息传递的首席架构师.在这条推文中,似乎暗示AutoRenewTimeout是调用"RenewLock"方法的间隔.
它是在回调处于活动状态时在消息上调用https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.renewlock.aspx的时间间隔
因此,例如,如果你的锁定是1分钟,AutoRenewTimeout应该是30秒,以确保在释放之前更新消息锁定.
在我自己的测试中,我倾向于前者是正确的,但推文让我怀疑的事实可能我不知道充分利用AutoRenewTimeout