我们的后端正在收集原始数据并将其推送到Azure存储队列.我们想对每个排队的消息做两件事:
为了保持小而清晰,我们希望有两个指向同一队列的WebJob函数:
public static void ArchiveRawData([QueueTrigger("raw")] RawData data, [Blob("{Ticks}.dat")] out string raw)
{
raw = data.Data;
}
public static void ParseRawData([QueueTrigger("raw")] RawData data, [Queue("result")] out Parsed parsedData
{
var parsed = Parser.Parse(data.Data);
parsedData = parsed;
}
Run Code Online (Sandbox Code Playgroud)
但是,这并不工作:无论是在ArchiveRawData或ParseRawData得到消息,而不是其他.
是否有一个选项可以使上述场景有效?现在看来消息在第一个函数完成后自动出列(无论哪一个).但我认为WebJobs SDK可以检测到具有相同QueueTrigger的多个函数,并且只有在所有函数完成后才能使消息出列.
为了解决这个问题,我们目前在一个函数中有两个输出:
public static void ParseRawData([QueueTrigger("raw")] RawData data, [Queue("result")] out Parsed parsedData, [Blob("{Ticks}.dat")] out string raw)
{
var parsed = Parser.Parse(data.Data);
parsedData = parsed;
raw = data.Data;
} …Run Code Online (Sandbox Code Playgroud) 根据MSDN,消息有效载荷可以扩展到8KB(8192字节):
AddMessage方法将消息添加到队列的后面.消息最大可达8 KB.其内容必须采用可以使用UTF-8编码的格式.
但是,当在队列中添加消息时,我收到的消息的例外情况是,有效负载应该小于8192字节,魔术区域似乎大约是6500字节.我发送的数据是纯字符串,其大小是从.Length成员验证的,并且源是从源发送的长度验证的(CRLF分隔符有一个恒定的2字节差异).
所以我的问题是双重的:
1)是否有任何隐藏数据附加到消息有效负载上会导致其大小或导致这种奇怪的行为?(例如限制作为一个整体应用于对象,而不仅仅是它的有效载荷,但即便如此,它如何能够为每条消息占1.5KB?)
2)如何可靠地检查有效载荷是否确实低于8192?
还有一些额外的信息:我正在使用Azure SDK 1.4和VS 2010 Ultimate,运行计算和存储模拟器(我还没有部署这个应用程序)使用SQLExpress(我认为是2008).
还通过代码确认最大大小为8192字节(如果有一些额外的系统强制限制):
Trace.WriteLine("Max Queue Message Size: " + CloudQueueMessage.MaxMessageSize, "CloudQueueMessage");
Run Code Online (Sandbox Code Playgroud)
CloudQueueMessage:最大队列消息大小:8192
我刚刚开始修改Windows Azure,并希望得到一个问题的帮助.
如何确定Windows Azure队列是否为空并且其中的所有工作项是否已处理?如果我有多个工作进程查询工作项队列,则如果队列为空,则GetMessage(s)不返回任何消息.但是无法保证当前不可见的消息不会被推回到队列中.
我需要此功能,因为我的工作流的后续行为取决于特定队列中所有工作项的完成.解决此问题的一种可能方法是计算放置和删除的数量.但这将再次要求在共享存储级别进行同步,我想尽可能避免它.
有任何想法吗?
我正在研究一个webjob,它检查Web服务器是否有一些数据.有时Web服务器很忙,几秒钟后会再次尝试.
问题是,如果我只是忽略它并通过该函数,它将从队列中删除该消息,我将无法重试该调用.
我当前的解决方案(我根本不喜欢)是通过一个异常来增加dequque数量并将消息放回队列中.然而,这似乎非常残酷,并且需要运行webjob的线程才能重新启动.
有没有其他办法处理这个?
我正在尝试使用azure-sdk-for-node创建azure服务总线队列,但无法找到资源来设置无限制生存时间。
这是我的示例代码:
var queueOptions = {MaxSizeInMegabytes:'5120',DefaultMessageTimeToLive:'PT1M'};
serviceBusService.createQueueIfNotExists('myqueue',queueOptions,function(error){if(!error){//队列存在}});
无限时间在DefaultMessageTimeToLive中会有什么?
我在Azure中有一个Function项目,当项目放入队列时会触发该应用程序.它看起来像这样(大大简化):
public static async Task Run(string myQueueItem, TraceWriter log)
{
using (var client = new HttpClient())
{
client.BaseAddress = new Uri(Config.APIUri);
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json");
HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent);
response.EnsureSuccessStatusCode();
string json = await response.Content.ReadAsStringAsync();
ApiResponse apiResponse = JsonConvert.DeserializeObject<ApiResponse>(json);
log.Info($"Activity data successfully sent to platform in {apiResponse.elapsed}ms. Tracking number: {apiResponse.tracking}");
}
}
Run Code Online (Sandbox Code Playgroud)
这一切都很好,运行得很好.每次将项目放入队列时,我们都会将数据发送到我们这边的某个API并记录响应.凉.
当"产生队列消息的东西"出现大幅增加并且许多项目立即被放入队列时,就会出现问题.这往往会在一分钟内发生大约1,000到1,500件物品.错误日志将具有以下内容:
2017-02-14T01:45:31.692 mscorlib:执行函数时出现异常:Functions.SendToLimeade.f-SendToLimeade __- 1078179529:发送请求时发生错误.系统:无法连接到远程服务器.系统:通常只允许使用每个套接字地址(协议/网络地址/端口)123.123.123.123:443.
起初,我认为这是Azure功能应用程序运行本地套接字的问题,如此处所示.但是,我注意到了IP地址.IP地址123.123.123.123(当然在本例中已更改)是我们的IP地址,即HttpClient发布的IP地址.所以,现在我想知道是不是我们的服务器用完了套接字来处理这些请求.
无论哪种方式,我们都会遇到扩展问题.我正试图找出解决问题的最佳方法.
一些想法:
我有一个Azure队列存储触发的Webjob.我的webjob执行的过程是将数据索引到Azure搜索中.Azure搜索的最佳实践是将多个项目一起索引,而不是一次索引一个,这是出于性能原因(索引可能需要一些时间才能完成).
出于这个原因,我希望我的webjob将多个消息一起出列,这样我就可以遍历,处理它们,然后将它们全部索引到Azure Search中.
但是,我无法弄清楚如何让我的webjob一次出列一个以上.如何实现这一目标?
我需要在计划的时间段内为不同的用户运行一项作业(例如 CRM 同步)。此同步由另一个函数放入 ServiceBus 队列上的消息触发,该函数每 30 分钟由 TimerTrigger 触发一次。
现在我需要的是避免每个用户作业并行运行多次,即如果前一个作业未完成,我需要阻止同一用户处理同一作业。
让我们想象一下:
如何避免这种情况?
我是否必须手动实现 Azure 租赁 Blob 的逻辑?
谢谢。
我在 azure 上有一个 WebJob,可以同时处理来自多个队列的消息:
public async static Task ProcessQueueMessage1([QueueTrigger("queue1")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tze":
await Parser.Process1(message);
break;
default:
break;
}
}
public async static Task ProcessQueueMessage2([QueueTrigger("queue2")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tzr":
await Parser.Process2(message);
break;
default:
break;
}
}
Run Code Online (Sandbox Code Playgroud)
而在主
static void Main()
{
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 3;
config.Queues.MaxDequeueCount = 1;
var host = new JobHost(config);
host.RunAndBlock();
}
Run Code Online (Sandbox Code Playgroud)
这里:message.Substring(message.Length - 3, 3)只检查文件的扩展名。
我的问题是,我将如何使 …
我设置了一个天蓝色队列触发器:
[FunctionName("TransformData")]
public async Task Transform(
[QueueTrigger("product-prices")] string message)
{
await TransformAndLoadData(message);
}
Run Code Online (Sandbox Code Playgroud)
每当我通过 QueueClient 将项目添加到队列时:
(RawQueueData 将项目拆分为消息批次)
var rawQueueData = new RawQueueData<T>(data);
var sendMessageTasks = rawQueueData.Messages
.Select(m => _queueClient.SendMessageAsync(m));
await Task.WhenAll(sendMessageTasks);
Run Code Online (Sandbox Code Playgroud)
或者将消息从product-prices-poison移回到product-prices队列(使用Azure存储资源管理器),触发器立即失败并显示:
消息已达到 MaxDequeueCount 5。将消息移至产品价格毒药队列。
同步传递项目也会产生同样的问题。
我唯一一次可以成功触发触发器并处理项目是当我使用 Azure 存储资源管理器手动创建消息时。
增加 MaxDequeueCount 或批量大小不会产生影响。执行前者后的消息是:
消息已达到 MaxDequeueCount 100000。正在将消息移至产品价格毒药队列。
我还可以使用 QueueClient 手动将项目出队,没有任何问题。
我还尝试将接收到的对象类型更改为 QueueMessage、对象和字符串。大多数其他解决方案似乎专注于更新软件包(我正在使用最新的 - 稳定版 12.8.0)。
编辑:host.json:
{
"version": "2.0",
"extensions": {
"blobs": {
"maxDegreeOfParallelism": "4"
},
"queues": {
"maxDequeueCount": 5
}
}
}
Run Code Online (Sandbox Code Playgroud)