标签: azureservicebus

为什么 Azure WebJob ServiceBus 默认反序列化 XML?

我有一个简单的 Azure WebJobs ServiceBusTrigger,看起来像

public static async void ProcessQueueMessage([ServiceBusTrigger("myqueuename")] String json, TextWriter log) { ... }
Run Code Online (Sandbox Code Playgroud)

不幸的是,它无法将 JSON 反序列化为 XML(不足为奇)。我检查了有效负载并确认它只是一个 UTF-8 编码的字节数组。我有两个问题。

  1. 为什么假设我的 String 是 XML?
  2. 我怎么告诉它不,没有 XML,只有一个字符串?

堆栈跟踪:

System.InvalidOperationException: Exception binding parameter 'json' ---> System.Runtime.Serialization.SerializationException: There was an error deserializing the object of type System.String. The input source is not correctly formatted. ---> System.Xml.XmlException: The input source is not correctly formatted.
 at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
 at System.Xml.XmlBufferReader.ReadValue(XmlBinaryNodeType nodeType, ValueHandle value)
 at System.Xml.XmlBinaryReader.ReadNode() …
Run Code Online (Sandbox Code Playgroud)

azureservicebus azure-webjobs azure-webjobssdk

5
推荐指数
1
解决办法
4175
查看次数

如何检测kafka主题中的重复消息?

嗨,我有一个类似于下图所示的架构。

我有两个 kafka 生产者,它们将向 kafka 主题发送频繁重复消息的消息。

有没有一种方法可以以简单的方式处理这种情况,例如服务总线主题。

感谢您的帮助。

在此处输入图片说明

azureservicebus apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams

5
推荐指数
1
解决办法
9612
查看次数

了解 Azure Function 应用和服务总线成本

我正在尝试使用 Azure 服务总线主题和 Azure 函数应用程序创建一个发布/订阅系统,其中的函数使用 ServiceBusTrigger 进行修饰。我正在为 Function 应用程序使用服务总线和消耗计划的标准层。

这个概念运行良好,可以满足我的目的,但是,我很难理解这将如何影响我的 Azure 成本。

仅运行一个具有该属性的函数似乎每秒向服务总线生成一对请求。最后,我希望有很多带有订阅的主题和每个订阅的 Azure 函数。

如果我理解正确的话,ServiceBusTrigger 生成的每个请求都计入您获得的服务总线操作数。通过一些功能,我将很容易地超过 1300 万次操作。

其中一些订阅者对时间不敏感,以至于他们需要每秒多次检查订阅。有没有办法改变 ServiceBusTrigger 检查订阅的频率以降低成本?我是否从完全错误的角度处理这个问题?

azure azureservicebus azure-functions

5
推荐指数
0
解决办法
529
查看次数

.Net Core 中的 Azure 服务总线 - 如何只接收一条消息

我想在 .Net Core 上使用 Azure 服务总线并且只阅读一条消息。似乎Microsoft.Azure.ServiceBus不支持这种情况。到目前为止,测试代码让我走到了这一步:

public void ReceiveOne()
{
    var queueClient = new QueueClient(ServiceBusConnectionString, "go_testing");

    queueClient.RegisterMessageHandler(
        async (message, token) =>
        {
            var messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine($"Received: {messageBody}, time: {DateTime.Now}");
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);

            await queueClient.CloseAsync();
        },
        new MessageHandlerOptions(async args => Console.WriteLine(args.Exception))
        { MaxConcurrentCalls = 1, AutoComplete = false });
}
Run Code Online (Sandbox Code Playgroud)

所以我在成功阅读一条消息后关闭队列。它有效,但也会触发异常。

你知道更好的方法来实现它吗?

queue azure azureservicebus .net-core

5
推荐指数
2
解决办法
3568
查看次数

Azure 服务总线“ReceiveAsync”

有没有办法使用Microsoft.Azure.ServiceBus包来等待当前线程从队列中接收消息?

这可能更多地是我的理解和希望以不打算使用的方式使用该技术的问题,但我想做的是结合以下 Microsoft 示例中的发送和接收示例,以便您可以将消息发送到各种队列,并能够侦听和处理“回复”(只是您在队列中收听的消息)并在您完成接收消息后关闭连接。

这里有一些伪代码:

   // send message(s) that will be consumed by other processes / applications, and by doing so later on we will expect some messages back
   await SendMessagesAsync(numberOfMessages);

    var receivedMessages = 0;
    while (receivedMessages < numberOfMessages)
    {
        // there is no "ReceiveAsync" method, this is what I would be looking for
        Message message = await queueClient.ReceiveAsync(TimeSpan.FromSeconds(30));
        receivedMessages++;

        // do something with the message here
   }

   await queueClient.CloseAsync();
Run Code Online (Sandbox Code Playgroud)

这是可能的还是我“做错了”?

c# azure azureservicebus azure-servicebus-queues

5
推荐指数
1
解决办法
4521
查看次数

使用 AWS SQS 并发布到 Azure 服务总线

我需要一块“云胶”来从 AWS SQS 队列读取消息并将结果推送到 Azure 服务总线。我喜欢 Azure 逻辑应用,但它们没有 SQS 连接器。我可以在 Azure 函数中手动滚动代码,但是在 Azure 世界中是否有任何预构建的解决方案来使用 SQS?

