当前位置: 首页 > 知识库问答 >
问题:

Activiti异步服务任务的作业执行者问题(Activiti>=5.17)

葛鸿轩
2023-03-14

请考虑下图

MyProcess.bpmn

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <userTask id="evl" name="Evaluation"></userTask>
    <boundaryEvent id="timer_event_autocomplete" name="Timer" attachedToRef="evl" cancelActivity="false">
      <timerEventDefinition>
        <timeDate>PT2S</timeDate>
      </timerEventDefinition>
    </boundaryEvent>
    <serviceTask id="timer_service" name="Timed Autocomplete" activiti:async="true" activiti:class="com.example.service.TimerService"></serviceTask>
    <serviceTask id="store_docs_service" name="Store Documents" activiti:async="true" activiti:class="com.example.service.StoreDocsService"></serviceTask>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="evl"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="timer_event_autocomplete" targetRef="timer_service"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="evl" targetRef="store_docs_service"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="store_docs_service" targetRef="endevent1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
  </process>

</definitions>

为了用文字来描述它,有一个用户任务(评估)和一个附加到它的计时器(配置为在2秒内触发)。触发计时器后,其Java委托中的Timed自动完成异步服务任务TimerService尝试完成用户任务(评估)。完成用户任务(评估)后,流移动到另一个异步服务任务(存储文档),它调用其Java委托,StoreDocsService,流结束。

计时器ervice.java

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}

StoreDocsService。Java语言

public class StoreDocsService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(StoreDocsService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Store Documents ***");
    }
}

应用程序。Java语言

public class App
{
    public static void main( String[] args ) throws Exception
    {

//        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
//        demoAsyncJobExecutor.setCorePoolSize(10);
//        demoAsyncJobExecutor.setMaxPoolSize(50);
//        demoAsyncJobExecutor.setKeepAliveTime(10000);
//        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
//                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
                .setJobExecutorActivate(true)
                ;
        ProcessEngine processEngine = cfg.buildProcessEngine();
        String pName = processEngine.getName();
        String ver = ProcessEngine.VERSION;
        System.out.println("ProcessEngine [" + pName + "] Version: [" + ver + "]");

        RepositoryService repositoryService = processEngine.getRepositoryService();
        Deployment deployment = repositoryService.createDeployment()
                .addClasspathResource("MyProcess.bpmn").deploy();
        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
                .deploymentId(deployment.getId()).singleResult();
        System.out.println(
                "Found process definition ["
                        + processDefinition.getName() + "] with id ["
                        + processDefinition.getId() + "]");

        final Map<String, Object> variables = new HashMap<String, Object>();
        final RuntimeService runtimeService = processEngine.getRuntimeService();

        ProcessInstance id = runtimeService.startProcessInstanceByKey("myProcess", variables);
        System.out.println("Started Process Id: "+id.getId());
        try {
            final TaskService taskService = processEngine.getTaskService();
//            List<Task> tasks = taskService.createTaskQuery().active().list();
//            while (!tasks.isEmpty()) {
//                Task task = tasks.get(0);
//                taskService.complete(task.getId());
//                tasks = taskService.createTaskQuery().active().list();
//            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {

        }

        while(!runtimeService.createExecutionQuery().list().isEmpty()) {
        }
        processEngine.close();
    }


}

活动5.15

当计时器触发时,如上所述执行上图。我们使用Activiti的DefaultJobExecutor,如日志中所示:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.15]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-1] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Shutting down the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] stopped job acquisition

活动

日志:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnableImpl  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
  • 正在更改为异步作业执行器。5.17版本的一个特性是新的异步作业执行器(但默认的非异步执行器仍配置为默认)。因此,尝试在应用程序中启用异步执行器。java通过以下行:
        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
        demoAsyncJobExecutor.setCorePoolSize(10);
        demoAsyncJobExecutor.setMaxPoolSize(50);
        demoAsyncJobExecutor.setKeepAliveTime(10000);
        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
                .setAsyncExecutorEnabled(true)
                .setAsyncExecutor(demoAsyncJobExecutor)
                ;

流似乎执行正确,在TimerService之后调用StoreDocsService,但它永远不会结束(!runtimeService.createExecutionQuery())。列表()。isEmpty())应用程序中的语句。java总是正确的)!

