我问自己是否有办法在订阅者收到 onNext 信号之前将反应性上下文推送到 ThreadLocal 变量中。在研究 reactor-core 时,我发现了 Hooks 类和 Lift BiFunction。
我创建了一个具有以下实现的类。该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口。它将所有调用委托给实际订阅者,并且如果在实际订阅者上调用 onNext 之前修改到 ThreadLocal 变量,它还会推送上下文。
package com.example.demo;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;
import java.util.function.BiFunction;
public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {
private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);
private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();
public static Context getContext() {
Context context = contextHolder.get();
if (context == null) {
context = Context.empty();
contextHolder.set(context);
}
return context;
}
public static void setContext(Context context) {
contextHolder.set(context);
}
@Override
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
}
final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {
private CoreSubscriber<? super U> delegate;
public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
this.delegate = delegate;
}
@Override
public Context currentContext() {
return delegate.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}
@Override
public void onNext(U u) {
Context context = delegate.currentContext();
if (!context.isEmpty()) {
Context currentContext = ThreadLocalContextLifter.getContext();
if (!currentContext.equals(context)) {
logger.info("Pushing reactive context to holder {}", context);
ThreadLocalContextLifter.setContext(context);
}
}
delegate.onNext(u);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用以下代码将该实例加载到 Hooks 中:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
Run Code Online (Sandbox Code Playgroud)
我已经运行了一些测试,它似乎工作正常,但我不相信解决方案。我猜这个钩子会降低反应器的性能,或者在我不知道的某些情况下它不起作用。
我的问题很简单:这是一个坏主意吗?
我不认为这个想法有什么问题……每个 Reactor 提供的操作员都使用这个钩子。
Context之间没有变化,onNext因此电梯ThreadLocalContextCoreSubscriber可以捕获它onSubscribe。但是您仍然需要在 中至少检查一次 ThreadLocal onNext,因为onNext和onSubscribe 可能发生在两个不同的线程上,因此您的使用解决方案delegate.currentContext()也有效。最后,你的方法看起来不错。
| 归档时间: |
|
| 查看次数: |
1228 次 |
| 最近记录: |