amazon-sqs azureservicebus azure-logic-apps

5
推荐指数
1
解决办法
1785
查看次数

轨道交通。使用不同命名空间中定义的相同对象

首先,请原谅我的英语,非常糟糕。我将 MassTransit 与 Azure 服务总线一起用于微服务之间的异步通信。根据它们自己的定义,为了避免它们之间产生依赖关系,不同微服务之间发送的消息被定义在每个微服务中,即它们是不同命名空间的一部分。MassTransit 的自动管理导致队列和主题按对象类型进行管理,从而阻止消费消息的微服务接收微服务发布者发送的消息。同样的事情发生在同一个命名空间中具有相同属性但具有不同类名的两个类。

有没有办法解决这个问题?我想到的选项是:

  • 从目标地址的端点删除命名空间,仅使用类的名称命名。
  • MassTransit 可以根据对象的序列化来管理队列和主题的创建,而不是根据对象类型来管理它(也许通过某种类型的包装对象?)

我留下一个例子,希望可以帮助您理解问题。

//FIRST PROGRAM - MESSAGE CONSUMER 

namespace Consumer
{
    public class Example
    {
        public string PropOne { get; set; }

        public string PropTwo { get; set; }
    }

    public class ExampleConsumer : 
        IConsumer<Example>
    {
        public List<Example> ConsumedTestObjectList { get; } = new List<Example>();

        //THIS METHOD NEVER CALL !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
        public Task Consume(ConsumeContext<ExampleConsumer> context)
        {
            ConsumedTestObjectList.Add(context.Message);
            return Task.CompletedTask;
        }
    }

    public class ConsumerProgram
    {
        public static void Main()
        {
            var bus = …
Run Code Online (Sandbox Code Playgroud)

ipc masstransit azureservicebus microservices .net-core

5
推荐指数
1
解决办法
1513
查看次数

具有部署槽的函数应用上的 ServiceBusTrigger

我有一个带有部署槽的功能应用程序,用于开发测试(例如蓝色/绿色)

其中一项功能设置为ServiceBusTrigger绑定到服务总线队列。

希望开发部署槽中的函数被服务总线中的队列元素绊倒。我已在 Azure 门户的部署槽中禁用了此功能。什么是正确的方法?

编辑:在开发槽中禁用该功能是不好的,因为一旦槽被交换,该功能在生产中就被禁用了。我目前的解决方法是创建一个全新的“开发”服务总线,并将它的连接字符串用于开发槽。

azure azureservicebus azure-servicebus-queues azure-functions

5
推荐指数
1
解决办法
533
查看次数

如何为 Azure 服务编写 MassTransit Json 解串器

这是我将对象发布到事件网格的方式。我希望能够使用azure服务总线来收听它。

        public void Publicar<T>(T model, string operation, string entity)
    {
        _nomeEvento = entity + operation;

        Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
        if (!eventGridIsActive)
            return;

        var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
        var primaryTopic = Configuration["EventGridConfig:Endpoint"];

        var primaryTopicHostname = new Uri(primaryTopic).Host;

        var topicCredentials = new TopicCredentials(primaryTopicKey);
        var client = new EventGridClient(topicCredentials);

        client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
    }

    private List<EventGridEvent> GetEventsList<T>(T model)
    {
        return new List<EventGridEvent>
        {
            new EventGridEvent()
            {
                Id = Guid.NewGuid().ToString(),
                EventType = _nomeEvento,
                Data = model,
                EventTime = DateTime.Now,
                Subject = "MS_Clientes",
                DataVersion = "1.0",
            }
        }; …
Run Code Online (Sandbox Code Playgroud)

json azure azureservicebus json-deserialization azure-eventgrid

5
推荐指数
2
解决办法
2543
查看次数

Azure 服务总线触发函数 - 绑定到 MessageReceiver

我正在尝试在 Azure 服务总线触发函数中绑定到 MessageReceiver。我的目标是处理死信队列消息并完成它们。

public static class Function1
{
    [FunctionName("Function1")]
    public static async Task Run([ServiceBusTrigger("<topicName>", "<subscriptionName>/$DeadLetterQueue", Connection = "connectionstring")]Message message,
        ILogger logger,
        MessageReceiver messageReceiver)
    {
        // TODO: Perform some actions

        await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
    }
Run Code Online (Sandbox Code Playgroud)

问题是它无法绑定到 Message 接收器类。

Microsoft.Azure.WebJobs.Host:索引方法“Function1”时出错。Microsoft.Azure.WebJobs.Host:无法将参数“接收器”绑定到类型 MessageReceiver。确保绑定支持参数类型。如果您使用绑定扩展(例如 Azure Storage、ServiceBus、Timers 等),请确保您已在启动代码(例如 builder.AddAzureStorage()、builder.AddServiceBus() 中调用了扩展的注册方法)、builder.AddTimers() 等)。

为什么我绑定失败的任何想法?

c# azure azureservicebus .net-core azure-servicebus-topics

5
推荐指数
1
解决办法
756
查看次数