我正在尝试在 C# 中创建一个通用(通用)管道,以便在许多项目中重用。这个想法与ASP.NET Core Middleware非常相似。它更像是一个可以动态组合的巨大函数(双向管道)(类似于BRE)
它需要获取一个输入模型,管理之前加载的一系列处理器,并在输入旁边返回一个包裹在超模型中的输出模型。
这就是我所做的。我创建了一个Context类,代表整体数据/模型:
public class Context<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
public Context()
{
UniqueToken = new Guid();
Logs = new List<string>();
}
public InputType Input { get; set; }
public OutputType Output { get; set; }
public Guid UniqueToken { get; }
public DateTime ProcessStartedAt { get; set; }
public DateTime ProcessEndedAt { get; set; }
public long ProcessTimeInMilliseconds
{
get
{
return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
}
}
public List<string> Logs { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
然后我创建了一个接口,以在真实处理器上强制执行签名:
public interface IProcessor
{
void Process<InputType, OutputType>(Context<InputType, OutputType> context, IProcessor next) where InputType : class, new() where OutputType : class, new();
}
Run Code Online (Sandbox Code Playgroud)
然后我创建了一个Container, 来管理整个管道:
public class Container<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
public static List<IProcessor> Processors { get; set; }
public static void Initialize()
{
LoadProcessors();
}
private static void LoadProcessors()
{
// loading processors from assemblies dynamically
}
public static Context<InputType, OutputType> Execute(InputType input)
{
if (Processors.Count == 0)
{
throw new FrameworkException("No processor is found to be executed");
}
if (input.IsNull())
{
throw new BusinessException($"{nameof(InputType)} is not provided for processing pipeline");
}
var message = new Context<InputType, OutputType>();
message.Input = input;
message.ProcessStartedAt = DateTime.Now;
Processors[0].Process(message, Processors[1]);
message.ProcessEndedAt = DateTime.Now;
return message;
}
}
Run Code Online (Sandbox Code Playgroud)
我知道如何从给定文件夹中的程序集中动态加载处理器,所以这不是问题。但我被困在以下几点:
我建议稍微不同的设计。这个想法基于装饰者模式。
首先,我会创建Context一个非泛型类并删除输入和输出值。在我的设计中,上下文只保存上下文信息(比如处理时间和消息):
public class Context
{
public Context()
{
UniqueToken = new Guid();
Logs = new List<string>();
}
public Guid UniqueToken { get; }
public DateTime ProcessStartedAt { get; set; }
public DateTime ProcessEndedAt { get; set; }
public long ProcessTimeInMilliseconds
{
get
{
return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
}
}
public List<string> Logs { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
然后,我将使处理器接口通用:
public interface IProcessor<InputType, OutputType>
{
OutputType Process(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)
然后我把你Container变成了一个Pipeline带有泛型类型参数的:
public interface IPipeline<InputType, OutputType>
{
OutputType Execute(InputType input, out Context context);
OutputType ExecuteSubPipeline(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)
这两个函数的区别在于前者初始化上下文,后者只使用它。如果您不希望您的客户访问ExecuteSubPipeline().
然后的想法是将具有越来越多处理器的多个管道对象相互封装。您从只有一个处理器的管道对象开始。比你把它包装在另一个管道对象中等等。为此,我从一个抽象基类开始。这个基类与一个处理器相关联,并有一个函数AppendProcessor()来创建一个新的管道,添加一个给定的处理器:
public abstract class PipelineBase<InputType, ProcessorInputType, OutputType> : IPipeline<InputType, OutputType>
{
protected IProcessor<ProcessorInputType, OutputType> currentProcessor;
public PipelineBase(IProcessor<ProcessorInputType, OutputType> processor)
{
currentProcessor = processor;
}
public IPipeline<InputType, ProcessorOutputType> AppendProcessor<ProcessorOutputType>(IProcessor<OutputType, ProcessorOutputType> processor)
{
return new Pipeline<InputType, OutputType, ProcessorOutputType>(processor, this);
}
public OutputType Execute(InputType input, out Context context)
{
context = new Context();
context.ProcessStartedAt = DateTime.Now;
var result = ExecuteSubPipeline(input, context);
context.ProcessEndedAt = DateTime.Now;
return result;
}
public abstract OutputType ExecuteSubPipeline(InputType input, Context context);
}
Run Code Online (Sandbox Code Playgroud)
现在,我们有这个管道的两个具体实现:一个是任何管道起点的终端实现和一个包装管道:
public class TerminalPipeline<InputType, OutputType> : PipelineBase<InputType, InputType, OutputType>
{
public TerminalPipeline(IProcessor<InputType, OutputType> processor)
:base(processor)
{ }
public override OutputType ExecuteSubPipeline(InputType input, Context context)
{
return currentProcessor.Process(input, context);
}
}
public class Pipeline<InputType, ProcessorInputType, OutputType> : PipelineBase<InputType, ProcessorInputType, OutputType>
{
IPipeline<InputType, ProcessorInputType> previousPipeline;
public Pipeline(IProcessor<ProcessorInputType, OutputType> processor, IPipeline<InputType, ProcessorInputType> previousPipeline)
: base(processor)
{
this.previousPipeline = previousPipeline;
}
public override OutputType ExecuteSubPipeline(InputType input, Context context)
{
var previousPipelineResult = previousPipeline.ExecuteSubPipeline(input, context);
return currentProcessor.Process(previousPipelineResult, context);
}
}
Run Code Online (Sandbox Code Playgroud)
为了便于使用,我们还创建了一个辅助函数来创建终端启动管道(以允许类型参数推导):
public static class Pipeline
{
public static TerminalPipeline<InputType, OutputType> Create<InputType, OutputType>(IProcessor<InputType, OutputType> processor)
{
return new TerminalPipeline<InputType, OutputType>(processor);
}
}
Run Code Online (Sandbox Code Playgroud)
然后,我们可以将这种结构用于各种处理器。例如:
class FloatToStringProcessor : IProcessor<float, string>
{
public string Process(float input, Context context)
{
return input.ToString();
}
}
class RepeatStringProcessor : IProcessor<string, string>
{
public string Process(string input, Context context)
{
return input + input + input;
}
}
class Program
{
public static void Main()
{
var pipeline = Pipeline
.Create(new FloatToStringProcessor())
.AppendProcessor(new RepeatStringProcessor());
Context ctx;
var result = pipeline.Execute(5, out ctx);
Console.WriteLine($"Pipeline result: {result}");
Console.WriteLine($"Pipeline execution took {ctx.ProcessTimeInMilliseconds} milliseconds");
}
}
Run Code Online (Sandbox Code Playgroud)
这将打印
Pipeline result: 555
Pipeline execution took 6 milliseconds
Run Code Online (Sandbox Code Playgroud)
我不明白你说的短路是什么意思。在我看来,短路仅对(至少)不需要计算一个操作数的二元运算符有意义。但是由于您的运算符都是一元的,因此无法真正应用。处理器可以随时检查输入,当发现不需要处理时直接返回。
动态加载可以通过向界面添加类似的东西LoadProcessors()来轻松添加IPipeline,类似于ExecuteSubPipeline(). 在这种情况下,处理器对象必须是代表(仍然正确键入)。然后,LoadProcessors()可以在加载它们后用它们的实际处理器替换它们。