小编Dov*_*vmo的帖子

RxJava API和Java 9 Flow API之间的区别

在最近几个主要版本的Java的每次迭代中,似乎都有新的方法来管理并发任务.

在Java 9中,我们有Flow API,类似于RxJava的Flowable API,但Java 9有一组更简单的类和接口.

Java 9

Flow.Publisher,Flow.Subscriber,Flow.Processor,Flow.Subscription,和SubmissionPublisher,这就是它.

RxJava

拥有全流API状类,即io.reactivex.flowables,io.reactivex.subscribers,io.reactivex.processors,io.reactivex.observers,和io.reactivex.observables这似乎做同样的事情.

这两个库之间的主要区别是什么?为什么有人会使用Java 9 Flow库而不是更多样化的RxJava库,反之亦然?

java rx-java java-9 rx-java2

62
推荐指数
4
解决办法
1万
查看次数

跨多线程解决方案的单一事务

据我了解,所有事务都是线程绑定的(即上下文存储在ThreadLocal中).例如,如果:

  1. 我在事务父方法中启动事务
  2. 使数据库在异步调用中插入#1
  3. 使数据库在另一个异步调用中插入#2

然后,即使它们共享相同的"事务性"父级,也会产生两个不同的事务(每个插入一个).

例如,假设我执行两次插入(并使用非常简单的示例,即为了简洁起见,不使用执行程序或可完成的未来等):

@Transactional
public void addInTransactionWithAnnotation() {
    addNewRow();
    addNewRow();
}
Run Code Online (Sandbox Code Playgroud)

将根据需要执行两个插入作为同一事务的一部分.

但是,如果我想并行化这些插入的性能:

@Transactional
public void addInTransactionWithAnnotation() {
    new Thread(this::addNewRow).start();
    new Thread(this::addNewRow).start();
}
Run Code Online (Sandbox Code Playgroud)

然后,这些生成的线程中的每一个都不会参与事务,因为事务是线程绑定的.

关键问题:有没有办法将事务安全地传播到子线程?

我想到的解决这个问题的唯一解决方案:

  1. 使用JTA或某些XA管理器,根据定义应该能够执行此操作.但是,我理想情况下不希望将XA用于我的解决方案,因为它的开销很大
  2. 将我想要执行的所有事务工作(在上面的示例中,addNewRow()函数)传递给单个线程,并以多线程方式执行所有先前的工作.
  3. 找出在Transaction状态上利用InheritableThreadLocal并将其传播到子线程的某种方法.我不知道该怎么做.

有没有更多可能的解决方案?即使它的味道有点像解决方法(比如上面的解决方案)?

java concurrency multithreading transactions

10
推荐指数
1
解决办法
4106
查看次数

使用AmazonSQSClient消耗缓慢的消息

因此,我在春季jms 50-100使用并发,允许最多连接高达200.一切都按预期工作但如果我尝试从队列中检索100k消息,我的意思是我的sqs上有100k消息,我通过弹簧读取它们jms正常的方法.

@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
 }
Run Code Online (Sandbox Code Playgroud)

我在控制台中看到了所有日志,但是在大约17k之后,它开始抛出异常

像:aws sdk异常:端口已经在使用中.

为什么我会看到这个例外,怎么做.我摆脱它?

我试着在互联网上寻找它.找不到任何东西.

我的设定:

并发50-100

为每个任务设置消息:50

客户承认

timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
Run Code Online (Sandbox Code Playgroud)

更新:我找到了问题,似乎正在创建新的套接字,直到每个套接字都耗尽.

我的春季jms版本是4.3.10

要复制此问题,只需执行以上配置,最大连接为200,货币设置为50-100,并将大约40k消息推送到sqs队列.可以使用https://github.com/adamw/elasticmq作为本地堆栈服务器复制亚马逊sqs ..完成后直到这里.注释jms监听器并使用soap ui加载测试并调用send消息来触发许多消息.仅仅因为你评论了@jmslistener注释,它就不会消耗来自队列的消息.一旦您看到已发送40k消息,请停止.取消注释@jmslistener并重新启动服务器.

更新:

DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new …
Run Code Online (Sandbox Code Playgroud)

