使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal

eti*_*iau 8 project-reactor

我问自己是否有办法在订阅者收到 onNext 信号之前将反应性上下文推送到 ThreadLocal 变量中。在研究 reactor-core 时,我发现了 Hooks 类和 Lift BiFunction。

我创建了一个具有以下实现的类。该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口。它将所有调用委托给实际订阅者,并且如果在实际订阅者上调用 onNext 之前修改到 ThreadLocal 变量,它还会推送上下文。

package com.example.demo;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;

import java.util.function.BiFunction;

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

    private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);

    private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();

    public static Context getContext() {
        Context context = contextHolder.get();
        if (context == null) {
            context = Context.empty();
            contextHolder.set(context);
        }
        return context;
    }

    public static void setContext(Context context) {
        contextHolder.set(context);
    }

    @Override
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
        return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
    }

    final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {

        private CoreSubscriber<? super U> delegate;

        public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            delegate.onSubscribe(s);
        }

        @Override
        public void onNext(U u) {
            Context context = delegate.currentContext();
            if (!context.isEmpty()) {
                Context currentContext = ThreadLocalContextLifter.getContext();
                if (!currentContext.equals(context)) {
                    logger.info("Pushing reactive context to holder {}", context);
                    ThreadLocalContextLifter.setContext(context);
                }
            }
            delegate.onNext(u);
        }

        @Override
        public void onError(Throwable t) {
            delegate.onError(t);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

使用以下代码将该实例加载到 Hooks 中:

Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
Run Code Online (Sandbox Code Playgroud)

我已经运行了一些测试,它似乎工作正常,但我不相信解决方案。我猜这个钩子会降低反应器的性能,或者在我不知道的某些情况下它不起作用。

我的问题很简单:这是一个坏主意吗?

Sim*_*slé 5

我不认为这个想法有什么问题……每个 Reactor 提供的操作员都使用这个钩子。

Context之间没有变化,onNext因此电梯ThreadLocalContextCoreSubscriber可以捕获它onSubscribe。但是您仍然需要在 中至少检查一次 ThreadLocal onNext,因为onNextonSubscribe 可能发生在两个不同的线程上,因此您的使用解决方案delegate.currentContext()也有效。最后,你的方法看起来不错。