如何在 C# 中创建通用管道?

moh*_*eli 3 c# pipeline

我正在尝试在 C# 中创建一个通用(通用)管道,以便在许多项目中重用。这个想法与ASP.NET Core Middleware非常相似。它更像是一个可以动态组合的巨大函数(双向管道)(类似于BRE

它需要获取一个输入模型,管理之前加载的一系列处理器,并在输入旁边返回一个包裹在超模型中的输出模型。

这就是我所做的。我创建了一个Context类,代表整体数据/模型:

public class Context<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
    public Context()
    {
        UniqueToken = new Guid();
        Logs = new List<string>();
    }

    public InputType Input { get; set; } 

    public OutputType Output { get; set; }

    public Guid UniqueToken { get; }

    public DateTime ProcessStartedAt { get; set; }

    public DateTime ProcessEndedAt { get; set; }

    public long ProcessTimeInMilliseconds
    {
        get
        {
            return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
        }
    }

    public List<string> Logs { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

然后我创建了一个接口,以在真实处理器上强制执行签名:

public interface IProcessor
{
    void Process<InputType, OutputType>(Context<InputType, OutputType> context, IProcessor next) where InputType : class, new() where OutputType : class, new();
}
Run Code Online (Sandbox Code Playgroud)

然后我创建了一个Container, 来管理整个管道:

public class Container<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
    public static List<IProcessor> Processors { get; set; }

    public static void Initialize()
    {
        LoadProcessors();
    }

    private static void LoadProcessors()
    {
        // loading processors from assemblies dynamically
    }

    public static Context<InputType, OutputType> Execute(InputType input)
    {
        if (Processors.Count == 0)
        {
            throw new FrameworkException("No processor is found to be executed");
        }
        if (input.IsNull())
        {
            throw new BusinessException($"{nameof(InputType)} is not provided for processing pipeline");
        }
        var message = new Context<InputType, OutputType>();
        message.Input = input;
        message.ProcessStartedAt = DateTime.Now;
        Processors[0].Process(message, Processors[1]);
        message.ProcessEndedAt = DateTime.Now;
        return message;
    }
}
Run Code Online (Sandbox Code Playgroud)

我知道如何从给定文件夹中的程序集中动态加载处理器,所以这不是问题。但我被困在以下几点:

  1. 如何将下一个处理器注入每个处理器(我可以Next在每个处理器上强制使用一个属性,但我猜它是反对SRP 的,因为每个处理器应该只关心完成它的工作,而不是保持链)
  2. 如何确保正确的排序(一种选择是Order在每个处理器中都有属性,并确保它们没有重复的值,但这似乎违反了SRP,每个处理器应该只关心处理,而不是它的顺序)
  3. 如何保证简单的使用?在为团队创建基础设施方面,对开发人员友好是一件大事。否则团队成员不会接受它。
  4. 如何设计链条,使短路成为可能?

Nic*_*ler 5

我建议稍微不同的设计。这个想法基于装饰者模式。

首先,我会创建Context一个非泛型类并删除输入和输出值。在我的设计中,上下文只保存上下文信息(比如处理时间和消息):

public class Context
{
    public Context()
    {
        UniqueToken = new Guid();
        Logs = new List<string>();
    }        

    public Guid UniqueToken { get; }

    public DateTime ProcessStartedAt { get; set; }

    public DateTime ProcessEndedAt { get; set; }

    public long ProcessTimeInMilliseconds
    {
        get
        {
            return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
        }
    }

