Den*_*nov 11 java spring spring-transactions spring-boot spring-async
我的应用程序应该有2个核心端点:push,pull for sending和fetching data.
拉操作应该异步工作并产生DeferredResult.当用户通过休息调用pull service时,会创建新的DefferedResult并将其存储到Map<Long, DefferedResult> results = new ConcurrentHashMap<>()等待新数据的位置或直到超时到期.
推送操作也会呼叫用户过度休息,此操作会检查此操作所推送的数据接收者的结果映射.当map包含收件人的结果时,这些数据被设置为他的结果,返回DefferedResult.
这是基本代码:
@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();
@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, newResult);
newResult.onCompletion(() -> results.remove(userId));
// if (data != null)
// newResult.setResult(data);
return newResult;
}
@Transactional
@Override
public void push(String data, Long recipientId) {
// fooRepository.save(data, recipientId);
if (results.containsKey(recipientId)) {
results.get(recipientId).setResult(data);
}
}
}
Run Code Online (Sandbox Code Playgroud)
代码正在运行,因为我预期的问题是,它也适用于多个用户.我想调用pull操作的最大活动用户数最多为1000.所以每次调用pull都会花费最多5秒,因为我在DefferedResult中设置但不是.
正如您在图像中看到的那样,如果我立即从我的javascript客户端多次调用其余拉动操作,您可以看到任务将按顺序而不是同时执行.我最后解雇的任务大约需要25秒,但我需要当1000个用户同时执行pull操作时,该操作应该花费最多5秒+延迟.
如何配置我的应用程序同时执行这些任务并确保每个任务约5秒或更少(当另一个用户向等待用户发送内容时)?我尝试将此配置添加到属性文件中:
server.tomcat.max-threads=1000
Run Code Online (Sandbox Code Playgroud)
还有这个配置:
@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {
@Override
protected AsyncTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1000);
taskExecutor.initialize();
return taskExecutor;
}
}
Run Code Online (Sandbox Code Playgroud)
但它没有帮助,仍然是相同的结果.你可以帮我配置吗?谢谢你的建议.
编辑:
这是我从角度调用此服务的方式:
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
Run Code Online (Sandbox Code Playgroud)
当我尝试使用纯JS代码多次调用它时,如下所示:
function httpGet()
{
var xmlHttp = new XMLHttpRequest();
xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
xmlHttp.send( null );
return xmlHttp.responseText;
}
setInterval(httpGet, 500);
Run Code Online (Sandbox Code Playgroud)
它将更快地执行每个请求调用(大约7秒).我预计增加会导致数据库呼叫服务,但它仍然优于25秒.以角度调用此服务有什么问题吗?
编辑2:
我尝试了另一种形式的测试,而不是浏览器,我使用了jMeter.我在100个线程中执行100个请求,结果如下:
正如您所看到的那样,请求将继续执行10,并且在达到50之后请求应用程序抛出异常:
java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause
Run Code Online (Sandbox Code Playgroud)
我还评论我使用存储库的代码,以确保数据库没有任何内容,并且结果相同.另外,我使用AtomicLong类为每个请求设置了uniqe userId.
编辑3:
我发现当我发表评论时@Transactional一切正常!那么你能告诉我如何在不增加延迟的情况下为大量操作设置spring的事务吗?
我添加spring.datasource.maximumPoolSize=1000了增加池大小我认为应该,所以唯一的问题是如何使用@Transactional加速方法.
每次调用pull方法都使用@Transactional注释,因为我首先需要从数据库加载数据并检查是否有新数据,因为是的,我不必创建等待延迟结果.push方法也必须是@Transaction的注释,因为我首先需要在数据库中存储接收的数据,然后将该值设置为等待结果.对于我的数据,我使用的是Postgres.
这里的问题似乎是您的数据库池中的连接用完了。
您将方法标记为 ,@Transaction但您的控制器也期望方法的结果,即DeferredResult尽快交付,以便线程被释放。
现在,这是运行请求时发生的情况:
@Transaction功能是在 Spring 代理中实现的,它必须打开一个连接,调用您的主题方法,然后提交或回滚事务。fooService.pull方法时,它实际上是在调用代理。DeferredResult,然后将其传递给控制器以使其返回。现在,问题是它DeferredResult的设计方式应该异步使用。换句话说,promise 预计稍后会在其他线程中解决,我们应该尽快释放请求线程。
事实上,关于DeferredResult 的Spring 文档说:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(data);
Run Code Online (Sandbox Code Playgroud)
您代码中的问题正是DeferredResult在同一个请求线程中解决。
所以,问题是当 Spring 代理请求连接到数据库池时,当您进行重负载测试时,许多请求会发现池已满并且没有可用连接。因此请求被搁置,但此时您的DeferredResult尚未创建,因此其超时功能不存在。
您的请求基本上是在等待来自数据库池的某个连接变为可用。所以,假设 5 秒过去了,然后请求获得一个连接,现在您获得DeferredResult控制器用于处理响应的内容。最终,5 秒后它超时。因此,您必须添加等待池中连接的时间和等待DeferredResult解决的时间。
这就是为什么您可能会看到,当您使用 JMeter 进行测试时,随着连接从数据库池中耗尽,请求时间逐渐增加。
您可以通过添加以下 application.properties 文件来为线程池启用一些日志记录:
logging.level.com.zaxxer.hikari=DEBUG
Run Code Online (Sandbox Code Playgroud)
您还可以配置数据库池的大小,甚至添加一些 JMX 支持,以便您可以从 Java Mission Control 监控它:
spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true
Run Code Online (Sandbox Code Playgroud)
使用 JMX 支持,您将能够看到数据库池是如何耗尽的。
这里的技巧在于将解决承诺的逻辑移动到另一个线程:
@Override
public DeferredResult pull(Long previousId, String username) {
DeferredResult result = createPollingResult(previousId, username);
CompletableFuture.runAsync(() -> {
//this is where you encapsulate your db transaction
List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
if (messages.isEmpty()) {
pollingResults.putIfAbsent(username, result);
} else {
result.setResult(messages);
}
});
return result;
}
Run Code Online (Sandbox Code Playgroud)
通过这样做,您DeferredResult将立即返回,并且 Spring 可以执行异步请求处理的魔力,同时释放宝贵的 Tomcat 线程。
| 归档时间: |
|
| 查看次数: |
2078 次 |
| 最近记录: |