我一直在学习事件中心,只是希望得到确认或更正我对事件中心的看法?我习惯于利用重试,有毒消息,至少一次交付等等,以获得Azure Service Bus Queues和Topics给我的正常企业消息传递解决方案.似乎Event Hubs旨在提供一种非常高规模的不同工具,您必须放弃一些更高规模的"企业"功能.
我正确地考虑了这个吗?是否还需要考虑其他细节?我意识到事件中心和主题可能存在一些功能重叠,但我只想弄清楚如何考虑使用事件中心.
有没有办法在本地模拟器上玩和发现Azure Service Bus而无需注册到真正的Azure服务?
我正在关注使用Azure Service Bus的教程,但在某个时刻a Namespace
和a Issuer Name
并且Key
是必需的.我没有这些数据,因为我没有注册到Azure服务,我现在不想这样做(当我准备好开发/测试真实的东西时,我会接受试用).
当您在Azure Service Bus中使用代理消息时,可以使用调用.GetBody检索消息正文.代码很简单:
var msg = subscription.Receive();
MyPayload payload = msg.GetBody<MyPayload>();
Run Code Online (Sandbox Code Playgroud)
但是,有没有办法在不明确知道body对象的类的情况下检索Body?
var msg = subscription.Receive();
Type bodyType = Type.GetType( msg.ContentType);
var payload = msg.GetBody<bodyType>();
Run Code Online (Sandbox Code Playgroud) 我正在学习消息传递系统,但对这些术语感到困惑。
下面的所有消息传递系统都提供了具有不同功能集的服务之间的松散耦合。
queue
- FIFO,拉动机制,每个队列有 1 个消费者,但有任意数量的生产者?
message bus
- 发布/订阅模型,任意数量的消费者和任意数量的生产者处理消息?是Azure Service Bus
的实现message bus
?
event bus
- 发布/订阅模型,任意数量的消费者和任意数量的生产者处理事件?
就术语而言,人们是否可以互换使用message bus
和event bus
?
事件和消息有什么区别?在这种情况下,这些只是同义词吗?
event hub
- 发布/订阅模型,分区,重放,消费者可以将事件存储在外部存储中或接近实时数据分析。究竟什么是事件中心?
event grid
- 它可以用作事件中心的下游服务。它究竟做了哪些event hub
没有做的事情?
有人可以提供一些历史背景,说明每种技术如何演变为另一种技术,每种技术都与一些实际用例相关联吗?
我发现消息总线与消息队列有帮助
message-queue servicebus azureservicebus azure-eventhub azure-eventgrid
我正在尝试使用Azure WebJob从Azure ServiceBus队列中读取消息,但它正在抛出异常:
Unhandled Exception: System.InvalidOperationException: Found 2 DNS claims in authorization context.
Run Code Online (Sandbox Code Playgroud)
我已经设置了名为"AzureWebJobsServiceBus","AzureWebJobsDashboard"和"AzureWebJobsStorage"的正确连接字符串
WebJob程序代码已更新为使用JobHostConfiguration:
class Program
{
static void Main()
{
var config = new JobHostConfiguration();
config.UseServiceBus();
var host = new JobHost(config);
host.RunAndBlock();
}
}
Run Code Online (Sandbox Code Playgroud)
而实际的Job方法
public class Functions
{
public async static Task ServiceBusResizeRequest(
[ServiceBusTrigger("blah")] string message,
TextWriter log
)
{
await log.WriteLineAsync("got message " + message);
}
}
Run Code Online (Sandbox Code Playgroud)
我可以通过单独的控制台应用程序成功创建和写入队列.
但是当我运行webjob应用程序时,它会抛出异常.
有任何想法吗?
编辑:使用.net 4.6.1
azure azureservicebus azure-servicebus-queues azure-webjobs azure-webjobssdk
所以场景是我使用SB队列来限制对其他服务的传出回调.回拨给其他服务的标准问题之一是它们可能会因无法控制的时间而停机.假设我检测到目标已关闭/没有响应,放弃该消息的最佳模式是什么,以便它不会立即重新出现在队列中?
以下是我要么意识到的,尝试过或正在考虑的一些方法:
显然,如果我只是使用BrokeredMessage::Abandon()
该消息将被解锁并重新放回队列.对于这种情况以及我想要避免的情况,这显然是不可取的.
如果我只是忽略了我遇到错误并且从不调用Abandon的事实,这将使它不会立即出现,但我真的没有细致的控制,直到它再次显示多久,我想实现一个腐朽的重试策略.
我想也许我可以调用BrokeredMessage::Abandon(IDictionary<string, object>)
并以某种方式更新ScheduledEnqueueTimeUTC
属性,但我已经尝试了这个并且除了最初发送消息之外似乎没有办法影响该属性.有道理,但值得一试.
我考虑过只是BrokeredMessage::Complete()
在这种情况下使用,实际上只是用属性集将消息的新副本排入队列ScheduledEqueueTimeUTC
.
最后的子弹几乎看起来太过沉重,但我得出结论,考虑到队列的固有性质,它可能是正确的答案.我只想到在Azure SB队列中可能有一个更好的方法来实现这一点,我很想念.
我正在使用Microsoft azure服务总线队列来处理计算,我的程序可以正常运行几个小时,但后来我开始为从那时起处理的每条消息都获得此异常.我不知道从哪里开始,因为前几个小时一切正常.我的代码似乎也很准确.我将发布处理azure服务总线消息的方法.
public static async Task processCalculations(BrokeredMessage message)
{
try
{
if (message != null)
{
if (connection == null || !connection.IsConnected)
{
connection = await ConnectionMultiplexer.ConnectAsync("connection,SyncTimeout=10000,ConnectTimeout=10000");
//connection = ConnectionMultiplexer.Connect("connection,SyncTimeout=10000,ConnectTimeout=10000");
}
cache = connection.GetDatabase();
string sandpKey = message.Properties["sandp"].ToString();
string dateKey = message.Properties["date"].ToString();
string symbolclassKey = message.Properties["symbolclass"].ToString();
string stockdataKey = message.Properties["stockdata"].ToString();
string stockcomparedataKey = message.Properties["stockcomparedata"].ToString();
var sandpTask = cache.GetAsync<List<StockData>>(sandpKey);
var dateTask = cache.GetAsync<DateTime>(dateKey);
var symbolinfoTask = cache.GetAsync<SymbolInfo>(symbolclassKey);
var stockdataTask = cache.GetAsync<List<StockData>>(stockdataKey);
var stockcomparedataTask = cache.GetAsync<List<StockMarketCompare>>(stockcomparedataKey);
await Task.WhenAll(sandpTask, dateTask, symbolinfoTask,
stockdataTask, stockcomparedataTask); …
Run Code Online (Sandbox Code Playgroud) 我知道有一种方法可以确定Azure队列(存储帐户)中的邮件数量(或近似数量); 但有没有办法查询Azure Service Bus队列中的待处理消息数?
Azure Service Bus支持内置的重试机制,该机制使被放弃的消息立即可见以进行另一次读取尝试.我正在尝试使用此机制来处理一些瞬态错误,但消息在被放弃后立即可用.
我想做的是让信息在被放弃后的一段时间内不可见,最好是基于指数递增的策略.
我ScheduledEnqueueTimeUtc
在放弃消息时尝试设置属性,但似乎没有效果:
var messagingFactory = MessagingFactory.CreateFromConnectionString(...);
var receiver = messagingFactory.CreateMessageReceiver("test-queue");
receiver.OnMessageAsync(async brokeredMessage =>
{
await brokeredMessage.AbandonAsync(
new Dictionary<string, object>
{
{ "ScheduledEnqueueTimeUtc", DateTime.UtcNow.AddSeconds(30) }
});
}
});
Run Code Online (Sandbox Code Playgroud)
我已经考虑过根本不放弃消息而只是让锁过期,但是这需要一些方法来影响MessageReceiver
指定消息的锁定持续时间的方式,而我在API中找不到任何东西让我改变这个价值.此外,在已经需要锁定之前,将无法读取消息的传递计数(因此决定等待下一次重试的时间).
消息总线中的重试策略是否可以以某种方式受到影响,还是可以通过其他方式人为引入延迟?
我正在使用EventProcessorHost和一个IEventProcessor类(调用它:MyEventProcessor)从EventHub接收事件.我通过在两台服务器上运行我的EPH,并使用相同的ConsumerGroup连接到Hub,但使用唯一的hostName(使用机器名称)将其扩展到两台服务器.
问题是:在白天/黑夜的随机时间,应用程序记录:
Exception information:
Exception type: ReceiverDisconnectedException
Exception message: New receiver with higher epoch of '186' is created hence current receiver with epoch '186' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
at Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.ServiceBus.Common.Parallel.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
Run Code Online (Sandbox Code Playgroud)
此异常与LeaseLostException同时发生,当它尝试检查点时,从MyEventProcessor的CloseAsync方法抛出.(由于ReceiverDisconnectedException,可能正在调用Close?)
我认为这是由于Event Hubs在扩展到多台机器时的自动租赁管理而发生的.但我想知道我是否需要做一些不同的事情以使其更干净地工作并避免这些例外?例如:有时代的东西?
azureservicebus ×10
azure ×9
c# ×3
servicebus ×2
azure-acs ×1
azure-queues ×1
distributed ×1
namespaces ×1