elastic-job-Lite入门

戎泰
2023-12-01

一、概述

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。
在项目中引入以下 Maven 依赖坐标:

<!-- Elastic-Job-Lite core module. -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<!-- Elastic-Job-Lite Spring module; presumably only required by the Spring and Spring Boot examples (confirm). -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

以下为入门示例代码,依次演示 Java、Spring 和 SpringBoot 三种使用方式。

二、Java

public class App {
    public static void main(String[] args) {  
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();  
    }  

    private static CoordinatorRegistryCenter createRegistryCenter() {  
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.134.10:2182,192.168.134.10:2183,192.168.134.10:2184", "elastic-job-demo"));  
        regCenter.init();  
        return regCenter;  
    }  

    private static LiteJobConfiguration createJobConfiguration() {  
        // 创建作业配置  
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("myDataFlowTest", "0/10 * * * * ?", 3).shardingItemParameters("0=0,1=1,2=2").build();  
        DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, MyDataFlowJob.class.getCanonicalName(), true);  
        LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();  
        return result;  
    }  
}
/**
 * Dataflow job of the plain-Java example. It fetches the {@code User} items
 * that belong to the current shard and marks each fetched item as processed.
 */
public class MyDataFlowJob implements DataflowJob<User> {

    private final DataProcess dataProcess = DataProcessFactory.getDataProcess();

    /**
     * Fetches the items for the current shard.
     *
     * @param context sharding context; its sharding parameter and total shard
     *                count are forwarded to {@code DataProcess.getData}
     * @return the items returned by {@code DataProcess.getData}
     */
    @Override
    public List<User> fetchData(ShardingContext context) {
        List<User> result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());
        log(context, "fetch data", result);
        return result;
    }

    /**
     * Marks every fetched item as processed via {@code DataProcess.setData}.
     *
     * @param shardingContext sharding context of the current execution
     * @param data            items previously returned by {@link #fetchData}
     */
    @Override
    public void processData(ShardingContext shardingContext, List<User> data) {
        log(shardingContext, "finish data", data);
        for (User user : data) {
            dataProcess.setData(user.getId());
        }
        log(shardingContext, "after data", data);
    }

    /** Prints one trace line with the current thread id, timestamp, context, action and data. */
    private static void log(ShardingContext context, String action, List<User> data) {
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), context, action, data));
    }

}
/**
 * In-memory store of 30 {@code User} items (ids 0..29) used by the plain-Java
 * dataflow example. Every item starts in status {@code TODO}.
 */
public class DataProcess {

    private final Map<Integer, User> data = new ConcurrentHashMap<>(30, 1);

    /** Populates the store with ids 0..29, all in status {@code TODO}. */
    public DataProcess() {
        for (int i = 0; i < 30; i++) {
            data.put(i, new User(i, User.Status.TODO));
        }
    }

    /**
     * Returns the pending items whose id falls into the given shard.
     *
     * @param tailId   remainder that selects the shard, as a decimal string
     * @param shardNum modulus applied to each id; must not be zero
     * @return items with {@code id % shardNum == tailId} that are still {@code TODO}
     * @throws NumberFormatException if {@code tailId} is not a parsable integer
     * @throws ArithmeticException   if {@code shardNum} is zero
     */
    public List<User> getData(String tailId, int shardNum) {
        int shardId = Integer.parseInt(tailId);
        List<User> result = new ArrayList<>();
        for (Map.Entry<Integer, User> each : data.entrySet()) {
            User user = each.getValue();
            int key = each.getKey();
            // Status is nested in User, so it is referenced with its qualified name.
            if (key % shardNum == shardId && user.getStatus() == User.Status.TODO) {
                result.add(user);
            }
        }
        return result;
    }

    /**
     * Marks the item with the given id as {@code DONE}.
     *
     * @param i id of the item to update
     * @throws NullPointerException if no item with that id exists
     */
    public void setData(int i) {
        data.get(i).setStatus(User.Status.DONE);
    }

}
/**
 * Holder of the single shared {@code DataProcess} instance.
 */
public class DataProcessFactory {

    // Created once when the class is initialized; final so it cannot be swapped out later.
    private static final DataProcess dataProcess = new DataProcess();

    private DataProcessFactory() {}

