标签: azureservicebus

Azure 服务总线:一次处理 1 条消息

现在,我们使用 Azure 服务总线来触发我们的函数应用程序。传入消息的速率非常高,但我们希望一次仅触发函数应用程序一条消息(按照我们自己的节奏进行处理,而不是处理太多请求错误)。如何配置服务总线一次只向接收者发送一条消息?这是我们的基本功能应用程序代码:

    [FunctionName("ServiceBusListener")]
    public void Run([ServiceBusTrigger("myqueue", Connection = "MyConnection")]string queueContent, ILogger log)
    {
        _myService.ProcessMessage(queueContent);
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {queueContent}");
    }
Run Code Online (Sandbox Code Playgroud)

c# azure azureservicebus .net-core azure-functions

1
推荐指数
1
解决办法
2213
查看次数

持久的用户模式与天蓝色的服务总线

我希望将Azure Service Bus与主题一起使用,但需要处理订阅者可能没有收听其感兴趣的消息的情况(例如,服务器正在重新启动等).这是此处描述的典型持久订户模式http://www.eaipatterns.com/DurableSubscription.html.

我无法解决的是如何将此应用于Azure Service Bus,我似乎无法在文档中找到任何示例或讨论.这是Azure服务总线提供的内容还是应该开始寻找Azure Service Bus的替代方案?

azure azureservicebus

0
推荐指数
1
解决办法
866
查看次数

使用Azure Service Bus的NServiceBus,始终检查命名空间和队列的存在

我使用NServiceBus,使用Azure Service Bus作为传输.每次发送邮件时,我都会在日志中看到以下内容:

Checking existence cache for '...'
Checking namespace for existance of the queue '...'
Determined that the queue '...' exists
Queue '...' already exists, skipping creation
Run Code Online (Sandbox Code Playgroud)

尽管日志中有噪音(我可以过滤掉这些消息,但除此之外),我担心每次检查的开销.如何配置NServiceBus以期望这些命名空间和队列存在并停止检查每条消息?

nservicebus azure azureservicebus

0
推荐指数
1
解决办法
203
查看次数

Azure Service Bus Queue将NodeJ中的消息发送到.NET客户端

我正在从C#worker向Queue发送一条消息,然后我在另一个C#worker上调用它

string body = message.GetBody<string>();

这工作,我后来反序列化字符串/ JSON消息.

现在我试图以JSON消息的形式从NodeJS发送相同的消息.当我试图接收它时

string body = message.GetBody<string>();

我称之为输入格式不正确的例外情况.

我在NodeJS上的消息对象看起来像这样

{
  body: JSON.stringify(message)
}
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

c# azure node.js azureservicebus azure-servicebus-queues

0
推荐指数
1
解决办法
1016
查看次数

服务总线队列触发器,Azure函数多次调用同一消息

我有两个azure函数,一个是http函数,另一个是队列触发函数.在http函数中,我在队列中添加了一个消息,并在队列触发器中处理它.当我在队列触发器中记录消息时,有时它会使用相同的消息多次触发?会是什么原因?

提前致谢

azure azureservicebus azure-servicebus-queues azure-functions

0
推荐指数
1
解决办法
521
查看次数

如何为 Azure 服务总线中的每条损坏的消息重置“传递计数”?

实时,我几乎没有天蓝色的服务总线队列。每个队列都有自己的默认消息传递计数限制。当它超过传递计数时,消息将移动到“死信队列”。

现在我必须将 DeadLetter 消息重新发布到原始队列以进行重新处理。由于传递计数,它将消息推回死信。

我不想更改队列最大传送计数。现在,我如何重置每条损坏消息的传递计数?

azure azureservicebus

0
推荐指数
1
解决办法
2836
查看次数

带有 AMQP 的 Azure 服务总线 - 如何指定会话 ID

我正在尝试使用 AMQP QPID java 库将消息发送到服务总线

我收到此错误:

“需要将所有代理消息的 SessionId 设置为支持排序的分区主题”

我的主题打开了“强制消息排序”(我猜这是我收到此错误的方式)

当使用 Azure 服务总线 java 库(而不是 AMQP)时,我有这个功能:

this.entity.setSessionId(...);

使用 AMQP 库时,我没有看到在要发送的消息上设置会话 ID 的选项

请注意,如果我取消选中“强制消息排序”选项,消息将成功发送

这是我的代码

