许多设备正在发送消息,这些消息最终进入单个 Azure 服务总线队列(或主题)。我们希望并行处理多个消息,但我们希望避免在任何给定时间并发处理同一设备的两个消息。
下图说明了目标。有 3 个处理线程(实际上可能有几十个,分布在多个服务器之间)。每个方框表示单个消息的处理时间,颜色表示它属于哪个设备。
您可以看到,在任何时间点都没有来自同一设备的两个或多个重叠消息。
由于涉及多个处理服务器,我可以想象防止并发处理的唯一方法是以设备 ID 作为分区键对消息进行分区,然后每个分区只有一个消费者:
因此,来自“黄色设备”的所有消息都会发送至分区 1,依此类推。
我仍然想在单个进程中运行多个处理线程。现在,我们做一些简单的事情,比如
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
var options = new OnMessageOptions { MaxConcurrentCalls = x };
client.OnMessage(m =>
{
// Process...
m.Complete();
});
Run Code Online (Sandbox Code Playgroud)
如何将并发限制合并到此类代码中?
我可以想象一些基于参与者或其他并发机制的客户端解决方案。但有没有办法在 Broker 层面解决这个问题呢?
本页介绍如何使用 Azure 服务总线中的会话将来自同一源的消息分组到相同的接收器中。
在无会话队列处理器中,我可以控制可以并行获取的消息数量:
new OnMessageOptions { MaxConcurrentCalls = 10 };
Run Code Online (Sandbox Code Playgroud)
如果我传递这些选项,则同时处理的消息不会超过 10 条。
现在,对于会话式处理器,选项被替换为
new SessionHandlerOptions { MaxConcurrentSessions = 10 };
Run Code Online (Sandbox Code Playgroud)
其含义不同,即同时进行的会话不超过 10 个。
我的会话寿命相对较长,而且大部分都是空闲的,因此我必须将此参数设置为较高的值。但是,我仍然想限制并行消息的数量。
这可以开箱即用吗?
MaxConcurrentSessions 如果我设置为,并行化的实际限制是什么int.MaxValue?
我有一个Azure函数,我想让它从EventHub接收消息(这非常简单并且有效),然后在运行时使用表绑定将该信息放入表存储中。
这是我到目前为止的内容:
public static async Task Run(string eventHubMessage, TraceWriter log, Binder binder)
{
var m = JsonConvert.DeserializeObject<Measurement>(eventHubMessage);
var attributes = new Attribute[]
{
new StorageAccountAttribute("AzureWebJobsTest"),
new TableAttribute(tableName, m.PartitionKey, m.RowKey)
};
using(var output = await binder.BindAsync<MyTableEntity>(attributes))
{
if(output == null)
log.Info($"4. output is null");
else
{
output.Minimum = m.Minimum;
output.Maximum = m.Maximum;
output.Average = m.Average;
output.Timestamp = m.Timestamp;
output.ETag = m.ETag;
output.WriteEntity(/* Need an operationContext*/)
}
}
}
public class MyTableEntity : TableEntity, IDisposable
{
public double Average { get; set;} …Run Code Online (Sandbox Code Playgroud) 我有一个服务器,它需要向多个客户端发送消息,让客户端知道需要做一些事情。
我正在尝试通过使用 Azure 事件中心来实现这一点。
我使用以下代码发送消息:
await eventHubClient.SendAsync(
new EventData(Encoding.UTF8.GetBytes(String.Format("Message {0}, {1}", i, sMessage))),
"1")
.ConfigureAwait(continueOnCapturedContext: false);
await eventHubClient.CloseAsync();
Run Code Online (Sandbox Code Playgroud)
我使用两个 WPF 应用程序作为侦听器,它们将在启动时创建侦听器并将其保存EventProcessorHost在私有变量中。
当我发送消息时,哪个听众将处理该消息是随机的。
是否可以使用 Azure 事件中心向多个收件人发送消息?
Azure Functions附带一组固定的预先存在的绑定。
同时,Azure Functions基于Web Jobs SDK,具有一些可扩展性。它允许创建自定义绑定类型,包括自定义触发器。
是否可以在Azure Function运行时中运行那些自定义绑定?如果是,是否有有关该操作的教程或文档?如果没有,有什么计划吗?
一些示例使用场景将集成到非Azure产品(例如Kafka)或自定义内部协议中。
我有这个查询,因为你firstName包含单引号安东尼奥尼尔
:> g.addV('person')
.property('firstName', 'Anthony O'Neil')
.property('lastName', 'Andersen')
.property('age', 44)
Run Code Online (Sandbox Code Playgroud)
任何想法如何逃脱它?
我正在消耗计划上运行 Azure Function 应用程序,并且我想监视当前运行的实例数量。使用格式的 REST API 端点
https://management.azure.com/subscriptions/{subscr}/resourceGroups/{rg}
/providers/Microsoft.Web/sites/{appname}/instances?api-version=2015-08-01
Run Code Online (Sandbox Code Playgroud)
我能够检索实例。但是,结果与我在 Application Insights/Live Metrics Stream 中看到的信息不匹配。
例如,现在 App Insights 显示有 4 台服务器在线,而 API 调用仅返回一台(这 1 个实例的 GUID 也在 App Insights guid 中)。
我可以信任谁?有没有更好的方法来获取实例计数(例如从 App Insights)?
更新:看起来 REST API 的数据是错误的。
我向队列发送了 10000 条消息,并使用处理请求的相应实例 ID 记录每个函数调用。
虽然消息不断传入且积压工作不断增长,但 REST API 的实例计数似乎是正确的(从 1 缩放到 12)。发送停止后,报告的实例计数迅速下降(最终回到 1,而处理器仍然繁忙)。
但根据速度和执行日志,我可以看出实际实例数不断增长,在处理最后一条消息时最终达到 15 个实例。
UPDATE2:看起来 SDK 拒绝报告超过 20 个服务器。该指标稳定在 20,而 App Insights 保持稳定增长,目前已达到 41。
我应该使用哪种触发器类型来运行 Azure 函数作为 Azure 事件网格主题的订阅?
与事件网格相关的任何地方都提到了此功能,但我没有看到任何教程或代码示例。
我创建了一个示例 Azure 自动化 Powershell Runbook。我正在尝试执行 SQL 命令,然后将该命令中的消息打印到 Workbook 输出中。
我的代码取自使用 PowerShell 从 SQL Server 捕获 InfoMessage 输出,如果我在本地运行它,它就可以工作:
Write-Output "Starting"
$conn = New-Object System.Data.SqlClient.SqlConnection "Data Source=abc.database.windows.net,1433;Initial Catalog=def;Integrated Security=False;User ID=ghj;Password=qwe"
## Attach the InfoMessage Event Handler to the connection to write out the messages
$handler = [System.Data.SqlClient.SqlInfoMessageEventHandler] {param($sender, $event) Write-Output $event.Message };
$conn.add_InfoMessage($handler);
$conn.FireInfoMessageEventOnUserErrors = $true;
$conn.Open();
$cmd = $conn.CreateCommand();
$cmd.CommandText = "PRINT 'This is the message from the PRINT statement'";
$cmd.ExecuteNonQuery();
$cmd.CommandText = "RAISERROR('This is the message from the …Run Code Online (Sandbox Code Playgroud)