    /**
     * Returns the shared instance.
     *
     * @return the same {@code DataProcess} object on every call
     */
    public static DataProcess getDataProcess() {
        return dataProcess;
    }
}
/**
 * Minimal simple job that only prints the sharding information it is invoked with.
 */
public class JavaSimpleJob implements SimpleJob {

    // Empty placeholders; nothing in this class reads them.
    private static final String ZOOKEEPER_CONNECTION_STRING = "";
    private static final String JOB_NAMESPACE = "";

    /**
     * Prints two lines: the thread id with the shard count and current shard item,
     * then the job name, shard data and custom job parameter.
     *
     * @param shardingContext sharding context of the current execution
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        String summary = String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem());
        System.out.println(summary);

        // Business logic placeholder: print the details of this execution.
        StringBuilder detail = new StringBuilder();
        detail.append(new Date());
        detail.append(" job名称 = ").append(shardingContext.getJobName());
        detail.append("分片数量").append(shardingContext.getShardingTotalCount());
        detail.append("当前分区").append(shardingContext.getShardingItem());
        detail.append("当前分区名称").append(shardingContext.getShardingParameter());
        detail.append("当前自定义参数").append(shardingContext.getJobParameter());
        detail.append("============start=================");
        System.out.println(detail.toString());
    }
}
/**
 * Mutable data item of the plain-Java example: a numeric id plus a processing status.
 */
public class User {

    /** Processing state of a {@link User}. */
    public enum Status {
        TODO,
        DONE
    }

    private int id;
    private Status status;

    /**
     * Creates an item.
     *
     * @param id     identifier of the item
     * @param status initial processing status
     */
    public User(final int id, final Status status) {
        this.id = id;
        this.status = status;
    }

    /** @return the identifier */
    public int getId() {
        return this.id;
    }

    /** @param id the new identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the current processing status */
    public Status getStatus() {
        return this.status;
    }

    /** @param status the new processing status */
    public void setStatus(Status status) {
        this.status = status;
    }

    /** @return text of the form {@code User [id=<id>, status=<status>]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [id=");
        text.append(id);
        text.append(", status=");
        text.append(status);
        text.append("]");
        return text.toString();
    }

}

三、Spring

/**
 * Dataflow job of the Spring example. It fetches pending {@code Foo} items for
 * the current shard from {@code FooRepository} and marks them as completed.
 */
public class SpringDataflowJob implements DataflowJob<Foo> {

    @Resource
    private FooRepository fooRepository;

    /**
     * Fetches the pending items for the current shard.
     *
     * @param shardingContext sharding context; its sharding parameter is used as
     *                        the location filter and its total shard count as the
     *                        maximum number of items to return
     * @return the items returned by {@code FooRepository.findTodoData}
     */
    @Override
    public List<Foo> fetchData(final ShardingContext shardingContext) {
        List<Foo> result = fooRepository.findTodoData(shardingContext.getShardingParameter(), shardingContext.getShardingTotalCount());
        log(shardingContext, "fetch data", result);
        return result;
    }

    /**
     * Marks every fetched item as completed.
     *
     * @param shardingContext sharding context of the current execution
     * @param data            items previously returned by {@link #fetchData}
     */
    @Override
    public void processData(final ShardingContext shardingContext, final List<Foo> data) {
        log(shardingContext, "finish data", data);
        for (Foo foo : data) {
            fooRepository.setCompleted(foo.getId());
        }
        log(shardingContext, "after data", data);
    }

    /** Prints one trace line with the current thread id, timestamp, context, action and data. */
    private static void log(final ShardingContext shardingContext, final String action, final List<Foo> data) {
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, action, data));
    }
}
/**
 * Simple job of the Spring example: prints one status line, then completes up
 * to ten pending {@code Foo} items whose location equals the sharding parameter.
 */
public class SpringSimpleJob implements SimpleJob {

    @Resource
    private FooRepository fooRepository;

