使用.NET Core SDK创建筛选的服务总线订阅

Ale*_*idt 4 azure azureservicebus .net-core

我想从.NET Core 2.1应用程序内部使用Azure Service Bus。我熟悉Nuget软件包随附的SDK,Microsoft.Azure.ServiceBus并使用它目前正在写一个主题并从那里接收消息。

我现在想要的是为每个客户端在此主题上创建订阅。关键是我想使用创建过滤订阅SqlFilter。这意味着,我必须通过代码创建预订。

顺便说一句,据我所知,在代码中执行此操作的另一种方法是使用ARM模板进行部署,因为门户不允许我创建过滤的订阅。

我知道,这Microsoft.Azure.ServiceBus无法创建资源,因此我去了Nuget包Microsoft.Azure.Management.ServiceBus。因此,我现在可以通过如下代码创建新的订阅:

private static async Task CreateSubscriptionAsync()
{
    var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
    var token = await context.AcquireTokenAsync(
        "https://management.core.windows.net/",
        new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
    var creds = new TokenCredentials(token.AccessToken);
    using (var sbClient = new ServiceBusManagementClient(creds)
    {
        SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
    })
    {                
        var queueParams = new SBSubscription
        {
            LockDuration = TimeSpan.FromSeconds(10)
        };          
        await sbClient.Subscriptions.CreateOrUpdateAsync(
            Configuration["AppSettings:ServiceBus:ResourceGroup"],
            Configuration["AppSettings:ServiceBus:Namespace"],
            Configuration["AppSettings:ServiceBus:TopicName"],
            "mysub",
            queueParams);                
    }
}
Run Code Online (Sandbox Code Playgroud)

这有效,因此我有一个新订阅,也可以删除。但是现在在哪里可以定义过滤器?该SBSubscription型不包含选项来定义过滤器,并且没有方法-过载sbClient.Subscriptions.CreateOrUpdateAsync

我发现在代码中执行此操作的唯一方法是基于旧的Nuget WindowsAzure.ServiceBus(不适用于.NET Core)。所以我会用NamespaceManager这样的:

var manager = NamespaceManager.CreateFromConnectionString("MyConnectionString");            
var rule = new RuleDescription("property > 4");
var sub = manager.CreateSubscriptionAsync(new SubscriptionDescription("mytopic", "mysub"), rule); 
Run Code Online (Sandbox Code Playgroud)

没有此功能,完整的主题对我来说似乎是无用的,而且我不敢相信这意味着Azure Service Bus根本无法为.NET Core做好准备。

编辑:解决方案,由Arunprabhu建议

我只是想完整地介绍解决方案。

  1. 添加对NuGet的引用Microsoft.Azure.Management.ServiceBus
  2. 添加对NuGet的引用Microsoft.Azure.ServiceBus
  3. 创建子,如下所示:
private static async Task CreateSubscriptionAsync()
{
    // this is to show that it works with any subscription-name
    var subName = Guid.NewGuid().ToString("N");     
    // create the subscription using Azure Management API
    var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
    var token = await context.AcquireTokenAsync(
        "https://management.core.windows.net/",
        new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
    var creds = new TokenCredentials(token.AccessToken);
    using (var sbClient = new ServiceBusManagementClient(creds)
    {
        SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
    })
    {                
        var queueParams = new SBSubscription
        {
            LockDuration = TimeSpan.FromSeconds(10)
        };          
        await sbClient.Subscriptions.CreateOrUpdateAsync(
            Configuration["AppSettings:ServiceBus:ResourceGroup"],
            Configuration["AppSettings:ServiceBus:Namespace"],
            Configuration["AppSettings:ServiceBus:TopicName"],
            subName,
            queueParams);                
    }
    // add filter-rule using Azure ServiceBus API
    var client = new SubscriptionClient(ServiceBusConnectionString, Configuration["AppSettings:ServiceBus:TopicName"], subName);
    await client.AddRuleAsync("default", new SqlFilter("1=1"));
    // That's it        
}
Run Code Online (Sandbox Code Playgroud)

Sea*_*man 5

我了解到Microsoft.Azure.ServiceBus无法创建资源

Microsoft.Azure.ServiceBus的 3.1.0-preview版本开始,您可以使用ManagementClientMicrosoft.Azure.Management.ServiceBus库中的对象来创建实体。它有一个重载CreateSubscriptionAsync(),将RuleDescription其作为默认规则创建。或者,不提供RuleDescription将自动为您设置默认规则。

  • 这是一个[示例](https://github.com/Particular/NServiceBus.Transport.AzureServiceBus/blob/develop/src/Transport/Administration/QueueCreator.cs#L44)。从概念上讲,它与旧的NamespaceManager没有什么不同。 (3认同)

Cha*_*ock 2

由于 Sean Feldman 的示例链接现已损坏,因此详细阐述了 Sean Feldman 的答案,您可以执行以下操作:

public static async Task CreateSubscription(
        string connection,
        string topicPath,
        string subscriptionName,
        string matchExpression,
        string ruleDescription,
        bool checkForExisting = false,
        CancellationToken cancellationToken = default(CancellationToken))
{
    if (checkForExisting)
    {
        var exists = (await new ManagementClient(connection)
            .GetSubscriptionsAsync(topicPath, cancellationToken: cancellationToken))
            .Any(sub => sub.SubscriptionName == subscriptionName);

        if (exists) return;
    }

    var subscriptionDescription = new SubscriptionDescription(topicPath, subscriptionName);
    var rule = new RuleDescription(ruleDescription, new SqlFilter(matchExpression));
    await new ManagementClient(connection).CreateSubscriptionAsync(subscriptionDescription, 
        rule, 
        cancellationToken);
}
Run Code Online (Sandbox Code Playgroud)

为了简单起见,我删除了强制性的输入验证,例如空字符串或无效字符。我还提供了一种检查订阅是否已存在的方法。理想情况下,您只需执行一次该检查,然后在内存中保留一个正在运行的选项卡,这样您就不会每次都执行该 API 调用。

对于您来说matchExpression,您可以根据发布消息时添加到消息中的用户属性,使用 SQLish 语法定义该订阅的过滤器。实现看起来像这样:

await CreateSubscription(
    Configuration.GetConnectionString("MyServiceBusConnectionString"),
    "myTopicName",
    "mySubscription",
    "MyUserProperty='MyValue'",
    "MyRule",
    true
)
Run Code Online (Sandbox Code Playgroud)