我正在使用Azure Service Bus主题和订阅.它用于在整个应用程序中发送控制消息.消息侦听器(订阅者)以worker角色运行,他们正在拾取消息并处理请求.即使有多个侦听器同时运行,总线中的每条消息也只能被拾取一次.
使用服务总线没有问题; 但是,我们在本地调试/测试应用程序时遇到了一些问题.我们有2个服务总线,一个用于云,一个用于本地调试.现在,如果多个人同时调试应用程序,则只有一个系统(随机)选择该消息.这是预期的行为,但它在调试时会造成麻烦.
有什么办法可以将本地仿真器用于服务总线吗?我做了一些研究,但我找不到任何可靠的解决方案.我们如何单独调试应用程序?
azure azureservicebus brokeredmessage azure-servicebus-topics
我有一个BrokeredMessageContext类,它使用Timer定期检查并更新BrokeredMessage实例上的锁,以防处理此消息的进程运行的时间超过预期.它通过调用BrokeredMessage实例上的RenewLock()方法来更新锁.
我希望这个调用给我一个与原始锁相同超时的新锁(MSDN声明"你可以在与实体锁定超时相同的持续时间内更新锁定,并且没有锁定续订的最大持续时间.") ,但在调试时,锁定超时似乎增加了"任意"10-15秒.我在BrokeredMessage实例上设置了一个监视器,我可以看到每次调用RenewLock()时,LockedUntilUtc属性都会增加10-15秒.
有谁知道为什么会这样?可以做任何事情来延长锁定时间吗?
编辑:
Mike的回答如下,是正确的.事实上,我发现事实上,我试图从开始就每隔十秒更新一次锁,即使我的代码是为了在锁定到期之前的20秒之前更新锁定.这一切都归结为一个时间比较问题,以及我机器上的时间错误的事实(它提前了将近一分钟).D'哦!
我将BrokeredMessage标记为Complete时遇到问题.
简单的代码,按预期工作:
private void OnMessageArrived(BrokeredMessage message)
{
var myObj= message.GetBody<MyObject>();
//do things with myObj
message.Complete();
}
Run Code Online (Sandbox Code Playgroud)
当我尝试等待用户完成myObj时,我得到异常:
已经处理了breakredmessage
代码如下:
private Dictionary<long, BrokeredMessage> ReceivedMessages;
ReceivedMessages = new Dictionary<long, BrokeredMessage>();
private void OnMessageArrived(BrokeredMessage message)
{
var myObj= message.GetBody<MyObject>();
ReceivedMessages.Add(myObj.Id, message);
//do things with myObj
}
private void Button_Click(object sender, RoutedEventArgs e)
{
// get myObj on which user clicked
ReceivedMessages[myObj.Id].Complete();
ReceivedMessages.Remove(myObj.Id);
}
Run Code Online (Sandbox Code Playgroud)
对我来说,看起来ServiceBus在某种程度上失去了与c#中实际对象的连接
类似于EF中的分离对象的东西,只是在这种情况下,对象与ServiceBus分离
编辑:
只有在用户点击按钮后才能将消息标记为完成,这一点非常重要.如果AC关闭(或类似的事情),我希望消息仍然保留在服务总线主题上,以便下次用户启动应用程序时,他将再次接收他未处理的消息.
我曾经使用 RabbitMQ 作为消息传递平台,我从来没有遇到过任何问题 - 不幸的是,我们最近将我们的基础设施转移到了 Azure 并且他们不提供 RabbitMQ 服务器,所以我想尝试使用服务总线扩展。
我有一个作家和多个读者。目前,读者将各自阅读不同的消息(消息竞争模式 - 例如有利于负载平衡)。
我想要的是,所有的读者都得到相同的信息并自己处理。
读者:
string connectionKey = "....";
this.client = QueueClient.CreateFromConnectionString(connectionKey, "dev");
this.client.OnMessage((message) =>
{
try
{
Console.WriteLine("Message received: " + message.GetBody<string>());
Console.WriteLine("Message ID: " + message.MessageId);
// message.Complete();
}
catch (Exception e)
{
Console.WriteLine("Exception " + e.Message);
}
});
Run Code Online (Sandbox Code Playgroud)
笔者:
Console.WriteLine("Sending message " + message);
BrokeredMessage msg = new BrokeredMessage(message);
this.client.Send(msg);
Run Code Online (Sandbox Code Playgroud)
我一直在寻找解决方案两个小时,但找不到任何东西。在 RabbitMQ 中,这将是默认行为。
因此,我有一个天蓝色函数作为队列触发器,调用内部托管的 API。
对于如何处理由于有毒以外的问题而无法处理的消息,网上似乎没有明确的答案。
一个例子:
收到我的消息并且该函数尝试调用 API。消息有效负载是正确的并且可以处理,但是 API/服务由于某种原因而关闭(这个时间可能会超过 10 分钟)。目前发生的情况是消息传递计数达到其最大值(10),然后被推送到死信队列,这又会在之后的每条消息中发生。
我需要一种方法来要么不增加交付计数,要么在达到最大值时重置它。或者,我可以放弃对消息的查看锁定,而不增加传递计数,因为我想停止处理队列上的任何消息,直到 API/服务恢复并运行。这样我就可以确保所有可以处理的消息都不会因为服务之间的连接问题而陷入死信。
关于如何实现这一目标有什么想法吗?
通过ServiceBus任何自定义传递代理消息DataContractSerializer[as Default XML Serializer Take Place].
var message = new BrokeredMessage(objMess.MessageBody);
Run Code Online (Sandbox Code Playgroud)
注意: 主要是邮件正文是HTML电子邮件的类型.
但是当在传递给worker角色的消息之后deserialization,我看到一些随机文本被附加在顶部正文中,
var reader = new StreamReader(receivedMessage.GetBody<Stream>());
@string3http://schemas.microsoft.com/2003/10/Serialization/? .
Rest of Message Body
Run Code Online (Sandbox Code Playgroud)
我试着给定制DataContractSerializer.但这搞砸了HTML符号.
一些格式化我找到的服务总线消息文章的内容,但仍然找到摆脱模式字符串的方法.
现在我正在substring使用消息体.
我有一个应用程序,其中数据从SQL DB获取并作为代理消息发送到服务总线.这些是步骤:
这是我面临的第三步.这是代码:
public async Task SendMessagesAsync(List<BrokeredMessage> brokeredMessageList)
{
try
{
var topicClient = CreateTopicClient();
await topicClient.SendBatchAsync(brokeredMessageList);
}
catch(Exception ex)
{
throw ex;
}
}
Run Code Online (Sandbox Code Playgroud)
当编译器使用SendBatchAsync方法时,它会在与Service Bus通信期间出现Error错误.检查连接信息,然后重试.内部例外是:
Internal Server Error: The server did not provide a meaningful reply; this might be caused by a premature session shutdown. TrackingId:some guid here
Run Code Online (Sandbox Code Playgroud)
但是,如果我尝试发送100条消息,它可以正常工作.我该怎么做才能让它一次发送1000条消息?
注意:每条消息大小为1445字节
我正在使用 Microsoft.Azure.ServiceBus, Version=2.0.0.0 程序集连接到 Azure 主题。代码如下
public void SendMessage(Message brokeredMessage)
{
var topicClient = new TopicClient(_configuration.ConnectionString, topicName, _defaultRetryPolicy);
await topicClient.SendAsync(brokeredMessage);
await topicClient.CloseAsync();
}
Run Code Online (Sandbox Code Playgroud)
我想知道每次需要向主题发送消息时创建主题客户端是否是一个好习惯,还是应该在应用程序启动时创建主题客户端并在每次需要发送消息时继续使用相同的客户端?
我需要考虑任何性能或可扩展性问题吗?
在 .NET Framework 中,Microsoft.ServiceBus.Messaging 有一个用于从服务总线接收消息的类 BrokeredMessage。但是,在 .NET Standard 2.0 中,为了从服务总线接收消息,使用来自 Microsoft.Azure.ServiceBus.Core 的 Message 类。
BrokeredMessage有一个方法CompleteAsync(),用于完成消息的接收操作,并指示该消息应被标记为已处理和已删除。我找不到 Message 类执行相同操作的方法。你们知道有什么解决方案可以将 Message 类的消息标记为已处理和已删除吗?
我想知道是否可以从 JAVA 中的 azure 服务总线队列读取死信消息。
我发现以下示例https://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-Dead-22536dd8/sourcecode?fileId=123792&pathId=497121593 但是,我无法将代码转换为 JAVA。
我还找到了https://github.com/Azure/azure-storage-java/tree/master/microsoft-azure-storage/src/com/microsoft/azure/storage 但那里似乎没有任何关于死信的内容根本不。
我还找到了几个博客(我不允许添加更多链接,所以我不知道是否应该在没有适当标签的情况下)。但它们都没有描述如何在JAVA中读取死信消息。
预先非常感谢