有没有办法获取Azure主题订阅的当前消息计数?
我看到SubscriptionDescription类具有MessageCount属性,但此类似乎仅用于创建订阅.我没有看到为现有订阅检索SubscriptionDescription对象的方法.
我正在编写一个使用Microsoft.ServiceBus.dll 1.0.0.0(.NET 3.5版本)的POC应用程序.
我的WCF合同和服务如下所示:
[ServiceContract(Name="MyServiceContract", Namespace = "http://mydomain.com/")]
internal interface IServiceContract
{
[WebInvoke(Method = "POST", UriTemplate = "/DoOperation")]
[OperationContract]
Stream RelayRequest(Stream requestBody);
}
[ServiceBehavior(Name = "Service1", Namespace = "http://mydomain.com/Service1/", InstanceContextMode = InstanceContextMode.Single)]
internal class Service1 : IServiceContract
{
Stream RelayRequest(Stream requestBody)
{
var contents = GetJsonResponse();
var responseStream = new MemoryStream();
var streamWriter = new StreamWriter(responseStream);
streamWriter.AutoFlush = true;
var writer = new JsonTextWriter(streamWriter);
var serializer = new JsonSerializer();
serializer.Serialize(responseStream, contents);
responseStream.Position = 0 // reset the position of the …Run Code Online (Sandbox Code Playgroud) 我试图在将BrokeredMessage.Properties集合属性发送到Azure Service Bus之前将其添加到集合中.自定义属性类型是枚举:
[Serializable, DataContract]
public enum FooBar
{
[EnumMember]
Foo = 0,
[EnumMember]
Bar = 1
}
Run Code Online (Sandbox Code Playgroud)
我也尝试过各种各样的属性组合,以及没有属性的版本.
这是添加属性并发送消息的代码:
var brokeredMessage = new BrokeredMessage(new MessageObject(){ //etc });
brokeredMessage.Properties.Add("FooBar", FooBar.Foo);
queueClient.Send(brokeredMessage);
Run Code Online (Sandbox Code Playgroud)
尝试发送消息时返回以下错误:
System.Runtime.Serialization.SerializationException : Serialization operation failed due to unsupported type Namespace.FooBar.
Run Code Online (Sandbox Code Playgroud)
我试图找到一些关于BrokeredMessage.Properties限制的详细文档(如果有的话),并且找不到任何指定只能使用基本类型的文档.
关于为什么这不起作用的任何想法?
编辑:
应该说我使用的是Microsoft.ServiceBus的V2.1.0.0.
我正在写一段代码,这将允许我们:
前2分我没有问题; 使用Peek接收模式,我可以显示消息列表,我们可以编辑和重新发送,没有任何问题.
当我想从死信队列中删除消息时,问题就来了.
我们如何通过消息级别对消息执行此操作?我们可能只想删除驻留在死信队列中的2条消息,并保留其他消息以便稍后查看.调用.Complete()死信队列中的消息是否像在主订阅中那样删除它?
以供参考; 这是我们获取SubscriptionClient死信队列的代码:
private SubscriptionClient GetOrCreateSubscriptionClient(string connectionString)
{
if (!NamespaceManager.TopicExists(_topicName))
{
NamespaceManager.CreateTopic(new TopicDescription(_topicName)
{
MaxSizeInMegabytes = 5120,
DefaultMessageTimeToLive = TimeSpan.FromSeconds(DEFAULT_LOCK_DURATION_IN_SECONDS)
});
}
if (!NamespaceManager.SubscriptionExists(_topicName, _subscriptionName))
{
NamespaceManager.CreateSubscription(_topicName, _subscriptionName);
}
var deadLetterPath = SubscriptionClient.FormatDeadLetterPath(_topicName, _subscriptionName);
var client = SubscriptionClient.CreateFromConnectionString(
connectionString, deadLetterPath, _subscriptionName, ReceiveMode.PeekLock);
return client;
}
Run Code Online (Sandbox Code Playgroud) 我最近创建了一些分区队列,从我发送和接收的所有时间.他们过去没有分区,没有我当前的任何问题.
问题是,队列中有一些我无法接收的消息.在没有向队列发送任何其他消息时我总是得不到任何回报,但是当我向队列发送消息时,我收到新消息没有问题.卡在队列中的消息是活动消息,而不是死信.
我怀疑他们被困在一个特定的分区,但我不知道如何接收它们.
由于我无法接收这些消息,有没有办法重置ServiceBus队列?
有任何想法吗?
更新:
数量不是恒定的,而是非常缓慢地增加.在我们的测试环境中,我们每小时收到的消息少于2000条,并且在重置为0(零)后,队列在过去12小时内每次收到大约20条消息.那些消息就在那里但不可接收.至少不是以通常的方式.
分区后,问题仅出现在两个名称空间中.问题不在我们不使用分区队列的实时环境中.
我试图在我们的ASP.NET MVC网站上制作类似facebook的通知系统
在一个场景中,通知系统的工作方式如下
NotificationItem到NotificationManagervia API请求.POST api/notifications/send
NotificationManager 然后处理此notificationItem,然后将其保存到Azure表存储Run Code Online (Sandbox Code Playgroud)class NotificationManager { void SaveNotification(NotificationItem item) { // save it to azure table storage } }
保存项目后,客户端(User2)然后通过NotificationHub(SignalR集线器)订阅notificationEvent
NotificationHub然后通知User2以及处理过的通知数据.
Run Code Online (Sandbox Code Playgroud)class NotificationHub: Hub { async Task NotifyUser(string recipientId) { // query data from storage and then process it var notificationData= await _repo.GetProcessedNotificationDataAsync(recipientId); Clients.Group(recipientId).notifyUser(notificationData); } }
我试图在这个图像上说明CURRENT过程和架构

现在,困扰我的是第5步中的这行代码
var notificationData= await _repo.GetProcessedNotificationDataAsync(recipientId);
Run Code Online (Sandbox Code Playgroud)
它背后的作用是查询notificationItem存储,将其处理为用户可读通知(例如"User1现在跟随你"),然后更新notificationItem后面的(IsSent,DateSent)状态.
不用说,它以某种方式执行"重"操作.而且它会实时每次有交付或广播给每个用户一个新的NotificationItem时间被触发.
显然,我在这里谈论性能和可伸缩性问题.所以我研究了一些可以解决这个问题的技术或方法.并且似乎使用Azure Service Bus背板方法是一个可行的选择
asp.net azure signalr azureservicebus azure-servicebus-queues
我们继续看到Azure Service Bus的不稳定性,并正在寻找替代方案.理想情况下,我们希望能够在Windows 2012 R2域中本地运行并具有许多与Azure SB相同的功能集.我们已经看过Windows 1.1 Service Bus,但该产品暂时没有更新,我们不确定它的未来.我们使用C#,因此理想情况下会有一个客户端API /包装器,它可以使我们与现有应用程序的集成相对容易.开源和免费软件是完全可以接受的.:-)
背景技术在几周之内,我们已经在我们选择托管队列的数据中心中关闭了SB.我们已经迁移到另一个数据中心,它偶尔也会出现问题.问题通常持续2分钟到几乎整整一天,最终微软让它们再次运行.
我在 Azure 中创建了一个 ServiceBus 命名空间,以及一个主题和一个订阅。我还有一个简单的 Azure 版本 1 函数,它触发 ServiceBus 中接收到的主题,如下所示:
[FunctionName("MyServiceBusTriggerFunction")]
public static void Run([ServiceBusTrigger("myTopic", "mySubscription", Connection = "MyConnection")]string mySbMsg, TraceWriter log)
{
log.Info($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
}
Run Code Online (Sandbox Code Playgroud)
当我使用主题的共享访问策略在函数应用程序设置中定义连接字符串时,该函数可以很好地触发 ServiceBus 中的主题,如下所示:
Endpoint=sb://MyNamespace.servicebus.windows.net/;SharedAccessKeyName=mypolicy;SharedAccessKey=UZ...E0=
Run Code Online (Sandbox Code Playgroud)
现在,我想使用托管服务标识 (MSI) 来访问 ServiceBus,而不是共享访问密钥。根据这个(https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/services-support-msi)应该是可能的,除非我误解了一些东西。我还没有设法让它工作。
我试过的是
该功能未在此设置中触发,所以我错过了什么或我做错了什么?如果您有任何建议可以帮助我走得更远,我将不胜感激。谢谢。
triggers azureservicebus azure-functions azure-managed-identity
我有一个带有服务总线触发器的Azure功能:
public static async Task Run([ServiceBusTrigger(
"%inputTopicName%",
"%subscriptionName%",
AccessRights.Manage,
Connection = "connection")]string mySbMsg)
Run Code Online (Sandbox Code Playgroud)
在99.9%的调用中,触发器成功解析为Azure Service Bus上的订阅.但有时,我在日志中看到以下错误:
Microsoft.Azure.WebJobs.Host.FunctionInvocationException: Exception while executing function: UptimeChecker ---> System.ArgumentException: The argument connectionString is null or white space.
Parameter name: connectionString
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.FunctionInvocationFilterInvoker.d__9.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.d__24.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.d__23.MoveNext()
--- …Run Code Online (Sandbox Code Playgroud) 嗨,我有一个用于创建带有主题和订阅的ServiceBus的ARM模板。但是我只能完成1个主题-1个订阅,因为我无法进行嵌套循环来为每个主题创建多个订阅。
我希望我可以执行这样的模板:
参数:
{
"serviceBusName": "mybus",
"topics":
[
{
"topicName": "mytopic1",
"subscriptions": [ "mysubscription1", "mysubscription2"]
},
{
"topicName": "mytopic2",
"subscriptions": [ "mysubscription1"]
}
]
}
Run Code Online (Sandbox Code Playgroud)
这是我的实际模板:
{
"$schema": "http://schema.management.azure.com/schemas/2014-04-01-preview/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"ServiceBusNamespaceName": {
"type": "string"
},
"ServiceBusSku": {
"type": "string",
"allowedValues": [
"Basic",
"Standard"
],
"defaultValue": "Standard"
},
"ServiceBusSmallSizeTopicInMb": {
"type": "int",
"defaultValue": 1024
},
"ServiceBusMaxSizeTopicInMb": {
"type": "int",
"defaultValue": 1024
},
"Topics": {
"type": "array"
}
},
"variables": {
"DefaultSASKeyName": "RootManageSharedAccessKey",
"DefaultAuthRuleResourceId": "[resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', parameters('ServiceBusNamespaceName'), variables('DefaultSASKeyName'))]",
"SbVersion": "2017-04-01" …Run Code Online (Sandbox Code Playgroud) servicebus azure azureservicebus azure-resource-manager azure-servicebus-topics
azureservicebus ×10
azure ×8
c# ×4
.net ×1
asp.net ×1
dead-letter ×1
queue ×1
servicebus ×1
signalr ×1
triggers ×1
wcf ×1
wcf-rest ×1