我有一个Spring服务:
@Service
@Transactional
public class SomeService {
@Async
public void asyncMethod(Foo foo) {
// processing takes significant time
}
}
Run Code Online (Sandbox Code Playgroud)
我有一个集成测试SomeService:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
@IntegrationTest
@Transactional
public class SomeServiceIntTest {
@Inject
private SomeService someService;
@Test
public void testAsyncMethod() {
Foo testData = prepareTestData();
someService.asyncMethod(testData);
verifyResults();
}
// verifyResult() with assertions, etc.
}
Run Code Online (Sandbox Code Playgroud)
这是问题所在:
SomeService.asyncMethod(..)与注释@Async和SpringJUnit4ClassRunner坚持@Async语义该testAsyncMethod线程将呼叫分叉someService.asyncMethod(testData)到自己的工作线程,然后直接继续执行verifyResults(),以前的工作线程完成其工作可能之前.
someService.asyncMethod(testData)在验证结果之前,我该如何等待完成?请注意,如何使用Spring 4和注释编写单元测试来验证异步行为?不要在这里申请,作为someService.asyncMethod(testData) …
我的应用程序应该有2个核心端点:push,pull for sending和fetching data.
拉操作应该异步工作并产生DeferredResult.当用户通过休息调用pull service时,会创建新的DefferedResult并将其存储到Map<Long, DefferedResult> results = new ConcurrentHashMap<>()等待新数据的位置或直到超时到期.
推送操作也会呼叫用户过度休息,此操作会检查此操作所推送的数据接收者的结果映射.当map包含收件人的结果时,这些数据被设置为他的结果,返回DefferedResult.
这是基本代码:
@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();
@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, …Run Code Online (Sandbox Code Playgroud) 我正在使用Spring启动@EnableScheduling和@EnableAsync.
我有一个注释的方法@Scheduled.我有几个方法,用注释@Async.
现在我在@Async方法中调用这些方法,@Scheduled并在异步方法中打印出当前线程的名称.我看到的是它们都有相同的线程名称,实际上它是运行该@Scheduled方法的线程.
我没有看到异步方法执行.这有什么不对?
这是我的应用程序启动类
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class ApplicationBoot {
public static void main(String[] args) {
SpringApplication.run(ApplicationBoot.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的调度程序类
@Component
public class TaskScheduler {
private static final Logger logger = Logger.getLogger(TaskScheduler.class);
@Scheduled(fixedDelay = 10000)
public void ScheduledMethod() {
methodOne();
methodTwo();
methodThree();
}
@Async
private void methodOne() {
logger.info("Method one called by Thread : " + Thread.currentThread().getName() + " at " + new …Run Code Online (Sandbox Code Playgroud) 我有一个 Spring Flux 应用程序,在某些时候我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。
如果没有反应器,我可能只会使用Async注释,在不同的线程上执行该方法。对于reactor,我不确定是否应该继续采用这种方法,或者是否已经有内置机制可以实现这一点。
例如,给定一个接受Resource对象的Controller:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
Run Code Online (Sandbox Code Playgroud)
和一个处理器类:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
Run Code Online (Sandbox Code Playgroud)
HTTP 调用方/create不需要等待run方法完成。
@PostConstruct
public void performStateChecks() {
throw new RuntimeException("test");
}
Run Code Online (Sandbox Code Playgroud)
如果我spring使用上面的代码启动应用程序,它将阻止该应用程序启动。
我正在寻找的是在启动后直接执行一个方法,但是异步的。意味着,它不应该延迟启动,并且即使发生故障也不应该阻止应用程序运行。
如何使初始化异步?
为异步方法提供Spring配置类:
@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration {
@Autowired
private ApplicationContext applicationContext;
@Bean
public ActivityMessageListener activityMessageListener() {
return new ActivityMessageListener();
}
@Bean
public TaskExecutor defaultExecutor()
{
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
return threadPoolTaskExecutor;
}
Run Code Online (Sandbox Code Playgroud)
我所有的@Async方法按预期工作,但如果我执行AsyncConfigurer到AsyncConfiguration为了赶实施异常getAsyncUncaughtExceptionHandler()的方法,我的豆子没有被代理这样的方法@Async并不在池中执行运行.
这是非工作配置:
@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
@Autowired
private ApplicationContext applicationContext;
@Bean
public ActivityMessageListener activityMessageListener() {
return new ActivityMessageListener();
}
@Override
public Executor getAsyncExecutor() { …Run Code Online (Sandbox Code Playgroud) 我需要缓存一些异步计算的结果.详细地说,为了克服这个问题,我试图使用Spring 4.3缓存和异步计算功能.
举个例子,我们来看下面的代码:
@Service
class AsyncService {
@Async
@Cacheable("users")
CompletableFuture<User> findById(String usedId) {
// Some code that retrieves the user relative to id userId
return CompletableFuture.completedFuture(user);
}
}
Run Code Online (Sandbox Code Playgroud)
可能吗?我的意思是,Spring的缓存抽象是否会正确处理类型的对象CompletableFuture<User>?我知道Caffeine Cache有类似的东西,但是我无法理解如果Spring正确配置它是否会使用它.
编辑:我对User对象本身不感兴趣,但是CompletableFuture代表了计算.
我已经配置了两个不同的线程池,一个用于@Scheduled和另一个用于@Async.但是,我注意到@Async没有使用线程池.
这是Scheduler配置
@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {
private final int POOL_SIZE = 10;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(POOL_SIZE);
threadPoolTaskScheduler.setThreadNamePrefix("my-sched-pool-");
threadPoolTaskScheduler.initialize();
scheduledTaskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
}
}
Run Code Online (Sandbox Code Playgroud)
这是Async的配置
@Configuration
@EnableAsync
public class AppConfig {
@Bean(name = "asyncTaskExecutor")
public TaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(15);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("my-async-pool-");
executor.initialize();
return executor;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我如何调用它们
@Scheduled(fixedRateString = "2000" )
public void schedule() {
log.debug("here is the message …Run Code Online (Sandbox Code Playgroud) threadpoolexecutor spring-scheduled spring-boot taskscheduler spring-async
我正在使用Spring @EnableAsync和@Async注释构建一个多线程的Spring Boot应用程序.当我使用单个线程(CorePoolSize 1,MaxPoolSize 1)运行应用程序时,一切都按预期工作.当我将池大小增加到1以上似乎是随机出现时,我得到java.sql.SQLNonTransientException:[Amazon] JDBC并非所有参数都已填充.调用Amazon AWS Redshift数据库时出错.
在ServiceProcessBean.java中,我自动连接了我的ProcessService类(要完成的线程工作)和ShipmentDAO,它加载了一个由ProcessService.process()方法处理的装运ID列表,代码如下.
@Component
public class ShipmentBatchBean {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ShipmentDAO shipmentDAO;
@Autowired
private ProcessService processService;
@Scheduled(
initialDelayString = "${executor.delay.initial}",
fixedDelayString = "${executor.delay.fixed}"
)
public void cronJob() throws InterruptedException, ExecutionException {
List<CompletableFuture<Boolean>> asyncResponse = new ArrayList<>();
logger.info("Starting cronJob() method");
try {
List<String> shipments = shipmentDAO.getAllShipmentsReadyForIeta();
logger.info("There are {} shipments to be processed in the data store",
shipments.size());
for(String shipment : shipments) {
asyncResponse.add(processService.process(shipment));
}
} catch (Exception …Run Code Online (Sandbox Code Playgroud) 我在 Scala 中有一个带有多个端点的 Spring Boot API。所有端点都是async并 return DeferredResult。在某些情况下,我想使用过滤器来记录响应正文。我创建了一个顺序为 1 的过滤器来缓存请求和响应,如下所示:
@Component
@Order(1)
class ContentCachingFilter extends OncePerRequestFilter {
override def doFilterInternal(request: HttpServletRequest, response: HttpServletResponse, filterChain: FilterChain): Unit = {
val requestToCache = new ContentCachingRequestWrapper(request)
val responseToCache = new ContentCachingResponseWrapper(response)
filterChain.doFilter(requestToCache, responseToCache)
responseToCache.copyBodyToResponse()
}
}
Run Code Online (Sandbox Code Playgroud)
我的日志过滤器类似于下面(删除了域特定逻辑):
@Component
@Order(2)
class RequestResponseLoggingFilter extends OncePerRequestFilter {
override def doFilterInternal(request: HttpServletRequest, response: HttpServletResponse, filterChain: FilterChain): Unit = {
logger.info(
s"Request -> URL : ${request.getMethod} ${request.getRequestURI} ${request.getParameterMap.asScala.stringRepresentation()}")
logger.debug(
s"Request Headers -> Content-type : ${request.getContentType} …Run Code Online (Sandbox Code Playgroud) spring-async ×10
java ×7
spring ×7
spring-boot ×6
asynchronous ×1
cglib ×1
junit ×1
spring-cache ×1
spring-jdbc ×1
spring-mvc ×1