Pal*_*lmi 1 events dependency-injection rabbitmq multi-tenant asp.net-core
我有一个微服务架构,其中包含 ASP.Net Core 应用程序和 RabbitMq 作为微服务之间的事件总线。
我还想支持多租户。
因此,我在 中定义了以下依赖项注入服务,Startup.cs以便根据用户的租户 ID 在每个请求上打开与数据库的连接。
services.AddScoped<IDocumentSession>(ds =>
{
var store = ds.GetRequiredService<IDocumentStore>();
var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
});
Run Code Online (Sandbox Code Playgroud)
问题是当服务处理事件总线消息(如 UserUpdatedEvent)时。
在这种情况下,当它尝试打开 Db 连接时,它显然没有来自 http 上下文的用户信息。
在注入作用域服务并使用 RabbitMq 处理事件时,如何发送/访问相应用户的租户 ID?
或者改写我的问题:执行依赖项注入代码时,有什么方法可以访问 RabbitMQ 消息(例如其标头)?
由于没有HttpContext,因为 RabbitMq 请求不是 Http 请求,正如 @istepaniuk 的答案中指出的那样,我创建了自己的上下文并调用它AmqpContext:
public interface IAmqpContext
{
void ClearHeaders();
void AddHeaders(IDictionary<string, object> headers);
string GetHeaderByKey(string headerKey);
}
public class AmqpContext : IAmqpContext
{
private readonly Dictionary<string, object> _headers;
public AmqpContext()
{
_headers = new Dictionary<string, object>();
}
public void ClearHeaders()
{
_headers.Clear();
}
public void AddHeaders(IDictionary<string, object> headers)
{
foreach (var header in headers)
_headers.Add(header.Key, header.Value);
}
public string GetHeaderByKey(string headerKey)
{
if (_headers.TryGetValue(headerKey, out object headerValue))
{
return Encoding.Default.GetString((byte[])headerValue);
}
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
当发送 RabbitMq 消息时,我通过标头发送租户 ID,如下所示:
var properties = channel.CreateBasicProperties();
if (tenantId != null)
{
var headers = new Dictionary<string, object>
{
{ "tid", tenantId }
};
properties.Headers = headers;
}
channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body);
Run Code Online (Sandbox Code Playgroud)
然后,当在接收服务上时,我将其注册AmqpContext为范围服务Startup.cs:
services.AddScoped<IAmqpContext, AmqpContext>();
Run Code Online (Sandbox Code Playgroud)
当接收到 RabbitMq 消息时,在消费者通道内,会创建一个作用域和 Amqp 上下文:
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
var properties = ea.BasicProperties;
using (var scope = _serviceProvider.CreateScope())
{
var amqpContext = scope.ServiceProvider.GetService<IAmqpContext>();
if (amqpContext != null)
{
amqpContext.ClearHeaders();
if (properties.Headers != null && amqpContext != null)
{
amqpContext.AddHeaders(properties.Headers);
}
}
var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
if (handler == null) continue;
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
Run Code Online (Sandbox Code Playgroud)
然后,当创建范围数据库连接服务时(请参阅我的问题),我可以从消息标头访问租户 id:
services.AddScoped<IDocumentSession>(ds =>
{
var store = ds.GetRequiredService<IDocumentStore>();
string tenant = null;
var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
if (httpContextAccessor.HttpContext != null)
{
tenant = httpContextAccessor.HttpContext.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
}
else
{
var amqpContext = ds.GetRequiredService<IAmqpContext>();
tenant = amqpContext.GetHeaderByKey("tid");
}
return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1736 次 |
| 最近记录: |