sit*_*ant 5 spring slf4j mdc apache-kafka spring-kafka
kafka 监听消息之前/之后是否有任何类型的钩子可用?
使用案例:必须设置 MDC 关联 ID 才能执行日志可追溯性
我在寻找什么?之前/之后回调方法,以便可以在进入时设置 MDC 关联 ID,并最终在退出时清除 MDC。
编辑后的场景: 我将关联 id 作为 Kafka 标头的一部分,并且我想在 Kafka 监听器中收到消息后立即在 MDC 中进行设置
感谢您的帮助
您可以向您的侦听器 bean 添加周围建议...
@SpringBootApplication
public class So59854374Application {
public static void main(String[] args) {
SpringApplication.run(So59854374Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() { // static is important
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MyListener) {
ProxyFactoryBean pfb = new ProxyFactoryBean();
pfb.setTarget(bean);
pfb.addAdvice(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
System.out.println("Before");
return invocation.proceed();
}
finally {
System.out.println("After");
}
}
});
return pfb.getObject();
}
return bean;
}
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("so59854374", "foo");
}
}
@Component
class MyListener {
@KafkaListener(id = "so59854374", topics = "so59854374")
public void listen(String in) {
System.out.println(in);
}
}
Run Code Online (Sandbox Code Playgroud)
和
Before
foo
After
Run Code Online (Sandbox Code Playgroud)
编辑
如果将@Header("myMdcHeader") byte[] mdc附加参数添加到 kafka 侦听器方法中,则可以getArguments()[1]在调用中使用。
RecordInterceptor另一个解决方案是向侦听器容器工厂添加一个,它允许您在将原始数据ConsumerRecord传递到侦听器适配器之前访问原始数据。
Before
foo
After
Run Code Online (Sandbox Code Playgroud)
/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}
Run Code Online (Sandbox Code Playgroud)
如果您使用批处理侦听器,Kafka 会提供一个ConsumerInterceptor.
| 归档时间: |
|
| 查看次数: |
3605 次 |
| 最近记录: |