dre*_*nda 2 java spring spring-boot spring-async
I have a Spring Boot 2.2 application. I created a service like this:
    @Async
    @PreAuthorize("hasAnyRole('ROLE_PBX')")
    @PlanAuthorization(allowedPlans = {PlanType.BUSINESS, PlanType.ENTERPRISE})
    public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
        log.debug("Current tenant {}", TenantContext.getCurrentTenantId());
        return new AsyncResult<AuditCdr>(auditCdrRepository.save(cdr3CXMapper.cdr3CXDtoToAuditCdr(cdrRecord)));
    }
This is my @Async configuration:
    @Configuration
    @EnableAsync
    public class AsyncConfiguration implements AsyncConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(10);
            executor.setQueueCapacity(500);
            executor.setThreadNamePrefix("threadAsync");
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.initialize();
            return executor;
        }
    }
With SecurityContextHolder.MODE_INHERITABLETHREADLOCAL I can see that the security context is passed to the @Async method. In my multi-tenant application I use a ThreadLocal to hold the tenant's id:
    public class TenantContext {
        public final static String TENANT_DEFAULT = "empty";
        private static final ThreadLocal<String> code = new ThreadLocal<>();

        public static void setCurrentTenantId(String code) {
            if (code != null)
                TenantContext.code.set(code);
        }

        public static String getCurrentTenantId() {
            String tenantId = code.get();
            if (StringUtils.isNotBlank(tenantId)) {
                return tenantId;
            }
            return TENANT_DEFAULT;
        }

        public static void clear() {
            code.remove();
        }
    }
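Because a ThreadLocal is bound to the thread that set it, the tenant id is not available inside the @Async method. On top of that, my custom @PlanAuthorization AOP aspect needs it to verify the tenant's plan. Is there a clean way to set the TenantContext in any @Async method of my application?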
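To make the problem concrete, here is a minimal plain-Java sketch with no Spring involved. The class and field names are hypothetical stand-ins for the holder inside TenantContext. It illustrates two things, under the assumption of a single-threaded pool: a plain ThreadLocal set by the caller is empty on the pool thread, and an InheritableThreadLocal is copied only once, when the worker thread is created, so a reused worker keeps a stale value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalVisibilityDemo {
    // Hypothetical stand-ins for the holder inside TenantContext.
    private static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    private static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();

    /** Sets a value on the calling thread, then reads it from a pool thread. */
    static String readPlainFromWorker(String tenantId) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            PLAIN.set(tenantId);
            // Runs on the worker thread, which has its own, empty slot.
            Callable<String> read = () -> String.valueOf(PLAIN.get());
            return pool.submit(read).get();
        } finally {
            PLAIN.remove();
            pool.shutdown();
        }
    }

    /** The worker copies the inheritable value once, when the thread is created. */
    static String readInheritedFromReusedWorker(String first, String second) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = () -> String.valueOf(INHERITED.get());
            INHERITED.set(first);
            pool.submit(read).get();         // worker is created here and inherits `first`
            INHERITED.set(second);
            return pool.submit(read).get();  // same worker, still holds `first`
        } finally {
            INHERITED.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readPlainFromWorker("tenant-a"));                       // prints "null"
        System.out.println(readInheritedFromReusedWorker("tenant-a", "tenant-b")); // prints "tenant-a"
    }
}
```

The second result suggests that relying on inheritable thread-locals with a pooled executor can hand a task the context of whichever request happened to create the worker thread, so the value has to be copied per task instead.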
wal*_*ros 13
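The solution for this case is:
First, configure a custom thread pool and override its execute method so that it decorates each task: read the thread-local value on the submitting thread, set it on the pool thread (or do any other setup you need from the main context), and submit the decorated task for execution instead of the original one.
Second, tell the @Async annotation to use that specific thread pool.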
    @Bean("tenantExecutor")
    public Executor threadLocalAwareThreadPool() {
        final CustomizableThreadFactory threadNameAwareFactory =
                new CustomizableThreadFactory("threadAsync");
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(500), threadNameAwareFactory) {
            // Override the pool's execute method so every task gets decorated.
            @Override
            public void execute(Runnable originalTask) {
                // Still on the submitting thread: read the tenant before handing the task over.
                final String tenantId = TenantContext.getCurrentTenantId();
                // Wrap the task in a new Runnable that sets up the thread local first.
                super.execute(() -> {
                    TenantContext.setCurrentTenantId(tenantId); // now on the async thread
                    try {
                        originalTask.run();
                    } finally {
                        // Pool threads are reused, so do not leave the tenant behind.
                        TenantContext.clear();
                    }
                });
            }
        };
        return threadPoolExecutor;
    }
Now we tell Spring to use our custom executor:
    @Async("tenantExecutor")
    public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
        // your code....
    }
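The execute override can be tried in isolation. The following is a rough sketch in plain Java rather than the Spring bean itself; the class name, the TENANT field and the pool sizes are assumptions made for the demo. Note that submit() on a ThreadPoolExecutor delegates to execute(), so overriding execute() alone covers both entry points.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TenantAwareExecutorDemo {
    // Hypothetical stand-in for the ThreadLocal inside TenantContext.
    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Builds a pool whose execute() copies the caller's tenant onto the worker. */
    static ThreadPoolExecutor newTenantAwarePool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10)) {
            @Override
            public void execute(Runnable task) {
                final String tenantId = TENANT.get(); // read on the submitting thread
                super.execute(() -> {
                    TENANT.set(tenantId);             // set on the worker thread
                    try {
                        task.run();
                    } finally {
                        TENANT.remove();              // workers are reused, so clean up
                    }
                });
            }
        };
    }

    /** Returns the tenant that a task observes on the worker thread. */
    static String tenantSeenByWorker(String tenantId) throws Exception {
        ThreadPoolExecutor pool = newTenantAwarePool();
        try {
            TENANT.set(tenantId);
            Callable<String> read = () -> TENANT.get();
            return pool.submit(read).get();
        } finally {
            TENANT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tenantSeenByWorker("tenant-a")); // prints "tenant-a"
    }
}
```

One design caveat, stated as an assumption about usage: the cleanup in the finally block presumes the task runs on a pool thread. With a rejection policy that runs the task on the caller, the same cleanup would also wipe the caller's value.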
I ended up using a TaskDecorator:
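    import java.util.Map;
    import lombok.extern.log4j.Log4j2;
    import org.slf4j.MDC; // assumed: the post does not show which MDC class is imported
    import org.springframework.core.task.TaskDecorator;
    import org.springframework.security.core.context.SecurityContext;
    import org.springframework.security.core.context.SecurityContextHolder;

    @Log4j2
    public class MdcTaskDecorator implements TaskDecorator {
        @Override
        public Runnable decorate(Runnable runnable) {
            // Right now: web thread context.
            // Capture everything the async thread will need.
            String tenantId = TenantContext.getCurrentTenantId();
            Long storeId = StoreContext.getCurrentStoreId();
            SecurityContext securityContext = SecurityContextHolder.getContext();
            Map<String, String> contextMap = MDC.getCopyOfContextMap(); // may be null if no MDC data was set
            log.info("Saving tenant information for async thread...");
            return () -> {
                try {
                    // Right now: @Async thread context.
                    // Restore the web thread's context before running the task.
                    TenantContext.setCurrentTenantId(tenantId);
                    StoreContext.setCurrentStoreId(storeId);
                    SecurityContextHolder.setContext(securityContext);
                    if (contextMap != null) {
                        MDC.setContextMap(contextMap);
                    }
                    log.info("Restoring tenant information for async thread...");
                    runnable.run();
                } catch (Throwable e) {
                    log.error("Error in async task", e);
                } finally {
                    // Pool threads are reused, so reset what was set above.
                    MDC.clear();
                    TenantContext.clear();
                    SecurityContextHolder.clearContext();
                    // StoreContext should be reset here as well; its API is not shown in the post.
                }
            };
        }
    }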
I use it like this:
    @Configuration
    @EnableAsync
    public class AsyncConfiguration implements AsyncConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(500);
            executor.setThreadNamePrefix("threadAsync");
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setTaskDecorator(new MdcTaskDecorator());
            executor.initialize();
            return executor;
        }
    }
It works, and it also seems like a neat solution.
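For readers who want to see the capture, restore and clean-up steps of a task decorator without Spring on the classpath, here is a small plain-Java sketch. The decorate method only mirrors the shape of Spring's TaskDecorator; the class name and the TENANT field are hypothetical. It also checks that a later, undecorated task on the same reused worker does not see the earlier tenant, which is the reason for clearing in a finally block.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextDecoratorDemo {
    // Hypothetical stand-in for the ThreadLocal inside TenantContext.
    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Captures the caller's tenant now; sets and clears it around the task later. */
    static Runnable decorate(Runnable task) {
        final String captured = TENANT.get(); // runs on the submitting thread
        return () -> {
            TENANT.set(captured);             // runs on the pool thread
            try {
                task.run();
            } finally {
                TENANT.remove();              // keep the reused thread clean
            }
        };
    }

    /** Runs one decorated task, then an undecorated probe on the same worker. */
    static String[] decoratedThenProbe(String tenantId) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] seen = new String[2];
        try {
            TENANT.set(tenantId);
            pool.submit(decorate(() -> seen[0] = TENANT.get())).get();
            TENANT.remove();
            Callable<String> probe = () -> TENANT.get();
            seen[1] = pool.submit(probe).get();
            return seen;
        } finally {
            TENANT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = decoratedThenProbe("tenant-a");
        System.out.println(seen[0]); // prints "tenant-a"
        System.out.println(seen[1]); // prints "null"
    }
}
```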