将给定ID的任务绑定到同一线程的线程池

Sim*_*onC 16 java multithreading threadpool

是否存在线程池(在Java中)的任何实现,以确保在同一线程上执行相同逻辑ID的所有任务?

我所追求的逻辑是,如果在给定逻辑ID的特定线程上已经执行了任务,则在同一线程上安排具有相同ID的新任务.如果没有线程为同一ID执行任务,则可以使用任何线程.

这将允许并行执行不相关ID的任务,但是同一ID的任务将以串行和提交的顺序执行.

如果没有,是否有任何关于如何扩展ThreadPoolExecutor以获得此行为的建议(如果可能的话)?

UPDATE

花了更长时间考虑这个问题,我实际上并不要求在同一个线程上执行相同逻辑ID的任务,只是它们不会同时执行.

这方面的一个例子是处理客户订单的系统,可以同时处理多个订单,但不能同一个客户(并且必须按顺序处理同一客户的所有订单).

我现在采用的方法是使用标准的ThreadPoolExecutor,自定义BlockingQueueRunnable使用自定义包装器进行包装.的Runnable包装器逻辑是:

  1. 以原子方式尝试将ID添加到并发"running"set(ConcurrentHashMap)以查看当前是否正在运行相同ID的任务
    • 如果添加失败,请将任务重新推送到队列的前面并立即返回
    • 如果成功,继续
  2. 运行任务
  3. 从"正在运行"的集合中删除任务的关联ID

然后,队列的poll()方法只返回具有当前不在"运行"集中的ID的任务.

这样做的问题在于,我确信会有很多我没有想过的极端情况,因此需要进行大量的测试.

h22*_*h22 6

创建一组执行程序服务,每个执行程序服务运行一个线程,并通过项目 ID 的哈希码将队列条目分配给它们。该数组可以是任意大小,具体取决于您最多要使用多少个线程。

这将限制我们可以从执行程序服务中使用,但仍然允许使用其功能在不再需要时关闭唯一的线程(使用allowCoreThreadTimeOut(true))并根据需要重新启动它。此外,所有排队的东西都可以工作而无需重写。


San*_*osh -1

扩展ThreadPoolExecutor将是相当困难的。我建议你采用生产者-消费者系统。这就是我的建议。

  1. 您可以创建典型的生产者消费者系统。查看此问题中提到的代码。
  2. 现在每个系统都会有一个队列和一个消费者线程,它将串行处理队列中的任务
  3. 现在,创建一个此类单独系统的池
  4. 当您提交相关ID的任务时,查看是否已经有一个标记该相关ID的系统当前正在处理任务,如果是则提交任务,
  5. 如果它未处理任何任务,则使用此新的相关 ID 标记该系统并提交任务。
  6. 这样,单个系统将只满足一个逻辑相关的 ID。

在这里,我假设相关 ID 是单个 ID 的逻辑集合,并且将为相关 ID 而不是单个 ID 创建生产者消费者系统。