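elastic-job是当当开源的一款分布式定时作业框架。在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:
1. 应用多节点部署时,同一个定时任务可能在多个节点上被重复执行,从而引发业务逻辑错误;
2. quartz的集群仅用于高可用(HA),节点数量的增加并不能提升单次任务的执行效率,即无法实现水平扩展。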
elastic-job在2.x之后,推出了两个产品线:Elastic-Job-Lite和Elastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求,本文也是以Elastic-Job-Lite为主。1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,但原理类似,建议使用2.x系列。
Spring Boot 集成 Elastic-Job
1.maven pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the springboot-elastic-job demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot parent POM, version 2.3.3.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.kongliand</groupId>
<artifactId>springboot-elastic-job</artifactId>
<version>1.0.0</version>
<name>springboot-elastic-job</name>
<description>springboot-elastic-job</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot web starter; no version is given here, presumably it is managed by the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Elastic-Job-Lite core library, pinned to 2.1.5. -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- Elastic-Job-Lite Spring integration; keep its version identical to elastic-job-lite-core above. -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<!-- Spring Boot test starter, test scope only, with the JUnit Vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot's Maven plugin, used with its default configuration. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.Elastic-job核心配置
package com.kongliand.elasticjob.config; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import com.kongliand.elasticjob.annotation.TaskJob; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Iterator; import java.util.Map; /** * @description 任务配置信息 * @class_name ElasticJobConfig * @author kevin * @created 2020/8/30 6:59 下午 * @version 1.0 */ @Configuration public class ElasticJobConfig { @Resource private ApplicationContext applicationContext ; @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; @Value("${job-config.cron}") private String cron ; @Value("${job-config.shardCount}") private int shardCount ; @Value("${job-config.shardItem}") private String shardItem ; /** * 配置任务监听器 */ @Bean public ElasticJobListener elasticJobListener() { return new TaskJobListener(); } /** * 初始化配置任务 */ @PostConstruct public void initTaskJob() { Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class); Iterator iterator = jobMap.entrySet().iterator(); while (iterator.hasNext()) { // 自定义注解管理 Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next(); SimpleJob simpleJob = entry.getValue(); TaskJob taskJobSign = simpleJob.getClass().getAnnotation(TaskJob.class); if (taskJobSign != null){ String cron = taskJobSign.cron() ; String jobName = taskJobSign.jobName() ; // 生成配置 
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobName, cron, shardCount) .shardingItemParameters(shardItem).jobParameter(jobName).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder( simpleJobConfiguration).overwrite(true).build(); TaskJobListener taskJobListener = new TaskJobListener(); // 初始化任务 SpringJobScheduler jobScheduler = new SpringJobScheduler( simpleJob, zookeeperRegistryCenter, liteJobConfiguration, taskJobListener); jobScheduler.init(); } } } }
3.zookeeper配置信息
package com.kongliand.elasticjob.config;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes the ZooKeeper registry center bean,
 * built from the {@code zookeeper.server} and {@code zookeeper.namespace} properties.
 */
@Configuration
public class InitZookeeper {

    /** Value of the {@code zookeeper.server} property. */
    @Value("${zookeeper.server}")
    private String serverList;

    /** Value of the {@code zookeeper.namespace} property. */
    @Value("${zookeeper.namespace}")
    private String namespace;

    /**
     * Creates the registry center from the injected server list and namespace.
     * Spring calls the bean's {@code init} method after creating it.
     *
     * @return the registry center bean
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
4.简单定时任务
package com.kongliand.elasticjob.taskjob; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.kongliand.elasticjob.annotation.TaskJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @author kevin */ @Component @TaskJob(cron = "0/5 * * * * ?",jobName = "Hello-Job") public class HelloJob implements SimpleJob { private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ; @Override public void execute(ShardingContext shardingContext) { LOG.info("当前线程: "+Thread.currentThread().getId()); LOG.info("任务分片:"+shardingContext.getShardingTotalCount()); LOG.info("当前分片:"+shardingContext.getShardingItem()); LOG.info("分片参数:"+shardingContext.getShardingParameter()); LOG.info("任务参数:"+shardingContext.getJobParameter()); } }
5.配置任务监听器
package com.kongliand.elasticjob.config; import com.dangdang.ddframe.job.executor.ShardingContexts; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component public class TaskJobListener implements ElasticJobListener { private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class); private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+"===>开始..."); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+"===>结束...[耗时:"+(endTime - beginTime)+"]"); } }
6.启动测试输出(注:以下日志中的任务名为 Hello,与上文示例代码中配置的 jobName = "Hello-Job" 不一致;实际输出的任务名、线程名和任务参数以注解中配置的 jobName 为准)
2020-08-30 19:18:45.009 INFO 19103 --- [ Hello_Worker-1] c.k.elasticjob.config.TaskJobListener : Hello===>开始...
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-3] c.kongliand.elasticjob.taskjob.HelloJob : 当前线程: 77
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-3] c.kongliand.elasticjob.taskjob.HelloJob : 任务分片:2
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-4] c.kongliand.elasticjob.taskjob.HelloJob : 当前线程: 78
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-3] c.kongliand.elasticjob.taskjob.HelloJob : 当前分片:0
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-4] c.kongliand.elasticjob.taskjob.HelloJob : 任务分片:2
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-3] c.kongliand.elasticjob.taskjob.HelloJob : 分片参数:A
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-4] c.kongliand.elasticjob.taskjob.HelloJob : 当前分片:1
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-3] c.kongliand.elasticjob.taskjob.HelloJob : 任务参数:Hello
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-4] c.kongliand.elasticjob.taskjob.HelloJob : 分片参数:B
2020-08-30 19:18:45.013 INFO 19103 --- [ner-job-Hello-4] c.kongliand.elasticjob.taskjob.HelloJob : 任务参数:Hello
2020-08-30 19:18:45.016 INFO 19103 --- [ Hello_Worker-1] c.k.elasticjob.config.TaskJobListener : Hello===>结束...[耗时:7]