日志:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 10, maxPoolSize 50 and keepAliveTime 10000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-3] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***

!!!!更新!!!

Activiti 6.0.0

尝试了相同的场景,但使用了Activiti版本6.0.0TimerService中需要的更改,无法从委托执行中获取Engineering Services

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = Context.getProcessEngineConfiguration().getTaskService().createTaskQuery().active().singleResult();
        Context.getProcessEngineConfiguration().getTaskService().complete(task.getId());
//        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
//        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}

而这个版本只有异步执行器,所以在应用程序中的ProcessEngineConfiguration。java更改为:

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
//                .setJobExecutorActivate(true)
                ;

使用6.0.0版本和异步执行器,流程成功完成,如日志所示:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 2, maxPoolSize 10 and keepAliveTime 5000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-3] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} starting to reset expired jobs
ProcessEngine [default] Version: [6.0.0.4]
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Task: 10 autocompleted by timer ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Shutting down the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[activiti-reset-expired-jobs] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} stopped resetting expired jobs
[activiti-acquire-timer-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} stopped async job due acquisition
[activiti-acquire-async-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} stopped async job due acquisition

Process finished with exit code 0

2个问题:

  1. 我们已从Activiti 5.15升级到5.22.0,并且不使用异步作业执行器。是否有任何方法可以保持此图表的功能与5.15中的功能相同
  2. 如果切换到异步作业执行器是不可避免的,那么为了成功完成此过程,我们缺少什么

上述示例项目可在以下网址找到:https://github.com/pleft/DemoActiviti

共有1个答案

闻人献
2023-03-14

在没有明确回答您的问题(这需要设置环境和调试)的情况下,我建议您至少转到Activiti 6。第5条。Activiti的x分支机构已经有5年多没有维护了,实际上已经死亡。即使是6。随着核心开发人员都转向“Flowable”项目,x线几乎被放弃了。如果你选择留在Activiti 5。x、 您的选择包括:

  1. 自己维护代码库(并希望为项目做出任何更改/增强)
  2. 合同活动支持服务。有几个供应商提供这种服务
 类似资料:
  • 我正在从Activiti Version5.21迁移到6.0.0。除了预期的更改(一些意外的更改)之外,我在执行异步邮件任务时遇到了一个有趣的问题。邮件任务按预期发送消息,并且没有引发错误,但是尽管操作成功,但仍然遵循了重试失败流。在3次尝试(发送了3封电子邮件)之后,作业会移动到死信表,就好像发生了错误一样。表中的错误消息为 “JobEntity[ID=某个ID]被另一个事务同时更新” 奇怪的是

  • 这是在一次Android采访中被问到的。有人问我是否可以从异步任务 1 的 doInBackground() 方法(让它成为 Task1)启动另一个异步任务(让它成为 Task2)。我浏览了文档,其中说了以下内容: 必须在UI线程上创建任务实例。 必须在 UI 线程上调用 execute(Params...)。 根据这些陈述,我认为从另一个任务的后台方法启动一个任务是不可能的。此外,async任务

  • 在Server程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,Web服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢。 Swoole提供了异步任务处理的功能,可以投递一个异步任务到TaskWorker进程池中执行,不影响当前请求的处理速度。 程序代码 基于第一个TCP服务器,只需要增加onTask和onFinish 2个事件回调函数即可。另外需要设置task

  • 问题内容: 快速回顾一下-我有一个Java EE前端,可以接受用户请求,然后针对每个请求使用ExecutorService(SingleThreadedExecutor设置为守护程序)启动冗长的工作流,该工作流包含在库中并且可以工作很好,并且在通过Eclipse以独立模式运行时按预期运行。当从website(servlet)调用时,我观察到工作流始终在初始化Velocity Engine(Velo

  • 我正在使用RESTEasy与Spring MVC的集成,如“39.2.Spring MVC集成”一节http://docs.jboss.org/RESTEasy/docs/2.0.0.ga/userguide/html/resteasy_spring_integration.html所述 我想试验一下Resteasy对“异步作业服务”的实现,如下所述:http://docs.jboss.org/r

  • 本文向大家介绍C#异步执行任务的方法,包括了C#异步执行任务的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#异步执行任务的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。