    /**
     * Runs one execution of the job for the current shard.
     *
     * @param shardingContext sharding context of the current execution
     */
    @Override
    public void execute(final ShardingContext shardingContext) {
        SimpleDateFormat clock = new SimpleDateFormat("HH:mm:ss");
        String banner = String.format("Item: %s | Time: %s | Thread: %s | %s",
                shardingContext.getShardingItem(),
                clock.format(new Date()),
                Thread.currentThread().getId(),
                "SIMPLE");
        System.out.println(banner);

        // At most 10 pending items are handled per execution.
        String location = shardingContext.getShardingParameter();
        for (Foo pending : fooRepository.findTodoData(location, 10)) {
            fooRepository.setCompleted(pending.getId());
        }
    }
}
/**
 * In-memory repository of 300 {@code Foo} items used by the Spring examples:
 * ids 0..99 are located in Beijing, 100..199 in Shanghai and 200..299 in
 * Guangzhou. Every item starts in status {@code TODO}.
 */
@Repository
public class FooRepository {

    private final Map<Long, Foo> data = new ConcurrentHashMap<>(300, 1);

    /** Creates the repository and fills it with the sample data. */
    public FooRepository() {
        init();
    }

    private void init() {
        addData(0L, 100L, "Beijing");
        addData(100L, 200L, "Shanghai");
        addData(200L, 300L, "Guangzhou");
    }

    /** Adds one {@code TODO} item per id in {@code [idFrom, idTo)} at the given location. */
    private void addData(final long idFrom, final long idTo, final String location) {
        for (long i = idFrom; i < idTo; i++) {
            data.put(i, new Foo(i, location, Foo.Status.TODO));
        }
    }

    /**
     * Finds pending items at a location.
     *
     * @param location location to match exactly
     * @param limit    maximum number of items to return; {@code 0} yields an empty list
     * @return at most {@code limit} items that are at {@code location} and still {@code TODO}
     * @throws IllegalArgumentException if {@code limit} is negative
     */
    public List<Foo> findTodoData(final String location, final int limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must not be negative: " + limit);
        }
        // Never pre-size beyond the number of stored items, whatever the caller asks for.
        List<Foo> result = new ArrayList<>(Math.min(limit, data.size()));
        if (limit == 0) {
            // Previously the size check ran only after an add, so a zero limit returned every match.
            return result;
        }
        for (Foo foo : data.values()) {
            if (foo.getLocation().equals(location) && foo.getStatus() == Foo.Status.TODO) {
                result.add(foo);
                if (result.size() == limit) {
                    break;
                }
            }
        }
        return result;
    }

    /**
     * Marks the item with the given id as {@code COMPLETED}.
     *
     * @param id id of the item to update
     * @throws NullPointerException if no item with that id exists
     */
    public void setCompleted(final long id) {
        data.get(id).setStatus(Foo.Status.COMPLETED);
    }
}
public final class Foo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long id;

    private final String location;

    private Status status;

    public Foo(final long id, final String location, final Status status) {
        this.id = id;
        this.location = location;
        this.status = status;
    }

    public long getId() {
        return id;
    }

    public String getLocation() {
        return location;
    }

    public Status getStatus() {
        return status;
    }

    public void setStatus(final Status status) {
        this.status = status;
    }

    public String toString() {
        return String.format("id: %s, location: %s, status: %s", id, location, status);
    }

    public enum Status {
        TODO,
        COMPLETED
    }
}

