cod*_*ent 6 spring-boot spring-cloud spring-cloud-stream spring-kafka micrometer
升级到 Spring Boot 3 后,我必须更新跟踪/关联配置,以从 Spring Cloud Sleuth 切换到新的 Micrometer 跟踪库。
此时,我可以在日志中看到 traceId/spanId 信息,这些信息已使用带有自动检测的 .txt 文件的 HTTP 调用正确传输到其他服务WebClient
。
然而,Spring Cloud Streams Kafka 生产者和消费者似乎没有被检测到。
这是生产者的示例:
logger.debug("Sending message to kafka queue {}", message)
streamBridge.send(bindingName, message)
Run Code Online (Sandbox Code Playgroud)
带有traceId、spanId的日志:
[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] i.s.m.p.p.ProjectTaskEventProducer : Sending message to kafka queue GenericMessage [xxx]
[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
Run Code Online (Sandbox Code Playgroud)
在消费者方面,我有这个简单的 KStream:
@Bean
fun processEvent() =
Function<KStream<EventKey, EventValue>, KStream<EventKey, EventValue?>> { events ->
events.process(
ProcessorSupplier {
Processor<EventKey, EventValue, EventKey, EventValue> {
logger.info("{}", it.headers())
}
}
)
}
Run Code Online (Sandbox Code Playgroud)
日志
[consumer,,] 52544 --- [-StreamThread-1] ventKStreamConfiguration$$SpringCGLIB$$0 : RecordHeaders(headers = [RecordHeader(key = target-protocol, value = [107, 97, 102, 107, 97]), RecordHeader(key = spring_json_header_types, value = [123, 34, 116, 97, 114, 103, 101, 116, 45, 112, 114, 111, 116, 111, 99, 111, 108, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false)
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,仅传输了两个标头(target-protocol
和spring_json_header_types
),b3
标头丢失。因此,MDC 日志也没有被设置。
Micrometer文档中有关消息传递工具的内容非常稀疏,因此不清楚如何在 Spring Cloud Stream 的上下文中执行此操作。
StreamBridge
像 一样WebClient
被自动检测吗?更新1:
我已经ProducerMessageHandlerCustomizer
按照指示添加了一个,以便能够观察底层KafkaTemplate
.
@Configuration
class KafkaProducerConfiguration {
@Bean
fun kafkaProducerObservationCustomizer () : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
handler, destinationName ->
handler.kafkaTemplate.setObservationEnabled(true)
}
}
}
Run Code Online (Sandbox Code Playgroud)
调用时,StreamBridge
执行最终会在将 ObservationEnabled 属性设置为 true 的定制器中结束:
然而,消费者仍然只得到两个标头:
如果您比较ObservationRegistry
HTTP 调用的相关日志:
它和de KafkaTemplate里面的不一样:
问题似乎出在KafkaTemplate
:
ObservationRegistry 在应用程序启动期间初始化,此时ProducerMessageHandlerCustomizer
尚未调用。因此,observationEnabled 的值将始终为 false,不执行 if 块并默认为 NOOP 注册表:
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
Run Code Online (Sandbox Code Playgroud)
更新2:
我已经尝试过这个解决方法
@Configuration
class KafkaProducerConfiguration {
@Bean
fun kafkaProducerObservationCustomizer (applicationContext: ApplicationContext, observationRegistry: ObservationRegistry) : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
handler, destinationName ->
handler.kafkaTemplate.setObservationEnabled(true)
handler.kafkaTemplate.setApplicationContext(applicationContext)
handler.kafkaTemplate.afterSingletonsInstantiated()
}
}
}
Run Code Online (Sandbox Code Playgroud)
但这不起作用。它似乎扰乱了生产者的配置,覆盖了它的值。就我而言,它会查找本地 Kafka 集群而不是配置的集群:
2022-12-05T17:36:06.815+01:00 INFO [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2022-12-05T17:36:06.816+01:00 WARN [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
Run Code Online (Sandbox Code Playgroud)
底层KafkaTemplate
默认不开启微米追踪,需要设置observationEnabled
为true
。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#observation
借助 Spring Cloud Stream,您可以通过ProducerMessageHandlerCustomizer
@Bean
处理程序类型是KafkaProducerMessageHandler
;所以用它handler.getKafkaTemplate().set...
来改变它的属性。
归档时间: |
|
查看次数: |
1959 次 |
最近记录: |