如何为自定义消息处理器实现自定义SamplingService?在检索元素之后和执行序列之前进行日志记录

And*_*ili 12 java esb wso2 wso2esb wso2-enterprise-integrator

我是WSO2 ESB的新手,我有一个具有这种特定行为的自定义消息处理器:在从消息存储中检索元素之后,在执行与此消息处理器相关的序列之前执行操作.

我试着详细解释一下.

这是我的ESB消息处理器定义:

<?xml version="1.0" encoding="UTF-8"?>
<!---<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">-->
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="sequence">transferProcessorSequence</parameter>
    <parameter name="interval">1000</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="concurrency">1</parameter>
</messageProcessor>
Run Code Online (Sandbox Code Playgroud)

它从transferFromMessageStore(队列)中检索一些元素(XML文档),并将此对象传递给使用它的transferProcessorSequence.xml序列.正如您在此时所看到的,我已经实现了一个自定义消息处理器SamplingProcessorHeaderRateLimit,它只是扩展了org.apache.synapse.message.processor.impl.sampler.SamplingProcessor WSO2类.此时,它仅在执行init()方法时显示日志.我将它部署在我的Carbon服务器上,它可以工作.

在这里,您可以找到整个项目代码.

好但是从我所理解的获得所需的行为我不必简单地扩展SamplingProcessor类,因为为了在每个消息消耗和调度序列之间进行自定义实现,需要扩展类SamplingService类,这一个.

我认为我需要覆盖execute()fetch(MessageConsumer msgConsumer).

此时应该还可以插入日志,每次从消息存储库中检索元素时都会写入日志文件,在此之前执行与消息处理器相关的序列.

可能吗?

所以我的主要主要是:

1)我是否要创建一个将SamplingService类扩展到我实现自定义消息处理器的同一项目中的类(此行为必须仅用于我的WSO2 ESB项目中的此特定消息处理器,所有其他使用的消息处理器在这个项目中必须使用标准的SamplingService实现).

2)另一个疑问与此自定义SamplingService实现如何传递给我的自定义消息处理器有关.进入SamplingProcessor WSO2类(如何将特定的自定义消息处理器实现与处理其生命周期的自定义SamplingService实现相关联).

sha*_*zin 1

1) Have I to create a class extending the SamplingService class into the same project in which I am implementing my custom message processor (this behavior have to be used only for this specific message processor in my WSO2 ESB project, all the other message processor used in this project have to use the standard SamplingService implementation).
Run Code Online (Sandbox Code Playgroud)

您的自定义SamplingProcessorHeaderRateLimitation将仅使用传入的消息transferFromMessageStore,并将仅将其使用和处理的消息注入到sequence transferProcessorSequence。所有其他路径将不会被该消息处理器处理。

2) Another doubt is related about how this custom SamplingService implementation is passed to my custom message processor. Into the the SamplingProcessor WSO2 class (how to associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle).
Run Code Online (Sandbox Code Playgroud)

如果您查看您实现的源代码,您已经将您的自定义与您的自定义SamplingProcessorHeaderRateLimitation.getTask()绑定在一起SamplingService2SamplingProcessorHeaderRateLimitation

@Override
protected Task getTask() {
    logger.info("getTask() START");
    System.out.println("getTask() START");
    logger.info("getTask() END");
    System.out.println("getTask() END");
    return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated());
}
Run Code Online (Sandbox Code Playgroud)