配置文件

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" 
    xmlns:job="http://www.dangdang.com/schema/ddframe/job" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd 
                        http://www.dangdang.com/schema/ddframe/reg 
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                        http://www.dangdang.com/schema/ddframe/job 
                        http://www.dangdang.com/schema/ddframe/job/job.xsd 
                        ">
    <!-- Scan the com.aric package for annotated components and load every *.properties file under classpath:conf/. -->
    <context:component-scan base-package="com.aric" />
    <context:property-placeholder location="classpath:conf/*.properties" />

    <!-- JDBC data source built from the event.rdb.* properties. In this file it is referenced only by the
         commented-out event-trace-rdb-data-source attribute below, so it is presumably meant for job event tracing. -->
    <bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${event.rdb.driver}"/>
        <property name="url" value="${event.rdb.url}"/>
        <property name="username" value="${event.rdb.username}"/>
        <property name="password" value="${event.rdb.password}"/>
    </bean>

    <!-- ZooKeeper registry center; both jobs below point at it through registry-center-ref. -->
    <reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

    <!-- Simple job; every attribute is resolved from the simple.* properties. -->
    <job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter" 
        sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" 
        sharding-item-parameters="${simple.shardingItemParameters}" 
        monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" 
        failover="${simple.failover}" description="${simple.description}" 
        disabled="${simple.disabled}" overwrite="${simple.overwrite}" /><!-- event-trace-rdb-data-source="elasticJobLog" -->
    <!-- Dataflow job; every attribute is resolved from the dataflow.* properties. -->
    <job:dataflow id="${dataflow.id}" class="${dataflow.class}" registry-center-ref="regCenter" sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}" sharding-item-parameters="${dataflow.shardingItemParameters}" monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}" max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}" streaming-process="${dataflow.streamingProcess}" description="${dataflow.description}" disabled="${dataflow.disabled}" overwrite="${dataflow.overwrite}" />

    <!-- use absolute path to run script job -->
    <!--
    <job:script id="${script.id}" registry-center-ref="regCenter" script-command-line="${script.scriptCommandLine}" sharding-total-count="${script.shardingTotalCount}" cron="${script.cron}" sharding-item-parameters="${script.shardingItemParameters}" description="${script.description}" overwrite="${script.overwrite}" />
    -->
</beans>

job.properties

# JDBC settings read by the elasticJobLog data source bean (in-memory H2 database).
event.rdb.driver=org.h2.Driver
event.rdb.url=jdbc:h2:mem:job_event_storage
event.rdb.username=sa
event.rdb.password=

# Listener settings.
# NOTE(review): nothing in the applicationContext.xml shown in this article reads the listener.* keys - confirm they are still needed.
listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000

# Simple job (<job:simple>): 3 shards, one city per shard, cron fires every 15 seconds.
simple.id=springSimpleJob
simple.class=com.aric.elastic.job.simple.SpringSimpleJob
simple.cron=0/15 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
simple.monitorExecution=false
simple.failover=true
simple.description=只运行一次的作业示例
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888

# Dataflow job (<job:dataflow>): 3 shards, one city per shard, cron fires every 5 seconds.
dataflow.id=springDataflowJob
dataflow.class=com.aric.elastic.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=3
dataflow.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
# Presumably -1 disables the clock-difference check - confirm against the Elastic-Job documentation.
dataflow.maxTimeDiffSeconds=-1
dataflow.monitorExecution=true
dataflow.failover=true
dataflow.streamingProcess=true
dataflow.description=按顺序不停止运行的作业示例
dataflow.disabled=false
dataflow.overwrite=true

# Script job settings; disabled together with the commented-out <job:script> element.
#script.id=springScriptJob

# need absolute path
#script.scriptCommandLine=your_path/elastic-job/elastic-job-example/elastic-job-example-lite-spring/src/main/resources/script/demo.sh

#script.cron=0/5 * * * * ?
#script.shardingTotalCount=3
#script.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
#script.description=Script Job
#script.overwrite=true

reg.properties

# ZooKeeper registry center settings read by the <reg:zookeeper> element in applicationContext.xml.
# Comma-separated host:port list of the ZooKeeper servers.
serverLists=192.168.134.10:2182,192.168.134.10:2183,192.168.134.10:2184
namespace=spring-elastic-job
# Values for the base-sleep-time-milliseconds, max-sleep-time-milliseconds and max-retries attributes.
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3

四、SpringBoot

/**
 * Spring Boot entry point of the example application.
 */
@SpringBootApplication
public class SpringBootMain {

    /**
     * Boots the application with this class as its primary source.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(final String[] args) {
        // Same steps the static SpringApplication.run(Class, String...) helper performs.
        new SpringApplication(SpringBootMain.class).run(args);
    }
}
/**
 * Spring configuration that exposes the job event configuration bean.
 */
@Configuration
public class JobEventConfig {

    // Injected by the container; presumably the data source configured under
    // spring.datasource in database.yml (confirm).
    @Resource
    private DataSource dataSource;

    /**
     * Creates the job event configuration backed by the injected data source.
     *
     * @return a {@code JobEventRdbConfiguration} wrapping {@code dataSource}
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}
/**
 * Spring configuration of the ZooKeeper registry center. The condition lets it
 * apply only when {@code regCenter.serverList} resolves to a non-empty string.
 */
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

