我正在尝试在 Activiti 中实现两个应该并行运行的服务任务。下面编写的代码随机(有趣的是)可以正常工作。
我的意思是它偶尔只打印“ first”(或“ second”)或打印两个first“一个second”等。
问题:如何使这些服务始终并行运行?无论当前运行的服务数量如何?
PS:当我activiti:async="true"从流程定义中删除时,它只打印“ first”或“ second”。我想我需要那个:)
<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn" targetNamespace="Examples">
<process id='testparallelact' name="Developer Hiring" isExecutable="true" activiti:exclusive="false" activiti:async="true">
<startEvent id="theStart" />
<sequenceFlow id="flow1" sourceRef="theStart" targetRef="fork" />
<parallelGateway id="fork" activiti:async="true" />
<sequenceFlow sourceRef="fork" targetRef="receivePayment" />
<sequenceFlow sourceRef="fork" targetRef="shipOrder" />
<serviceTask id="receivePayment" name="Receive Payment" activiti:async="true" activiti:exclusive="false"
activiti:expression="${serviceConnections.runThis2('First')}"/>
<sequenceFlow sourceRef="receivePayment" targetRef="join" />
<serviceTask id="shipOrder" name="Ship Order" activiti:async="true" activiti:exclusive="false"
activiti:expression="${serviceConnections.runThis2('Second')}"/>
<sequenceFlow sourceRef="shipOrder" targetRef="join" />
<parallelGateway id="join" />
<sequenceFlow sourceRef="join" targetRef="theEnd" />
<endEvent id="theEnd" />
</process>
</definitions>
Run Code Online (Sandbox Code Playgroud)
public void runThis2(String test1) throws InterruptedException {
while(true)
{
Thread.sleep(1000);
System.out.println(test1);
}
}
Run Code Online (Sandbox Code Playgroud)
了解如何在 Activiti 引擎中执行作业很重要。以下论坛主题很好地描述了它:
https://community.alfresco.com/thread/221453-multiinstance-wont-run-task-in-parallel
2013 年 10 月 25 日,来自 Activiti 原架构师 Tijs Rademakers 的重要摘录:
例如,并行网关和多实例结构能够并行运行多个用户任务。但是对于服务和脚本任务,它们基本上仍然是串行执行的。如果您还将exclusive设置为false(默认为true),则异步可以更改此行为。然后作业执行器将只执行所有可用的作业,而不是连续执行。因此,尝试将 async 设置为 true 并将独占设置为 false。
现在,通过设置activiti:async="true"and activiti:exclusive="false",您实际上所做的是通过将服务任务(通常是串行处理的)分配给 Job Executor 在流程中创建“等待状态”。
但:
现在完全由 Job Executor 配置控制。(线程池大小、超时、并发作业数、作业块大小均可配置。)
现在,这并不是您所期望的,这意味着它取决于您的作业队列的大小、单次扫描中执行的作业数量以及每个作业的持续时间以及服务任务将在何时执行。意思是,它们可能并行执行,也可能串行执行。同样,您无法控制他们的顺序,因为再次由 Job Executor 来决定它做什么以及何时执行。
好的,假设这满足您的要求......
...您可能还会遇到另一个问题(实际上,这就是activiti:exclusive最初引入标志的原因)。服务任务完成后,执行上下文将提交到数据库中的流程实例记录以及历史记录。Activiti出于性能目的使用“乐观锁定”来记录。
现在,如果您的流程分支在时间上相对接近地完成,那么您可能(实际上很有可能)会收到Optimistic Locking Exception关于数据库的更新,如下所示:
09:59:52,432 [flowable-async-job-executor-thread-2] ERROR org.flowable.job.service.impl.asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler - Job 12575 failed org.flowable.engine.common.api.FlowableOptimisticLockingException: ProcessInstance[12567] was updated by another transaction concurrently
(注意:上面的错误实际上并不是来自 Activiti,而是来自一个名为“ Flowable ”的项目。但是,在最初提出这个问题时,它们的代码库与 Activiti 6 基本相同。(2017 年 11 月)。
这将导致服务任务被标记为 FAILED 并且将被重新尝试。如果您对 SOR(记录系统)或其他遗留系统进行外部调用,这可能会出现问题。(考虑会发生什么,如果你的航班实际上是成功预定,但调用储备它是由因为它被认为已经失败第二次。)
所有有趣的东西以及所有可以通过良好设计和使用最佳实践来解决的事情。
希望这可以帮助您了解正在发生的事情。
格雷格@BP3
Alfresco 论坛帖子包含几个死链接。以下是实时链接。
附加说明:Activiti 目前(2019 年)不再使用这两个(codehaus.org 或 atlassian.net)跟踪器。相反,他们使用这个 GitHub 跟踪器:https : //github.com/Activiti/Activiti/issues
activiti:async标志:https : //www.activiti.org/userguide/#asyncContinuationsactiviti:exclusive标志:https : //www.activiti.org/userguide/#exclusiveJobs