我正在使用代码来实现 Azure 服务总线主题,可以在此处找到这些主题: https: //learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-使用主题订阅
我尝试运行订阅者程序的两个实例,其中包含以下方法:
private static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
Console.WriteLine($"Received #{message.SystemProperties.SequenceNumber} message: {Encoding.UTF8.GetString(message.Body)}");
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
Run Code Online (Sandbox Code Playgroud)
但是,这不允许两个订阅者都收到该消息,而是会在两个订阅者之间分别收到一条消息。
如何确保两个订阅者都能收到来自总线的所有消息?
在这被标记为重复之前:我已经阅读了 SO 上的每个问题/答案以及上述错误消息,但没有一个解决了我的问题。我一定错过了一些简单的东西,因为应该简单的东西不起作用。
我创建了一个事件中心命名空间,其中包含“发送”共享访问策略和命名空间中的事件中心。
使用Python Event Hub SDK中的代码(在另一个答案中建议),我有以下脚本来创建Authorization标头:
import time
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from urllib.parse import quote_plus, urlencode
def generate_sas_token(uri, policy, policy_key, expiry_days=14):
expiry = time.time() + expiry_days * 60 * 60 * 24
encoded_uri = quote_plus(uri)
ttl = int(expiry)
sign_key = '{}\n{}'.format(encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(policy_key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': uri,
'sig': signature,
'se': str(ttl),
'skn': policy
}
return 'SharedAccessSignature ' + …Run Code Online (Sandbox Code Playgroud) Microsoft 在 NuGet 上有两个 Azure 服务总线包:
将此用于 Microsoft Azure 服务总线队列、主题、EventHub 和中继后端操作。
这是下一代 Azure 服务总线 .NET Standard 客户端库,专注于队列和主题
乍一听你应该选择WindowsAzure.ServiceBus. 包描述实际上是一个“使用它”用于队列的命令。但Microsoft.Azure.ServiceBus称自己为“下一代”。
GitHub 项目Microsoft.Azure.ServiceBus也不完全有帮助:自述文件链接到使用的代码示例WindowsAzure.ServiceBus(截至 2020 年 2 月)。
.net azure azureservicebus azure-servicebus-queues .net-core
我们如何强制 MassTransit 使用 HTTPS 而不是 AMQP 连接到 Azure 服务总线?
我们的应用程序位于企业防火墙后面。它需要从Azure主题发送/接收消息。我们更愿意使用 HTTPS 而不是 AMQP,这样更容易实现安全管理。
该应用程序有以下设置
从Azure服务总线文档中,我们应该能够将传输类型设置为AmqpWebSockets,它应该使用HTTPS进行通信。连接字符串如下所示:
Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secret;TransportType=AmqpWebSockets
Run Code Online (Sandbox Code Playgroud)
(参考: https: //github.com/MicrosoftDocs/azure-docs/issues/14133)
如果我们使用TopicClientfrom Microsoft.Azure.ServiceBus 4.1.1,我们可以看到流量通过端口 443。示例代码是
class Program
{
public static async Task Main(string[] args)
{
var serviceBus = new ServiceBus();
await serviceBus.SendMessageAsync();
Console.ReadKey();
}
}
public class ServiceBus
{
public async Task SendMessageAsync()
{
const int numberOfMessages = 10;
topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
// Send messages.
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await topicClient.CloseAsync();
} …Run Code Online (Sandbox Code Playgroud) 据我所知,Azure 服务总线不支持 DTC,如果您尝试这样做,您会收到如下异常:“其他资源管理器/DTC 不支持本地事务。” '
我的问题是,我需要向服务总线发送消息,并且代码可能与可能的数据库操作一起在事务范围内执行。但服务总线不需要特别成为该事务的一部分;因此,这里并不真正需要 DTC。然而,服务总线客户端似乎自动参与环境事务,从而将该事务提升为 DTC。
例子:
这运行正确(服务总线代码是事务中唯一的代码):
using (var tx = new TransactionScope())
{
//A simple Azure Bus operation
var builder = new ServiceBusConnectionStringBuilder(connectionString);
var queueClient = new QueueClient(builder);
var messageBody = new Message(Encoding.UTF8.GetBytes("Hello"));
messageBody.MessageId = Guid.NewGuid().ToString("N");
queueClient.SendAsync(messageBody).GetAwaiter().GetResult();
tx.Complete();
}
Run Code Online (Sandbox Code Playgroud)
但从另一个系统参与(这里是 Sql 连接)的那一刻起,“ Azure 服务总线不支持 DTC ”-抛出异常:
using (var tx = new TransactionScope())
{
//A simple DB operation
SqlConnection sqlConnection = new SqlConnection(dbConnectionString);
sqlConnection.Open();
SqlCommand cmd = new SqlCommand("INSERT INTO [dbo].[Table_1]([Name]) values ('Hello')", sqlConnection);
cmd.ExecuteNonQuery();
//A …Run Code Online (Sandbox Code Playgroud) 我想使用 JMS 将计划消息发送到 Azure 服务总线。我的代码是基于org.apache.qpid.jms.message.JmsMessage. 我找到了针对给定问题的一种解决方案,但它使用org.apache.qpid.proton.message.Messagewhich has .getMessageAnnotations(),它允许编辑消息注释并添加一些由 Azure 服务总线正确识别和处理的属性。我的消息 impl 缺少该方法。
我在官方文档和node.js 中的实现中发现,要使用Azure 服务总线安排消息,您需要发送BrokerProperties/brokerProperties具有有效json 的标头。其他标头/属性将被标记为Customer properties并被 Azure 服务总线忽略。
关于 JMS 的官方 azure 文档ScheduledEnqueueTimeUtc表示JMS API 不正式支持该设置。但可以通过手动设置属性来实现。
因此,当我将消息发送到队列时,我可以在 lambda 中对其进行后期处理并设置一些属性:
jmsTemplate.convertAndSend(queue, payload, message -> {
var date = Date.from(ZonedDateTime.now(ZoneId.of("UTC")).plus(delay, ChronoUnit.MILLIS).toInstant());
var brokerProps = Map.of("ScheduledEnqueueTimeUtc", date.toGMTString());
message.setStringProperty(
"brokerProperties",
objectMapper.writeValueAsString(brokerProps)
);
return message;
});
Run Code Online (Sandbox Code Playgroud)
但这不起作用。消息到达队列,但当我尝试在 Azure 上的服务总线资源管理器上查看它时,它会在浏览器控制台中抛出错误,并且该操作将永远持续。我想设置该属性brokerProperties会对服务总线产生一些影响。我还尝试发送带有日期作为字符串的地图(带有Azure使用的日期格式)"ScheduledEnqueueTimeUtc", "Thu, 25 Mar 2021 12:54:00 GMT",但它也被服务总线识别为错误(窥视永远持续,并在浏览器控制台中引发错误) 。
我尝试设置类似x-opt-scheduled-enqueue-time或x-ms-scheduled-enqueue-time …
基于此 Microsoft 文档: https://learn.microsoft.com/en-us/rest/api/eventhub/generate-sas-token
Node.js 的代码是:
function createSharedAccessToken(uri, saName, saKey) {
if (!uri || !saName || !saKey) {
throw "Missing required parameter";
}
var encoded = encodeURIComponent(uri);
var now = new Date();
var week = 60*60*24*7;
var ttl = Math.round(now.getTime() / 1000) + week;
var signature = encoded + '\n' + ttl;
var signatureUTF8 = utf8.encode(signature);
var hash = crypto.createHmac('sha256', saKey).update(signatureUTF8).digest('base64');
return 'SharedAccessSignature sr=' + encoded + '&sig=' +
encodeURIComponent(hash) + '&se=' + ttl + '&skn=' + saName;
}
Run Code Online (Sandbox Code Playgroud)
我正在 …
Azure 队列存储和 Azure 服务在死信队列和有害消息方面有什么区别?
我如何从这些队列中读取消息?
是否可以通过sdk在azure服务总线上创建队列?
与 RabbitMQ 相同
channel.QueueDeclare(queue: "" ....
Run Code Online (Sandbox Code Playgroud) 我按照此处列出的步骤进行操作,但对于 python 代码: https ://learn.microsoft.com/en-us/azure/azure-functions/functions-identity-based-connections-tutorial-2
目标是创建一个简单的 (hello world) 函数应用程序,该应用程序由使用基于身份的连接的 Azure 服务总线消息队列触发。当通过连接字符串引用 ASB 时,函数应用程序工作正常,但在尝试通过函数应用程序的托管服务标识进行连接时会出现此错误(使用特定配置模式 __filledQualifiedNamespace)。MSI 已被授予 ASB 上的角色(Azure 服务总线数据接收器)。
Microsoft.Azure.WebJobs.ServiceBus: Microsoft Azure WebJobs SDK ServiceBus connection string 'ServiceBusConnection__fullyQualifiedNamespace' is missing or empty.
功能代码(自动生成)
import logging
import azure.functions as func
def main(msg: func.ServiceBusMessage):
logging.info('Python ServiceBus queue trigger processed message: %s',
msg.get_body().decode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
function.json(根据ms文档修改的连接值)
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "msg",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "erpdemoqueue",
"connection": "ServiceBusConnection"
}
]
}
Run Code Online (Sandbox Code Playgroud)
host.json(基于ms文档修改的版本)
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle", …Run Code Online (Sandbox Code Playgroud)