    /**
     * Creates the registry center bean; the container calls its {@code init}
     * method after construction.
     *
     * @param serverList value of {@code regCenter.serverList}
     * @param namespace  value of {@code regCenter.namespace}
     * @return the registry center built from both values
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
        ZookeeperConfiguration zookeeperConfig = new ZookeeperConfiguration(serverList, namespace);
        return new ZookeeperRegistryCenter(zookeeperConfig);
    }
}
/**
 * Spring configuration that registers the dataflow job and its scheduler.
 */
@Configuration
public class DataflowJobConfig {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;

    /**
     * Registers the job implementation as a bean.
     *
     * @return a new {@code SpringDataflowJob}
     */
    @Bean
    public DataflowJob dataflowJob() {
        return new SpringDataflowJob();
    }

    /**
     * Creates the scheduler for the dataflow job; the container calls its
     * {@code init} method after construction.
     *
     * @param dataflowJob            job bean to schedule
     * @param cron                   value of {@code dataflowJob.cron}
     * @param shardingTotalCount     value of {@code dataflowJob.shardingTotalCount}
     * @param shardingItemParameters value of {@code dataflowJob.shardingItemParameters}
     * @return the scheduler bound to the registry center and job event configuration
     */
    @Bean(initMethod = "init")
    public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
        LiteJobConfiguration liteJobConfig =
                getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters);
        return new SpringJobScheduler(dataflowJob, regCenter, liteJobConfig, jobEventConfiguration);
    }

    /** Builds the Lite configuration; the job class name doubles as the job name and overwrite is enabled. */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .build();
        DataflowJobConfiguration dataflowConfig =
                new DataflowJobConfiguration(coreConfig, jobClass.getCanonicalName(), true);
        return LiteJobConfiguration.newBuilder(dataflowConfig).overwrite(true).build();
    }
}
/**
 * Spring configuration that registers the simple job and its scheduler.
 */
@Configuration
public class SimpleJobConfig {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;

    /**
     * Registers the job implementation as a bean.
     *
     * @return a new {@code SpringSimpleJob}
     */
    @Bean
    public SimpleJob simpleJob() {
        return new SpringSimpleJob();
    }

    /**
     * Creates the scheduler for the simple job; the container calls its
     * {@code init} method after construction.
     *
     * @param simpleJob              job bean to schedule
     * @param cron                   value of {@code simpleJob.cron}
     * @param shardingTotalCount     value of {@code simpleJob.shardingTotalCount}
     * @param shardingItemParameters value of {@code simpleJob.shardingItemParameters}
     * @return the scheduler bound to the registry center and job event configuration
     */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
        LiteJobConfiguration liteJobConfig =
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);
        return new SpringJobScheduler(simpleJob, regCenter, liteJobConfig, jobEventConfiguration);
    }

    /** Builds the Lite configuration; the job class name doubles as the job name and overwrite is enabled. */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .build();
        SimpleJobConfiguration simpleConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
        return LiteJobConfiguration.newBuilder(simpleConfig).overwrite(true).build();
    }
}

配置文件
database.yml

# Data source settings; presumably this is the DataSource injected into JobEventConfig (confirm).
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
    driver-class-name: com.mysql.jdbc.Driver
    username: test
    password: test
    # Connection pool settings under the "tomcat" key.
    tomcat:
      max-wait: 10000
      min-idle: 0
      initial-size: 25
      validation-query: SELECT 1
      test-on-borrow: false
      test-while-idle: true
      time-between-eviction-runs-millis: 18800
      remove-abandoned: true
      remove-abandoned-timeout: 180

job.yml

# ZooKeeper registry center; read by RegistryCenterConfig via ${regCenter.serverList} and ${regCenter.namespace}.
regCenter:
  serverList: 192.168.40.10:2181,192.168.40.11:2181,192.168.40.12:2181
  namespace: elastic-job-lite-springboot

# Settings injected into SimpleJobConfig.simpleJobScheduler.
simpleJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

# Settings injected into DataflowJobConfig.dataflowJobScheduler.
dataflowJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

# Active Spring profile.
spring:
  profiles:
    active: dev
 类似资料: