Fee*_*eek 10 java concurrency jvm
我正在模拟一个银行系统,我有100,000个交易要运行.每种类型的事务都实现了runnable,我可以发生各种类型的事务.
transactions 是一个Runnables数组.
理想情况下,以下代码可以解决我的问题:
for (Transaction transaction : transactions) {
new Thread(transaction).start();
}
Run Code Online (Sandbox Code Playgroud)
但是,java.lang.OutOfMemoryError: unable to create new native thread在尝试启动100,000个线程时,显然会发生一个问题.
接下来我尝试实现一个ExecutorService来创建一个线程池来管理我的100,000个runnables.
ExecutorService service;
int cpus = Runtime.getRuntime().availableProcessors();
// cpus == 8 in my case
service = Executors.newFixedThreadPool(cpus);
for (Transaction transaction : transactions) {
service.execute(transaction);
}
Run Code Online (Sandbox Code Playgroud)
尝试这种方法时,长进程会"占用"JVM.例如,一种类型的事务需要30-60秒才能执行.在分析应用程序时,在长事务发生时不允许其他线程运行.

在这种情况下,线程6在其处理完成之前不允许任何其他线程运行.
所以我的问题是:如何在不遇到内存问题的情况下尽快运行100,000个事务?如果ExecutorService是答案,那么如何阻止非常长的事务占用JVM并允许其他事务同时运行?
编辑:
我故意强制某些类型的事务发生30-60秒,以确保我的线程程序正常工作.每个事务锁定一个帐户,有10个帐户.这是我的方法,它占用了JVM :(被称为run())
public void makeTransaction() {
synchronized(account) {
long timeStarted = System.nanoTime();
long timeToEnd = timeStarted + nanos;
this.view = new BatchView(transactionNumber, account.getId());
this.displayView();
while(true) {
if(System.nanoTime() % 1000000000 == 0) {
System.out.println("batch | " + account.getId());
}
if(System.nanoTime() >= timeToEnd) {
break;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
每次运行此事务时,只有一个帐户被锁定,剩下9个应该可用于处理的帐户.为什么JVM不再处理任何线程,而是挂起直到这个长事务完成?
以下是项目缩小版的链接,用于演示问题:项目
Pet*_*rey 10
在分析应用程序时,在长事务发生时不允许其他线程运行.
最有可能的是,此任务使用的是单线程资源.即写入ti的方式可防止并发使用.
如何在不遇到内存问题的情况下尽快运行100,000个事务?
如果事务是CPU绑定的,则应该有一个与您拥有的CPU数量大小相同的池.
如果事务依赖于数据库,则应该查看对它们进行批处理以更有效地利用数据库.
如果ExecutorService是答案,那么如何阻止非常长的事务占用JVM并允许其他事务同时运行?
使交易更短.如果你的任务运行超过几毫秒,你应该弄清楚为什么这么长时间.我首先看一下网络/ IO必须如何使用和分析任务.大多数交易(如果您有大量交易)应该在0.01秒左右或理想情况下远远不够.
您应该非常小心地考虑如何使用共享资源.如果您的任务使用相同的资源太多,您可能会发现多线程不会更快,甚至更慢.
您的应用程序的问题在于,很快所有线程都会为同一帐户分配一笔交易,然后除了一个线程之外的所有线程都必须等待。您可以在下面的屏幕截图中看到这一点,如果我暂停了应用程序。Thread pool-1-thread-3 当前正在处理 id 为 19 的 Account 对象的事务(该 id 不是您的帐户 id,而是 Eclipse 分配的唯一对象 id),并且所有其他线程正在等待同一对象上的锁账户对象。帐户对象是您的 id 为 9 的帐户对象。

为什么会出现这种情况?在事务 853 中,一个线程启动第一个长时间运行的事务(针对账户 9)。其他线程继续处理其他事务。然而,当任何线程到达帐户 9 的另一个事务时,它必须停止并等待。事务 857、861 和 862 也是针对帐户 9 的,每个事务都会阻塞一个线程,因此此时我的所有线程都被阻塞(在我的四核上)。
怎么解决这个问题呢?这取决于您的用例。
如果在您的实际程序中,只要账户 X 正在运行另一个交易,就可以保证给定账户 X 没有传入交易,那么您不需要进行任何更改。
如果您的帐户数量与线程数量相比非常大,则问题变得更不可能,因此您可能决定接受它。
如果您的帐户数量相对较少(假设可能少于一百左右),那么您应该(正如彼得所说)每个帐户有一个(无限运行的)线程,每个线程都有自己的事务队列。这可能会更有效,因为线程不需要“争夺”共享队列。
另一种解决方案是实施某种形式的“工作窃取”。这意味着每当一个线程被阻塞时,它就会寻找其他工作来做。要实现这一点,您首先需要能够检查线程是否可以获得给定帐户的锁定。在 Java中synchronized这是不可能的,所以你需要类似ReentrantLock.tryLock(). 您还需要能够从每个线程直接访问事务队列,所以我猜您不能ExecutorService在这里使用,但需要自己实现事务处理(使用LinkedBlockingQueue)。
现在,每个线程都会循环轮询队列中的事务。首先,它尝试使用 获取相应帐户的锁tryLock()。如果失败,请将事务添加到(特定于线程的)列表,从队列中获取下一个事务,然后尝试执行此事务,直到找到可以处理的事务。事务完成后,首先在列表中查找现在可以处理的事务,然后再从全局队列中提取另一个事务。您可能大致喜欢以下代码:
public BlockingQueue<Transaction> queue = ...; // the global queue for all threads
public void run() {
LinkedList<Transaction> myTransactions = new LinkedList<>();
while (true) {
Transaction t = queue.take();
while (!t.getLock().tryLock()) {
myTransactions.add(t);
}
try {
// here we hold the lock for t
t.makeTransaction();
} finally {
t.getLock().unlock();
}
Iterator<Transaction> iter = myTransactions.iterator();
while (iter.hasNext()) {
t = iter.next();
if (t.getLock().tryLock()) {
try {
t.makeTransaction();
} finally {
t.getLock().unlock();
}
iter.remove();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,这至少仍然存在您可能想要解决的以下问题:
queue.take(),它不会检查其列表中的事务是否变得可用。因此,如果在一段时间内queue为空(例如在处理结束时),则可能会有事务卡在列表中而未得到处理。一个更简单的替代方案可能是将put()事务放入队列(最后),如果您无法获取它们的锁,但这将使它们以非常任意的顺序执行(这也可能发生在上面的解决方案中,但也许不是这样)极其)。
编辑: 更好的解决方案可能是将队列附加到每个帐户,而不是特定于线程的列表。然后,只要线程发现该帐户被阻塞,就会将交易添加到相应帐户的队列中。当线程完成账户 X 的交易时,它应该首先查看账户 X 的队列(是否已添加任何交易),然后再查看全局列表。
| 归档时间: |
|
| 查看次数: |
1022 次 |
| 最近记录: |