    public List<string> Logs { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

然后,我将使处理器接口通用:

public interface IProcessor<InputType, OutputType>
{
    OutputType Process(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)

然后我把你Container变成了一个Pipeline带有泛型类型参数的:

public interface IPipeline<InputType, OutputType>
{
    OutputType Execute(InputType input, out Context context);
    OutputType ExecuteSubPipeline(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)

这两个函数的区别在于前者初始化上下文,后者只使用它。如果您不希望您的客户访问ExecuteSubPipeline().

然后的想法是将具有越来越多处理器的多个管道对象相互封装。您从只有一个处理器的管道对象开始。比你把它包装在另一个管道对象中等等。为此,我从一个抽象基类开始。这个基类与一个处理器相关联,并有一个函数AppendProcessor()来创建一个新的管道,添加一个给定的处理器:

public abstract class PipelineBase<InputType, ProcessorInputType, OutputType> : IPipeline<InputType, OutputType>
{
    protected IProcessor<ProcessorInputType, OutputType> currentProcessor;

    public PipelineBase(IProcessor<ProcessorInputType, OutputType> processor)
    {
        currentProcessor = processor;
    }

    public IPipeline<InputType, ProcessorOutputType> AppendProcessor<ProcessorOutputType>(IProcessor<OutputType, ProcessorOutputType> processor)
    {
        return new Pipeline<InputType, OutputType, ProcessorOutputType>(processor, this);
    }

    public OutputType Execute(InputType input, out Context context)
    {
        context = new Context();
        context.ProcessStartedAt = DateTime.Now;
        var result = ExecuteSubPipeline(input, context);
        context.ProcessEndedAt = DateTime.Now;
        return result;
    }

    public abstract OutputType ExecuteSubPipeline(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)

现在,我们有这个管道的两个具体实现:一个是任何管道起点的终端实现和一个包装管道:

public class TerminalPipeline<InputType, OutputType> : PipelineBase<InputType, InputType, OutputType>
{       
    public TerminalPipeline(IProcessor<InputType, OutputType> processor)
        :base(processor)
    { }

    public override OutputType ExecuteSubPipeline(InputType input, Context context)
    {
        return currentProcessor.Process(input, context);
    }
}

public class Pipeline<InputType, ProcessorInputType, OutputType> : PipelineBase<InputType, ProcessorInputType, OutputType>
{
    IPipeline<InputType, ProcessorInputType> previousPipeline;

    public Pipeline(IProcessor<ProcessorInputType, OutputType> processor, IPipeline<InputType, ProcessorInputType> previousPipeline)
        : base(processor)
    {
        this.previousPipeline = previousPipeline;
    }

    public override OutputType ExecuteSubPipeline(InputType input, Context context)
    {
        var previousPipelineResult = previousPipeline.ExecuteSubPipeline(input, context);
        return currentProcessor.Process(previousPipelineResult, context);
    }
}
Run Code Online (Sandbox Code Playgroud)

为了便于使用,我们还创建了一个辅助函数来创建终端启动管道(以允许类型参数推导):

public static class Pipeline
{
    public static TerminalPipeline<InputType, OutputType> Create<InputType, OutputType>(IProcessor<InputType, OutputType> processor)
    {
        return new TerminalPipeline<InputType, OutputType>(processor);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后,我们可以将这种结构用于各种处理器。例如:

class FloatToStringProcessor : IProcessor<float, string>
{
    public string Process(float input, Context context)
    {
        return input.ToString();
    }
}

class RepeatStringProcessor : IProcessor<string, string>
{
    public string Process(string input, Context context)
    {
        return input + input + input;
    }
}

class Program
{
    public static void Main()
    {
        var pipeline = Pipeline
            .Create(new FloatToStringProcessor())
            .AppendProcessor(new RepeatStringProcessor());

        Context ctx;
        var result = pipeline.Execute(5, out ctx);
        Console.WriteLine($"Pipeline result: {result}");
        Console.WriteLine($"Pipeline execution took {ctx.ProcessTimeInMilliseconds} milliseconds");
    }
}
Run Code Online (Sandbox Code Playgroud)

这将打印

Pipeline result: 555
Pipeline execution took 6 milliseconds
Run Code Online (Sandbox Code Playgroud)

我不明白你说的短路是什么意思。在我看来,短路仅对(至少)不需要计算一个操作数的二元运算符有意义。但是由于您的运算符都是一元的,因此无法真正应用。处理器可以随时检查输入,当发现不需要处理时直接返回。

动态加载可以通过向界面添加类似的东西LoadProcessors()来轻松添加IPipeline,类似于ExecuteSubPipeline(). 在这种情况下,处理器对象必须是代表(仍然正确键入)。然后,LoadProcessors()可以在加载它们后用它们的实际处理器替换它们。