标签: azure-queues

单个队列消息上有多个Azure WebJob功能?

我们的后端正在收集原始数据并将其推送到Azure存储队列.我们想对每个排队的消息两件事:

  1. 记录/存档它
  2. 解析它并将解析后的结果发送到新队列

为了保持小而清晰,我们希望有两个指向同一队列的WebJob函数:

    public static void ArchiveRawData([QueueTrigger("raw")] RawData data, [Blob("{Ticks}.dat")] out string raw)
    {
        raw = data.Data;
    }

    public static void ParseRawData([QueueTrigger("raw")] RawData data, [Queue("result")] out Parsed parsedData
    {
        var parsed = Parser.Parse(data.Data);
        parsedData = parsed;
    }
Run Code Online (Sandbox Code Playgroud)

但是,这并不工作:无论是在ArchiveRawData或ParseRawData得到消息,而不是其他.

是否有一个选项可以使上述场景有效?现在看来消息在第一个函数完成后自动出列(无论哪一个).但我认为WebJobs SDK可以检测到具有相同QueueTrigger的多个函数,并且只有在所有函数完成后才能使消息出列.

为了解决这个问题,我们目前在一个函数中有两个输出:

    public static void ParseRawData([QueueTrigger("raw")] RawData data, [Queue("result")] out Parsed parsedData, [Blob("{Ticks}.dat")] out string raw)
    {
        var parsed = Parser.Parse(data.Data);
        parsedData = parsed;
        raw = data.Data;
    } …
Run Code Online (Sandbox Code Playgroud)

c# azure-queues azure-webjobs azure-webjobssdk

4
推荐指数
1
解决办法
1628
查看次数

最大CloudQueueMessage有效负载大小问题

根据MSDN,消息有效载荷可以扩展到8KB(8192字节):

AddMessage方法将消息添加到队列的后面.消息最大可达8 KB.其内容必须采用可以使用UTF-8编码的格式.

但是,当在队列中添加消息时,我收到的消息的例外情况是,有效负载应该小于8192字节,魔术区域似乎大约是6500字节.我发送的数据是纯字符串,其大小是从.Length成员验证的,并且源是从源发送的长度验证的(CRLF分隔符有一个恒定的2字节差异).

所以我的问题是双重的:

1)是否有任何隐藏数据附加到消息有效负载上会导致其大小或导致这种奇怪的行为?(例如限制作为一个整体应用于对象,而不仅仅是它的有效载荷,但即便如此,它如何能够为每条消息占1.5KB?)

2)如何可靠地检查有效载荷是否确实低于8192?

还有一些额外的信息:我正在使用Azure SDK 1.4和VS 2010 Ultimate,运行计算和存储模拟器(我还没有部署这个应用程序)使用SQLExpress(我认为是2008).

还通过代码确认最大大小为8192字节(如果有一些额外的系统强制限制):

Trace.WriteLine("Max Queue Message Size: " + CloudQueueMessage.MaxMessageSize, "CloudQueueMessage");
Run Code Online (Sandbox Code Playgroud)

CloudQueueMessage:最大队列消息大小:8192

c# azure-storage azure-queues

3
推荐指数
1
解决办法
2208
查看次数

如何确定是否已处理Azure队列中的所有邮件?

我刚刚开始修改Windows Azure,并希望得到一个问题的帮助.

如何确定Windows Azure队列是否为空并且其中的所有工作项是否已处理?如果我有多个工作进程查询工作项队列,则如果队列为空,则GetMessage(s)不返回任何消息.但是无法保证当前不可见的消息不会被推回到队列中.

我需要此功能,因为我的工作流的后续行为取决于特定队列中所有工作项的完成.解决此问题的一种可能方法是计算放置和删除的数量.但这将再次要求在共享存储级别进行同步,我想尽可能避免它.

有任何想法吗?

azure azure-storage azure-queues

3
推荐指数
1
解决办法
2846
查看次数

Azure Webjobs SDK和队列处理错误

我正在研究一个webjob,它检查Web服务器是否有一些数据.有时Web服务器很忙,几秒钟后会再次尝试.

