How to use a Micrometer Timer to record the duration of an async method (returning Mono or Flux)

Mar*_*ark 5 asynchronous reactive-programming spring-kafka micrometer

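I'd like to use Micrometer to record the execution time of an async method at the point where it eventually runs. Is there a recommended way to do this?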

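Example: the Kafka replying template. I want to record the time it takes to actually execute the sendAndReceive call (send a message on a request topic and receive a response on a reply topic).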

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

Something like

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr))

would not work here; it only records the time it takes to create the Supplier, not the actual execution time.
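To illustrate the problem, here is a minimal sketch that uses only the JDK. `slowReply` is a hypothetical stand-in for `replyingKafkaTemplate.sendAndReceive` (it is not the Spring Kafka API); the only assumption is that it hands back a future immediately and completes it later. A clock stopped right after the call sees only the hand-off, while a clock stopped in a completion callback sees the full round trip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AsyncTimingDemo {

    /** Hypothetical async call: returns at once, completes after the given delay. */
    static CompletableFuture<String> slowReply(long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> "reply",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    /** Returns {nanos spent inside the call itself, nanos until the future completed}. */
    static long[] measure(long delayMillis) {
        long start = System.nanoTime();
        CompletableFuture<String> future = slowReply(delayMillis);
        // This is all that a synchronous timer wrapped around the call would observe.
        long callNanos = System.nanoTime() - start;

        // Stopping the clock in a completion callback captures the real latency.
        long completionNanos = future
                .thenApply(reply -> System.nanoTime() - start)
                .join();
        return new long[] {callNanos, completionNanos};
    }

    public static void main(String[] args) {
        long[] nanos = measure(200);
        System.out.printf("call returned after %d ms, reply arrived after %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(nanos[0]),
                TimeUnit.NANOSECONDS.toMillis(nanos[1]));
    }
}
```

With Micrometer, the same idea would presumably mean starting a `Timer.Sample` before the call and stopping it from the completion signal rather than wrapping the call in `record`.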

m.a*_*sik 5

You can simply use metrics() on Mono/Flux (see metrics() here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html). Then you can do something like this:

public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
        .name("my-metricsname")
        .metrics()

In Graphite, for example, you will then see the measured latency of this call (for more information, see: How to use Micrometer timer together with webflux endpoints).


Ale*_*kin 3

You can use reactor.util.context.Context:

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

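    // Wraps a Mono so that the time from subscription to the emitted value is recorded in the given timer.
    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                // When the value arrives, fetch the sample from the Context and stop it.
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                // stop(timer) returns the elapsed time in nanoseconds.
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                // Evaluated at subscription time: start a sample and store it in the Context.
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }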
}
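One thing to watch in the transformer above: the sample is stopped inside a flatMap, which only runs when a value is emitted, so a Mono that fails or completes empty would apparently never be recorded. As a library-free sketch of recording both outcomes, the following uses a plain CompletableFuture. `timed` and `DurationRecorder` are hypothetical names invented for this illustration; they are not part of Micrometer or Reactor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class OutcomeTiming {

    /** Hypothetical callback; a real setup might update a timer tagged by outcome here. */
    interface DurationRecorder {
        void record(String outcome, long nanos);
    }

    /** Starts the clock, invokes the async call, and records the duration on success or failure. */
    static <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> call,
                                          DurationRecorder recorder) {
        long start = System.nanoTime();
        // whenComplete runs for normal and exceptional completion and passes the outcome through.
        return call.get().whenComplete((value, error) ->
                recorder.record(error == null ? "success" : "error", System.nanoTime() - start));
    }

    public static void main(String[] args) {
        DurationRecorder printer = (outcome, nanos) ->
                System.out.println(outcome + " after " + nanos + " ns");

        timed(() -> CompletableFuture.completedFuture("reply"), printer).join();

        timed(() -> CompletableFuture.<String>failedFuture(new IllegalStateException("boom")), printer)
                .exceptionally(error -> "fallback") // swallow the failure so join() does not throw
                .join();
    }
}
```

In a Reactor pipeline the equivalent hook would be a terminal-signal operator rather than flatMap, so that error and empty completions also stop the sample.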