如何将依赖项传递给实现IEventProcessor(Event Hub)的类

Ver*_*ali 4 castle-windsor azure-eventhub

我有以下问题。

我们使用事件中心。在下面的类中,我们从IEventProcessor继承,并且您可以看到我们使用Service Locator。我们无法使其与构造函数/属性注入一起使用。当目标是从IEventProcessor继承的类时,Castle Windsor似乎无法解析依赖项。这是一个已知问题,还是我需要做一些工作才能使其正常工作?

下面是代码:

public class EventProcessor : IEventProcessor
{
    private readonly IEventService _eventService;
    private readonly ILogger _logger;
    private readonly Lazy<RetryPolicy> _retryPolicy;
    private readonly IConfigurationProvider _configurationProvider;

    public EventProcessor()
    {
        try
        {
            _eventService = ContainerProvider.Current.Container.Resolve<IEventService>();
            _logger = ContainerProvider.Current.Container.Resolve<ILogger>();
            _configurationProvider =     ContainerProvider.Current.Container.Resolve<IConfigurationProvider>();

        }
        catch (Exception exception)
        {
            _logger.WriteError(string.Format("Error occured when intializing EventProcessor: '{0}'", exception));
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        var eventsList = events.ToList();
        EventData lastEvent = null;
        foreach (var eventData in eventsList)
        {
            _logger.WriteVerbose(string.Format("Consumming {0} events...", eventsList.Count()));
            _eventService.ProcessEvent(eventData);
            lastEvent = eventData;
        }

        if (lastEvent != null)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync(lastEvent));
        }
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        _logger.WriteInfo("EventHub processor was closed for this reason: " + reason);

        if (reason == CloseReason.Shutdown)
        {
            await AzureServiceBusRetryPolicy.ExecuteAsync(async () => await context.CheckpointAsync());
        }

    }


}
Run Code Online (Sandbox Code Playgroud)

谢谢

Rag*_*lly 5

我正在使用Autofac,但遇到了同样的问题。

我通过实现IEventProcessorFactory并在EventProcessorHost中注册处理器时使用它来解决。

举个例子,我EventProcessorHost看起来像这样:

public class EventHubProcessorHost
{
    private readonly IEventProcessorFactory _eventProcessorFactory;
    private readonly string _serviceBusConnectionString;
    private readonly string _storageConnectionString;
    private readonly string _eventHubName;

    public EventHubProcessorHost(IEventProcessorFactory eventProcessorFactory, string serviceBusConnectionString, string storageConnectionString, string eventHubName)
    {
        _eventProcessorFactory = eventProcessorFactory;
        _serviceBusConnectionString = serviceBusConnectionString;
        _storageConnectionString = storageConnectionString;
        _eventHubName = eventHubName;
    }

    public void Start()
    {
        var builder = new ServiceBusConnectionStringBuilder(_serviceBusConnectionString)
        {
            TransportType = TransportType.Amqp
        };

        var client = EventHubClient.CreateFromConnectionString(builder.ToString(), _eventHubName);

        try
        {
            var eventProcessorHost = new EventProcessorHost("singleworker",
              client.Path, client.GetDefaultConsumerGroup().GroupName, builder.ToString(), _storageConnectionString);

            eventProcessorHost.RegisterEventProcessorFactoryAsync(_eventProcessorFactory);
        }
        catch (Exception exp)
        {
            Console.WriteLine("Error on send: " + exp.Message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我传入的工厂引用了我的IoC容器:

public class MyEventProcessorFactory : IEventProcessorFactory
{
    private readonly IComponentContext _componentContext;

    public MyEventProcessorFactory(IComponentContext componentContext)
    {
        _componentContext = componentContext;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        return _componentContext.Resolve<IEventProcessor>();
    }
}
Run Code Online (Sandbox Code Playgroud)

这使我可以像往常一样在我的构造器中使用构造函数注入EventProcessor

public class MyEventProcessor : IEventProcessor
{
    private IFoo _foo;

    public MyEventProcessor(IFoo foo)
    {
        _foo = foo;
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var eventData in events)
        {
            // Processing code
        }

        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后在启动时,我将所有内容正常连接到Autofac容器中:

builder.RegisterType<Foo>().As<IFoo>()

builder.RegisterType<MyEventProcessor>().As<IEventProcessor>()

builder.Register(c => new MyEventProcessorFactory(c.Resolve<IComponentContext>())).As<IEventProcessorFactory>();
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助。


noz*_*man 0

这不是使用温莎城堡注入依赖项的方法。鉴于您的代码构造函数注入将与列出依赖项作为参数的构造函数一起使用。在你的情况下,这看起来像是……。喜欢

public EventProcessor(IEventService eventService, ILogger logger, IConfigurationProvider configProvider)
{
    try
    {
        this._eventService = eventService;
        this._logger = logger; // however, i suggest using Property injection for that, see next example
        this._configurationProvider = configProvider;

    }
    catch (Exception exception)
    {
        this._logger.WriteError(string.Format("Error occured when intializing EventProcessor: '{0}'", exception));
    }
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以解析 EventProcessor 本身,并且将注入这些依赖项。

财产注入适用于公共财产。您只需像普通属性一样定义它们,它们就会独立注入。

public ILogger Logger {get; set;}
Run Code Online (Sandbox Code Playgroud)

考虑使用 Castle Windsors 方式注入记录器(请参阅Winsdor 文档中的 LoggingFacility。也许,您想使用 NLog 或 log4net (像我一样)? --> 然后有一个现成的 nugetpackage (Castle Windor log4net integraton)。