在最近几个主要版本的Java的每次迭代中,似乎都有新的方法来管理并发任务.
在Java 9中,我们有Flow API,类似于RxJava的Flowable API,但Java 9有一组更简单的类和接口.
Java 9
有Flow.Publisher,Flow.Subscriber,Flow.Processor,Flow.Subscription,和SubmissionPublisher,这就是它.
RxJava
拥有全包的流API状类,即io.reactivex.flowables,io.reactivex.subscribers,io.reactivex.processors,io.reactivex.observers,和io.reactivex.observables这似乎做同样的事情.
这两个库之间的主要区别是什么?为什么有人会使用Java 9 Flow库而不是更多样化的RxJava库,反之亦然?
据我了解,所有事务都是线程绑定的(即上下文存储在ThreadLocal中).例如,如果:
然后,即使它们共享相同的"事务性"父级,也会产生两个不同的事务(每个插入一个).
例如,假设我执行两次插入(并使用非常简单的示例,即为了简洁起见,不使用执行程序或可完成的未来等):
@Transactional
public void addInTransactionWithAnnotation() {
addNewRow();
addNewRow();
}
Run Code Online (Sandbox Code Playgroud)
将根据需要执行两个插入作为同一事务的一部分.
但是,如果我想并行化这些插入的性能:
@Transactional
public void addInTransactionWithAnnotation() {
new Thread(this::addNewRow).start();
new Thread(this::addNewRow).start();
}
Run Code Online (Sandbox Code Playgroud)
然后,这些生成的线程中的每一个都不会参与事务,因为事务是线程绑定的.
关键问题:有没有办法将事务安全地传播到子线程?
我想到的解决这个问题的唯一解决方案:
addNewRow()函数)传递给单个线程,并以多线程方式执行所有先前的工作.有没有更多可能的解决方案?即使它的味道有点像解决方法(比如上面的解决方案)?
因此,我在春季jms 50-100使用并发,允许最多连接高达200.一切都按预期工作但如果我尝试从队列中检索100k消息,我的意思是我的sqs上有100k消息,我通过弹簧读取它们jms正常的方法.
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
Run Code Online (Sandbox Code Playgroud)
我在控制台中看到了所有日志,但是在大约17k之后,它开始抛出异常
像:aws sdk异常:端口已经在使用中.
为什么我会看到这个例外,怎么做.我摆脱它?
我试着在互联网上寻找它.找不到任何东西.
我的设定:
并发50-100
为每个任务设置消息:50
客户承认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
Run Code Online (Sandbox Code Playgroud)
更新:我找到了问题,似乎正在创建新的套接字,直到每个套接字都耗尽.
我的春季jms版本是4.3.10
要复制此问题,只需执行以上配置,最大连接为200,货币设置为50-100,并将大约40k消息推送到sqs队列.可以使用https://github.com/adamw/elasticmq作为本地堆栈服务器复制亚马逊sqs ..完成后直到这里.注释jms监听器并使用soap ui加载测试并调用send消息来触发许多消息.仅仅因为你评论了@jmslistener注释,它就不会消耗来自队列的消息.一旦您看到已发送40k消息,请停止.取消注释@jmslistener并重新启动服务器.
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new …Run Code Online (Sandbox Code Playgroud) 根据Spring 的文档,我添加了一个关闭钩子:
SpringApplication app = new SpringApplication(App.class);
DefaultProfileUtil.addDefaultProfile(app);
appContext = app.run(args);
appContext.registerShutdownHook();
Run Code Online (Sandbox Code Playgroud)
但是,@PreDestroy如果应用程序在启动后进行编辑,则不会调用该方法kill。
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
@Service
public class Processor {
public Processor() {
...
}
@PostConstruct
public void init() {
System.err.println("processor started");
}
//not called reliably
@PreDestroy
public void shutdown() {
System.err.println("starting shutdown");
try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}
System.err.println("shutdown completed properly");
}
}
Run Code Online (Sandbox Code Playgroud)
我所看到的只是processor started...
processor started
^C
Run Code Online (Sandbox Code Playgroud)
如果我等待至少 30 秒让 spring 完成启动,然后执行kill该过程,那么@PreDestroy带注释的函数就会被调用。
processor …Run Code Online (Sandbox Code Playgroud) 是否有与PooledConnectionFactoryArtemis的 ActiveMQ 5 等效的版本?为什么它在一个中可用而不在另一个中可用?
例如,Spring 提供了一个通用的CachingConnectionFactory. 这很好,但它实现了SingleConnectionFactory并且唯一的“池”一个连接。
在实际汇集多个连接的 Artemis 客户端中拥有类似的机制将是关键。
另一个想法是可能没有实现,因为单个连接支持并发会话!我还没有测试每个会话使用新连接的性能。性能是否相同或相似?
我正在尝试使用 postgres 数据库设置 Spring Boot 项目。我的实体是:-
用户
@Entity
public class User implements UserDetails {
@Id
@GeneratedValue(strategy=GenerationType.AUTO)
@Column(name="id", nullable = false, updatable = false)
private Long id;
private String username;
private String password;
private String firstName;
private String lastName;
@Column(name="email", nullable = false, updatable = false)
private String email;
private String phone;
private boolean enabled=true;
@OneToMany(mappedBy = "user", cascade = CascadeType.ALL, fetch = FetchType.EAGER)
@JsonIgnore
private Set<UserRole> userRoles = new HashSet<>();
public Long getId() {
return id;
}
public void setId(Long …Run Code Online (Sandbox Code Playgroud) ActiveMQ 中的持久主题(这似乎是 JMS 本身的一个障碍)似乎只有一个使用者可以在订阅者上处于活动状态。
也就是说,在 ActiveMQ 文档中:
JMS 持久订阅者 MessageConsumer 是使用唯一的 JMS clientID 和持久订阅者名称创建的。要与 JMS 兼容,在任何时间点只能为一个 JMS clientID 激活一个 JMS 连接,并且对于一个 clientID 和订阅者名称只能激活一个使用者。即,只有一个线程可以从给定的逻辑主题订阅者中主动消费。
但是,其他排队系统(根据文档,Azure 服务总线似乎是这样做的)似乎很容易在单个“订阅”上允许多个线程“订阅者”。在这个时代,人们会认为这是理所当然的。
为什么会这样?这是否会在 JMS 和/或 ActiveMQ 的未来版本中得到解决?
PS,对于这种情况,“虚拟主题”(上面引用的文档)似乎不是一种理想且性能不佳的解决方法,因为它似乎在后台为每个订阅者创建了一个完全独立的队列。
java ×4
spring-boot ×3
spring ×2
amazon-sqs ×1
concurrency ×1
java-9 ×1
postgresql ×1
rx-java ×1
rx-java2 ×1
spring-jms ×1
transactions ×1