Mar*_*ark 5 asynchronous reactive-programming spring-kafka micrometer
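I want to use Micrometer to record the execution time of an asynchronous method at the point where it actually completes. Is there a recommended way to do this?
Example: the Kafka replying template. I want to record how long the sendAndReceive call actually takes (sending a message on the request topic and receiving the response on the reply topic).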
public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                    "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
        ... // further maps, filters, etc.
Something like
responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr))
won't work here; it only records the time it takes to create the Supplier, not the actual execution time.
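To make the difference concrete, here is a minimal sketch that uses only the JDK. It is an illustration under stated assumptions, not the Micrometer or Spring Kafka API: the `sendAndReceive` method below is a hypothetical stand-in that returns a `CompletableFuture` immediately and completes it after a delay, and `System.nanoTime()` stands in for the timer. Timing the call itself captures only how long it takes to hand back the future, while a measurement taken in a completion callback captures the time until the reply arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncTimingDemo {

    // Hypothetical stand-in for replyingKafkaTemplate.sendAndReceive(...):
    // returns a future right away and completes it after delayMillis.
    static CompletableFuture<String> sendAndReceive(String request, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> "reply to " + request,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Returns {nanos spent inside the call itself, nanos until the reply arrived}.
    static long[] measure(long delayMillis) {
        long start = System.nanoTime();
        CompletableFuture<String> future = sendAndReceive("ping", delayMillis);
        // This is roughly what wrapping the call in a synchronous timer would see.
        long callNanos = System.nanoTime() - start;

        // Stop the clock in a completion callback instead.
        AtomicLong completionNanos = new AtomicLong();
        future.whenComplete((reply, error) -> completionNanos.set(System.nanoTime() - start))
              .join(); // block only so that the demo can return both numbers
        return new long[] {callNanos, completionNanos.get()};
    }

    public static void main(String[] args) {
        long[] t = measure(200);
        System.out.printf("call returned after %d ms, reply arrived after %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(t[0]), TimeUnit.NANOSECONDS.toMillis(t[1]));
    }
}
```

The same idea carries over to a reactive pipeline: the measurement has to start at subscription and stop in a signal callback, not around the method call that merely assembles the pipeline.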
You can simply use metrics() from Mono/Flux (see metrics() here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html). Then you can do something like:
public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                    "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
        .name("my-metricsname")
        .metrics()
In Graphite, for example, you will then see the measured latency for this call (you can find more information here: How to use Micrometer timer together with webflux endpoints).
You can use reactor.util.context.Context:
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);
    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        // Simulate a slow asynchronous operation and wrap it with the timer.
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
            .subscribeOn(Schedulers.parallel())
            .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
            // When the value is emitted, read the sample from the context and stop it.
            .flatMap(t -> Mono.subscriberContext()
                .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                    .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                        duration / 1000000000D))
                    .map(ignored -> t)))
            // At subscription time, start a sample and store it in the subscriber context.
            .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}