private boolean sendServiceBusMsg(MessageProducer sender,Session sendSession) {

        try {
            // generate message

            BytesMessage createBytesMessage = (BytesMessage)sendSession.createBytesMessage();

            createBytesMessage.setStringProperty(CAMPAIGN_ID, campaignKey);             
            createBytesMessage.setJMSMessageID("ID:" + bm.getMessageId());                                                    
      createBytesMessage.setContentType(Symbol.getSymbol("application/octet-stream"));

            /*message is the actual data i send / not seen here*/
            createBytesMessage.writeBytes(message.toByteArray());

            sender.send(createBytesMessage);

        } catch (JMSException e) {
    }
Run Code Online (Sandbox Code Playgroud)

java session azure amqp azureservicebus

0
推荐指数
1
解决办法
790
查看次数

这个异步/等待代码导致潜在的死锁?

我有以下课程:

public abstract class ServiceBusQueueService : IServiceBusQueueService
{
    private readonly string _sbConnect;

    protected ServiceBusQueueService(string sbConnect)
    {
        _sbConnect = sbConnect;
    }

    public async Task EnqueueMessage(IntegrationEvent message)
    {
        var topicClient = new TopicClient(_sbConnect, message.Topic, RetryPolicy.Default);
        await topicClient.SendAsync(message.ToServiceBusMessage());
    }
}
Run Code Online (Sandbox Code Playgroud)

正在使用的是这样的:

public ulong CreateBooking()
{
     // Other code omitted for brevity 
     ulong bookingId = 12345; // Pretend this id is generated sequentially on each call

      _bookingServiceBusQueueService.EnqueueMessage(new BookingCreatedIntegrationEvent
      {
            BookingId = bookingId
      }).GetAwaiter().GetResult();

      return bookingId;
}
Run Code Online (Sandbox Code Playgroud)

EnqueueMessage从我的CreateBooking方法调用该方法时,该程序挂起并且在点击该行之后不再继续await topicClient.SendAsync(message.ToServiceBusMessage());

现在代码工作,当我将调用更改为my …

.net c# multithreading async-await azureservicebus

0
推荐指数
1
解决办法
316
查看次数

使用新的标准库通过 WebSockets 通过 Amqp 连接到 Azure 服务总线

我正在尝试向以 azure 托管的队列发送消息。我的应用程序通过代理与世界通信。我使用 .Net Core 2.1 和新的标准 azure 服务总线 nuget。我需要通过端口 443 通过网络套接字使用 Amqp。不幸的是,我没有成功。有没有人实施过这样的事情?任何人都可以提供有关如何实现这一目标的代码片段吗?

提前致谢!这是我正在尝试的:

var transportProvider = new AmqpTransportProvider();
var amqpSettings = new AmqpSettings
{ 
        RequireSecureTransport = true, TransportProviders = {transportProvider} 
};

var socketSettings = new WebSocketTransportSettings()
{
      Proxy = new WebProxy() {Address = new Uri("http://myproxy"), BypassProxyOnLocal = false},
      SubProtocol = "https",
      Uri = new Uri($"wss://{_azureConfiguration.NameSpace}"),
};

AmqpTransportInitiator transportIntInitiator = new AmqpTransportInitiator(amqpSettings, socketSettings);
var b = transportIntInitiator.ConnectAsync(TimeSpan.FromSeconds(30), new TransportAsyncCallbackArgs());

ServiceBusConnection s = new ServiceBusConnection($"sb://{_azureConfiguration.NameSpace}", TransportType.AmqpWebSockets);
s.TokenProvider = tokenProvider;
s.OperationTimeout = …
Run Code Online (Sandbox Code Playgroud)

c# azureservicebus asp.net-core

0
推荐指数
1
解决办法
3760
查看次数

工人-无法选择最佳技术

我想创建一个微服务来并行处理我通过pushAzure Service Bus 获得的一些任务。如果任务成功完成,此微服务将通知Azure服务总线。请参见下图:

图

我已经考虑了以下三个选项:

  1. 托管服务
  2. Azure Web作业
  3. Azure批处理

由于先决条件之一就是该微服务必须位于.NET Core 3中,因此是否有使用Web Jobs代替Hosted Services的正当理由?哪个选项可确保鲁棒性和可扩展性?

worker azureservicebus microservices .net-core-3.0

0
推荐指数
1
解决办法
74
查看次数