java amazon-sqs spring-jms spring-boot

6
推荐指数
1
解决办法
1063
查看次数

什么时候依赖 Spring 的 @PreDestroy 是安全的?

根据Spring 的文档,我添加了一个关闭钩子:

SpringApplication app = new SpringApplication(App.class);
DefaultProfileUtil.addDefaultProfile(app);
appContext = app.run(args);
appContext.registerShutdownHook();
Run Code Online (Sandbox Code Playgroud)

但是,@PreDestroy如果应用程序在启动后进行编辑,则不会调用该方法kill

import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
@Service
public class Processor {
  public Processor() {
    ...
  }
  @PostConstruct
  public void init() {
    System.err.println("processor started");
  }
  //not called reliably
  @PreDestroy
  public void shutdown() {
    System.err.println("starting shutdown");
    try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}
    System.err.println("shutdown completed properly");
  }
}
Run Code Online (Sandbox Code Playgroud)

我所看到的只是processor started...

processor started

^C
Run Code Online (Sandbox Code Playgroud)

如果我等待至少 30 秒让 spring 完成启动,然后执行kill该过程,那么@PreDestroy带注释的函数就会被调用。

processor …
Run Code Online (Sandbox Code Playgroud)

spring spring-annotations spring-boot

5
推荐指数
0
解决办法
910
查看次数

ActiveMQ Artemis 的池化连接工厂

是否有与PooledConnectionFactoryArtemis的 ActiveMQ 5 等效的版本?为什么它在一个中可用而不在另一个中可用?

例如,Spring 提供了一个通用的CachingConnectionFactory. 这很好,但它实现了SingleConnectionFactory并且唯一的“池”一个连接。

在实际汇集多个连接的 Artemis 客户端中拥有类似的机制将是关键。

另一个想法是可能没有实现,因为单个连接支持并发会话!我还没有测试每个会话使用新连接的性能。性能是否相同或相似?

spring activemq-artemis

3
推荐指数
1
解决办法
2140
查看次数

Postgres DDL 错误:''user' 处或附近的语法错误'

我正在尝试使用 postgres 数据库设置 Spring Boot 项目。我的实体是:-

用户

@Entity
public class User implements UserDetails {

    @Id
    @GeneratedValue(strategy=GenerationType.AUTO)
    @Column(name="id", nullable = false, updatable = false)
    private Long id;
    private String username;
    private String password;
    private String firstName;
    private String lastName;

    @Column(name="email", nullable = false, updatable = false)
    private String email;
    private String phone;
    private boolean enabled=true;


    @OneToMany(mappedBy = "user", cascade = CascadeType.ALL, fetch = FetchType.EAGER)
    @JsonIgnore
    private Set<UserRole> userRoles = new HashSet<>();

    public Long getId() {
        return id;
    }
    public void setId(Long …
Run Code Online (Sandbox Code Playgroud)

postgresql spring-data-jpa spring-boot

3
推荐指数
1
解决办法
3077
查看次数

ActiveMQ 主题上的并行消息消费

ActiveMQ 中的持久主题(这似乎是 JMS 本身的一个障碍)似乎只有一个使用者可以在订阅者上处于活动状态。

也就是说,在 ActiveMQ 文档中:

JMS 持久订阅者 MessageConsumer 是使用唯一的 JMS clientID 和持久订阅者名称创建的。要与 JMS 兼容,在任何时间点只能为一个 JMS clientID 激活一个 JMS 连接,并且对于一个 clientID 和订阅者名称只能激活一个使用者。即,只有一个线程可以从给定的逻辑主题订阅者中主动消费。

但是,其他排队系统(根据文档,Azure 服务总线似乎是这样做的)似乎很容易在单个“订阅”上允许多个线程“订阅者”。在这个时代,人们会认为这是理所当然的。

为什么会这样?这是否会在 JMS 和/或 ActiveMQ 的未来版本中得到解决?

PS,对于这种情况,“虚拟主题”(上面引用的文档)似乎不是一种理想且性能不佳的解决方法,因为它似乎在后台为每个订阅者创建了一个完全独立的队列。

java multithreading activemq-classic activemq-artemis

2
推荐指数
1
解决办法
1666
查看次数