问题是,如果我只是忽略它并通过该函数,它将从队列中删除该消息,我将无法重试该调用.

我当前的解决方案(我根本不喜欢)是通过一个异常来增加dequque数量并将消息放回队列中.然而,这似乎非常残酷,并且需要运行webjob的线程才能重新启动.

有没有其他办法处理这个?

azure-queues azure-webjobs azure-webjobssdk

3
推荐指数
1
解决办法
2944
查看次数

如何设置消息时间以无限期地在天蓝色服务总线队列中生存?

我正在尝试使用azure-sdk-for-node创建azure服务总线队列,但无法找到资源来设置无限制生存时间。

这是我的示例代码:

var queueOptions = {MaxSizeInMegabytes:'5120',DefaultMessageTimeToLive:'PT1M'};

serviceBusService.createQueueIfNotExists('myqueue',queueOptions,function(error){if(!error){//队列存在}});

无限时间在DefaultMessageTimeToLive中会有什么?

servicebus azure azure-queues

3
推荐指数
1
解决办法
1876
查看次数

限制Azure Functions队列上的并发作业数

我在Azure中有一个Function项目,当项目放入队列时会触发该应用程序.它看起来像这样(大大简化):

public static async Task Run(string myQueueItem, TraceWriter log)
{
    using (var client = new HttpClient())
    {
        client.BaseAddress = new Uri(Config.APIUri);
        client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

        StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json");
        HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent);
        response.EnsureSuccessStatusCode();

        string json = await response.Content.ReadAsStringAsync();
        ApiResponse apiResponse = JsonConvert.DeserializeObject<ApiResponse>(json);

        log.Info($"Activity data successfully sent to platform in {apiResponse.elapsed}ms.  Tracking number: {apiResponse.tracking}");
    }
}
Run Code Online (Sandbox Code Playgroud)

这一切都很好,运行得很好.每次将项目放入队列时,我们都会将数据发送到我们这边的某个API并记录响应.凉.

当"产生队列消息的东西"出现大幅增加并且许多项目立即被放入队列时,就会出现问题.这往往会在一分钟内发生大约1,000到1,500件物品.错误日志将具有以下内容:

2017-02-14T01:45:31.692 mscorlib:执行函数时出现异常:Functions.SendToLimeade.f-SendToLimeade __- 1078179529:发送请求时发生错误.系统:无法连接到远程服务器.系统:通常只允许使用每个套接字地址(协议/网络地址/端口)123.123.123.123:443.

起初,我认为这是Azure功能应用程序运行本地套接字的问题,如此处所示.但是,我注意到了IP地址.IP地址123.123.123.123(当然在本例中已更改)是我们的IP地址,即HttpClient发布的IP地址.所以,现在我想知道是不是我们的服务器用完了套接字来处理这些请求.

无论哪种方式,我们都会遇到扩展问题.我正试图找出解决问题的最佳方法.

一些想法:

  1. 如果它是本地套接字限制,则上面文章中有一个使用增加本地端口范围的示例Req.ServicePoint.BindIPEndPointDelegate.这似乎很有希望,但是当你真正 …

azure azure-queues azure-functions

3
推荐指数
3
解决办法
7263
查看次数

Azure Queue Storage Triggered-Webjob - 一次将多个消息出列

我有一个Azure队列存储触发的Webjob.我的webjob执行的过程是将数据索引到Azure搜索中.Azure搜索的最佳实践是将多个项目一起索引,而不是一次索引一个,这是出于性能原因(索引可能需要一些时间才能完成).

出于这个原因,我希望我的webjob将多个消息一起出列,这样我就可以遍历,处理它们,然后将它们全部索引到Azure Search中.

但是,我无法弄清楚如何让我的webjob一次出列一个以上.如何实现这一目标?

azure-queues azure-webjobs

3
推荐指数
1
解决办法
741
查看次数

Azure WebJobs - 基于用户的单例 WebJob

我需要在计划的时间段内为不同的用户运行一项作业(例如 CRM 同步)。此同步由另一个函数放入 ServiceBus 队列上的消息触发,该函数每 30 分钟由 TimerTrigger 触发一次。

现在我需要的是避免每个用户作业并行运行多次,即如果前一个作业未完成,我需要阻止同一用户处理同一作业。

让我们想象一下:

  • 早上09:00,TimerTrigger函数被执行。该功能决定了用户A和用户B必须同步。对于每个用户,ServiceBus 队列上都会放置一条消息。
  • 一旦消息放入 ServiceBus 队列,就会触发不同的 Azure WebJob。作业已生成并正在运行。
  • 09:10 用户 A 同步作业完成,而用户 B 的同步作业仍在运行。假设它在 09:30 仍在运行。
  • 09:30,TimeTrigger函数再次执行。该函数再次确定用户A和B必须同步。队列上的消息被再次放入。
  • 由于这两条消息,函数再次被触发两次,但当前用户 B 有两个并发函数正在运行。

如何避免这种情况?

我是否必须手动实现 Azure 租赁 Blob 的逻辑?

谢谢。

azure azure-queues azure-webjobs

3
推荐指数
1
解决办法
1472
查看次数

Azure WebJobs:具有不同批处理大小的队列触发器

我在 azure 上有一个 WebJob,可以同时处理来自多个队列的消息:

public async static Task ProcessQueueMessage1([QueueTrigger("queue1")] string message)
    {


        switch (message.Substring(message.Length - 3, 3))
        {
            case "tze":
                await Parser.Process1(message);
                break;
            default:
                break;
        }
    }


    public async static Task ProcessQueueMessage2([QueueTrigger("queue2")] string message)
    {


        switch (message.Substring(message.Length - 3, 3))
        {
            case "tzr":
                await Parser.Process2(message);
                break;
            default:
                break;
        }
    }
Run Code Online (Sandbox Code Playgroud)

而在主

static void Main()
    {

        JobHostConfiguration config = new JobHostConfiguration();
        config.Queues.BatchSize = 3;
        config.Queues.MaxDequeueCount = 1;
        var host = new JobHost(config);
        host.RunAndBlock();

    }
Run Code Online (Sandbox Code Playgroud)

这里:message.Substring(message.Length - 3, 3)只检查文件的扩展名。

我的问题是,我将如何使 …

c# asp.net azure azure-queues azure-webjobs

3
推荐指数
1
解决办法
1159
查看次数

Azure 队列触发器立即达到 MaxDequeueCount

我设置了一个天蓝色队列触发器:

    [FunctionName("TransformData")]
    public async Task Transform(
        [QueueTrigger("product-prices")] string message)
    {
        await TransformAndLoadData(message);
    }
Run Code Online (Sandbox Code Playgroud)

每当我通过 QueueClient 将项目添加到队列时:
(RawQueueData 将项目拆分为消息批次)

    var rawQueueData = new RawQueueData<T>(data);

    var sendMessageTasks = rawQueueData.Messages
        .Select(m => _queueClient.SendMessageAsync(m));
    
    await Task.WhenAll(sendMessageTasks);
Run Code Online (Sandbox Code Playgroud)

或者将消息从product-prices-poison移回到product-prices队列(使用Azure存储资源管理器),触发器立即失败并显示:

消息已达到 MaxDequeueCount 5。将消息移至产品价格毒药队列。

同步传递项目也会产生同样的问题。

我唯一一次可以成功触发触发器并处理项目是当我使用 Azure 存储资源管理器手动创建消息时。

增加 MaxDequeueCount 或批量大小不会产生影响。执行前者后的消息是:

消息已达到 MaxDequeueCount 100000。正在将消息移至产品价格毒药队列。

我还可以使用 QueueClient 手动将项目出队,没有任何问题。


我还尝试将接收到的对象类型更改为 QueueMessage、对象和字符串。大多数其他解决方案似乎专注于更新软件包(我正在使用最新的 - 稳定版 12.8.0)。

编辑:host.json:

{
  "version": "2.0",
  "extensions": {
    "blobs": {
      "maxDegreeOfParallelism": "4"
    },
    "queues": {
      "maxDequeueCount": 5
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

.net azure azure-queues azure-functions azure-triggers

3
推荐指数
1
解决办法
2362
查看次数