Stu*_*uck 6 java postgresql spring hibernate stream-processing
给定一个包含大量项目的结果流,我想存储它们并处理潜在的并发冲突:
public void onTriggerEvent(/* params */) {
Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
List<Result> conflicts = store(results);
resolveConflictsInNewTransaction(conflicts);
}
Run Code Online (Sandbox Code Playgroud)
我被困在如何store(...)有效地实施。在Result由描述数据,需要在其各自的数据库表被更新这两件不更改和拆卸对象。
@Value
public static class Result {
A a; // describes update for row in table a
B b; // describes update for row in table b
}
Run Code Online (Sandbox Code Playgroud)
A并且B每个引用两个用户,其中(u1, u2)是各自数据库表上的一个键。
@Value
public static class A {
long u1;
long u2;
// ... computed data fields ...
}
// B accordingly
Run Code Online (Sandbox Code Playgroud)
流计算本身可能会同时触发(onTriggerEvent并行多次调用),这通常很好,但有时可能会导致某些结果发生冲突(大约 0.1% 发生冲突,例如一个流有一个结果(53,21),另一个调用也(53,21)同时更新)。A和/或的冲突由与操作开始时不同B的updatedAt字段指示。在这里,当然,我们不想扔掉所有结果而只是重试,而只想解决冲突的行。
所以我想知道有什么好方法可以 (1) 存储所有的Result.a并且Result.b不冲突的和 (2) 得到一个List存在Result冲突并且需要特殊处理的s 。
public List<Result> store(Stream<Result> results) {
// store all a
// store all b (ideally without using results * 2 RAM)
// do update other stuff if a and b are not in conflict and do it in the same ACID transaction as the update of the related a and b.
// return those in Conflict
}
Run Code Online (Sandbox Code Playgroud)
如何在不解包每个结果的情况下实现它,将其发送到自己的事务中的数据库等?理想情况下,我需要一次将所有内容发送到数据库并获取尚未存储的冲突列表(另一个应该已保留)。我也对不同的方法持开放态度。
如果相关,我们会使用 JPA/Hibernate。
最简单的方法是将持久性简化为 FIFO 队列(存在很多技术,但一般来说,这将成为“每个事务单个条目”的方式,这不是所需的方法)。
因此,对于第二个选项,我会将并发冲突定义的逻辑从数据库持久操作移至单独的服务。
您可以实现类似 UserId-to-Reentrant 锁的内存映射之类的东西(与同步块相比,这些操作非常快)。
在第一次调用持久化期间,锁被锁定;成功坚持后锁被释放。同时(在单独的线程中)您可以检查锁的状态,并通过该状态进行过滤,或者等待锁被释放。请注意等待状态:您有流,因此处理流的整个线程将进入等待状态。
就我个人而言,我会坚持第一个“每个事务单个条目”,中间有一些(持久的)消息队列,并具有用于锁定检查的单独服务。首先,这将使我们能够轻松配置写操作的并发性;其次,可以轻松地在写入器中使用等待状态,因为只有一个条目将被锁定。
| 归档时间: |
|
| 查看次数: |
173 次 |
| 最近记录: |