Elastic-Job-Lite is positioned as a lightweight, decentralized solution: it is delivered as plain jar packages, provides coordination for distributed jobs, and its only external dependency is ZooKeeper.
Add the Maven coordinates (elastic-job-lite-core is the framework itself; elastic-job-lite-spring adds the Spring namespace support used in the later examples):
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
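The samples below also wire an event-trace DataSource through commons-dbcp and, in job.properties, the H2 driver; Elastic-Job does not pull those in by itself. A plausible addition (the versions here are my own choice, not from the original):
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.196</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>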
The getting-started code below uses the plain Java API (no Spring):
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class App {

    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(
                "192.168.134.10:2182,192.168.134.10:2183,192.168.134.10:2184", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        // Core job configuration: job name, cron expression, total shard count,
        // and the parameter attached to each sharding item.
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("myDataFlowTest", "0/10 * * * * ?", 3)
                .shardingItemParameters("0=0,1=1,2=2")
                .build();
        // The third constructor argument enables streaming processing (see below).
        DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(
                coreConfig, MyDataFlowJob.class.getCanonicalName(), true);
        return LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
    }
}
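One caveat worth knowing: once a job's configuration has been written to ZooKeeper, later local edits (cron, shard count, and so on) are ignored unless overwriting is enabled on the builder, exactly as the Spring Boot configuration classes further down do:

// Push the local configuration to the registry on startup, replacing
// whatever is already stored in ZooKeeper for this job.
LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig)
        .overwrite(true)
        .build();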
public class MyDataFlowJob implements DataflowJob<User> {

    private DataProcess dataProcess = DataProcessFactory.getDataProcess();

    @Override
    public List<User> fetchData(ShardingContext context) {
        List<User> result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), context, "fetch data", result));
        return result;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<User> data) {
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, "finish data", data));
        for (User user : data) {
            dataProcess.setData(user.getId());
        }
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, "after data", data));
    }
}
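Because the DataflowJobConfiguration above was created with streamingProcess = true, the framework keeps calling fetchData/processData in a loop until fetchData returns an empty list, so with three shards over 30 TODO users each trigger drains its whole shard. To run the pair only once per trigger, pass false instead (same constructor as above):

// One fetch/process round per cron trigger instead of streaming:
DataflowJobConfiguration oneShotConfig =
        new DataflowJobConfiguration(coreConfig, MyDataFlowJob.class.getCanonicalName(), false);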
public class DataProcess {

    private Map<Integer, User> data = new ConcurrentHashMap<>(30, 1);

    public DataProcess() {
        for (int i = 0; i < 30; i++) {
            data.put(i, new User(i, User.Status.TODO));
        }
    }

    public List<User> getData(String tailId, int shardNum) {
        int intId = Integer.parseInt(tailId);
        List<User> result = new ArrayList<User>();
        for (Map.Entry<Integer, User> each : data.entrySet()) {
            User user = each.getValue();
            int key = each.getKey();
            // A user belongs to this shard if its id modulo the shard count
            // equals the shard parameter, and it has not been processed yet.
            if (key % shardNum == intId && user.getStatus() == User.Status.TODO) {
                result.add(user);
            }
        }
        return result;
    }

    public void setData(int i) {
        data.get(i).setStatus(User.Status.DONE);
    }
}
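A quick sanity check of the modulo-based ownership rule (a hypothetical snippet reusing the classes above): with three shards, the shard whose parameter is "1" sees exactly the ids 1, 4, 7, ..., 28 while they are still TODO.

DataProcess dataProcess = DataProcessFactory.getDataProcess();
// Prints the users with id % 3 == 1, i.e. 1, 4, ..., 28 (in map order).
System.out.println(dataProcess.getData("1", 3));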
public class DataProcessFactory {

    private DataProcessFactory() {}

    private static DataProcess dataProcess = new DataProcess();

    public static DataProcess getDataProcess() {
        return dataProcess;
    }
}
public class JavaSimpleJob implements SimpleJob {

    private static final String ZOOKEEPER_CONNECTION_STRING = "";

    private static final String JOB_NAMESPACE = "";

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, total shards: %s, current sharding item: %s",
                Thread.currentThread().getId(), shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem()));
        // Business logic goes here.
        System.out.println(new Date() + " job name = " + shardingContext.getJobName()
                + ", total shards = " + shardingContext.getShardingTotalCount()
                + ", current sharding item = " + shardingContext.getShardingItem()
                + ", sharding parameter = " + shardingContext.getShardingParameter()
                + ", job parameter = " + shardingContext.getJobParameter());
    }
}
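The two constants above are placeholders (left empty in the original, so fill in your own ZooKeeper address and namespace); the class itself never registers with a scheduler. A minimal registration sketch using the same Java API as the dataflow example (the job name, cron, and sharding parameters here are illustrative, not from the original):

CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
        new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE));
regCenter.init();
JobCoreConfiguration coreConfig = JobCoreConfiguration
        .newBuilder("javaSimpleJob", "0/15 * * * * ?", 3)
        .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
        .build();
SimpleJobConfiguration simpleJobConfig =
        new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build()).init();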
public class User {

    private int id;

    private Status status;

    public User(final int id, final Status status) {
        this.id = id;
        this.status = status;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Status getStatus() {
        return status;
    }

    public void setStatus(Status status) {
        this.status = status;
    }

    public enum Status {
        TODO,
        DONE
    }

    @Override
    public String toString() {
        return "User [id=" + id + ", status=" + status + "]";
    }
}
public class SpringDataflowJob implements DataflowJob<Foo> {

    @Resource
    private FooRepository fooRepository;

    @Override
    public List<Foo> fetchData(final ShardingContext shardingContext) {
        // The second argument is the fetch limit; here the shard count (3) is
        // passed, so each round fetches at most 3 rows.
        List<Foo> result = fooRepository.findTodoData(shardingContext.getShardingParameter(), shardingContext.getShardingTotalCount());
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, "fetch data", result));
        return result;
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<Foo> data) {
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, "finish data", data));
        for (Foo foo : data) {
            fooRepository.setCompleted(foo.getId());
        }
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s",
                Thread.currentThread().getId(), new Date(), shardingContext, "after data", data));
    }
}
public class SpringSimpleJob implements SimpleJob {

    @Resource
    private FooRepository fooRepository;

    @Override
    public void execute(final ShardingContext shardingContext) {
        System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),
                Thread.currentThread().getId(), "SIMPLE"));
        // Fetch at most 10 TODO rows for this shard's location per trigger.
        List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
        for (Foo each : data) {
            fooRepository.setCompleted(each.getId());
        }
    }
}
@Repository
public class FooRepository {

    private Map<Long, Foo> data = new ConcurrentHashMap<>(300, 1);

    public FooRepository() {
        init();
    }

    private void init() {
        addData(0L, 100L, "Beijing");
        addData(100L, 200L, "Shanghai");
        addData(200L, 300L, "Guangzhou");
    }

    private void addData(final long idFrom, final long idTo, final String location) {
        for (long i = idFrom; i < idTo; i++) {
            data.put(i, new Foo(i, location, Foo.Status.TODO));
        }
    }

    public List<Foo> findTodoData(final String location, final int limit) {
        List<Foo> result = new ArrayList<>(limit);
        int count = 0;
        for (Map.Entry<Long, Foo> each : data.entrySet()) {
            Foo foo = each.getValue();
            if (foo.getLocation().equals(location) && foo.getStatus() == Foo.Status.TODO) {
                result.add(foo);
                count++;
                if (count == limit) {
                    break;
                }
            }
        }
        return result;
    }

    public void setCompleted(final long id) {
        data.get(id).setStatus(Foo.Status.COMPLETED);
    }
}
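In the Spring samples the sharding key is a location rather than an id-modulo, so each job instance owns one city's rows. A hypothetical standalone check of the repository contract:

FooRepository repo = new FooRepository();
// Shard item 0 carries the parameter "Beijing" (ids 0..99 are Beijing rows).
List<Foo> todo = repo.findTodoData("Beijing", 10); // up to 10 TODO Beijing rows (map order is not guaranteed)
todo.forEach(foo -> repo.setCompleted(foo.getId()));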
public final class Foo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long id;

    private final String location;

    private Status status;

    public Foo(final long id, final String location, final Status status) {
        this.id = id;
        this.location = location;
        this.status = status;
    }

    public long getId() {
        return id;
    }

    public String getLocation() {
        return location;
    }

    public Status getStatus() {
        return status;
    }

    public void setStatus(final Status status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return String.format("id: %s, location: %s, status: %s", id, location, status);
    }

    public enum Status {
        TODO,
        COMPLETED
    }
}
Configuration files
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.dangdang.com/schema/ddframe/reg
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                           http://www.dangdang.com/schema/ddframe/job
                           http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <context:component-scan base-package="com.aric" />
    <context:property-placeholder location="classpath:conf/*.properties" />
    <!-- DataSource for the optional job event trace (see the commented attribute below). -->
    <bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${event.rdb.driver}"/>
        <property name="url" value="${event.rdb.url}"/>
        <property name="username" value="${event.rdb.username}"/>
        <property name="password" value="${event.rdb.password}"/>
    </bean>
    <!-- ZooKeeper registry center; values come from reg.properties. -->
    <reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}"
                   base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}"
                   max-retries="${maxRetries}" />
    <job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter"
                sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}"
                sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}"
                failover="${simple.failover}" description="${simple.description}"
                disabled="${simple.disabled}" overwrite="${simple.overwrite}" /><!-- event-trace-rdb-data-source="elasticJobLog" -->
    <job:dataflow id="${dataflow.id}" class="${dataflow.class}" registry-center-ref="regCenter"
                  sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}"
                  sharding-item-parameters="${dataflow.shardingItemParameters}"
                  monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}"
                  max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}"
                  streaming-process="${dataflow.streamingProcess}"
                  description="${dataflow.description}" disabled="${dataflow.disabled}"
                  overwrite="${dataflow.overwrite}" />
    <!-- use an absolute path to run the script job -->
    <!--
    <job:script id="${script.id}" registry-center-ref="regCenter"
                script-command-line="${script.scriptCommandLine}"
                sharding-total-count="${script.shardingTotalCount}" cron="${script.cron}"
                sharding-item-parameters="${script.shardingItemParameters}"
                description="${script.description}" overwrite="${script.overwrite}" />
    -->
</beans>
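To start the XML-configured jobs it is enough to load the context, because the reg: and job: namespace elements initialize the registry center and the schedulers during context startup. A minimal launcher (a sketch, assuming applicationContext.xml sits on the classpath):

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringXmlMain {

    public static void main(final String[] args) {
        // Creating the context registers and starts all declared jobs.
        new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    }
}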
job.properties
event.rdb.driver=org.h2.Driver
event.rdb.url=jdbc:h2:mem:job_event_storage
event.rdb.username=sa
event.rdb.password=
listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000
simple.id=springSimpleJob
simple.class=com.aric.elastic.job.simple.SpringSimpleJob
simple.cron=0/15 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
simple.monitorExecution=false
simple.failover=true
simple.description=Example job that runs only once per trigger
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888
dataflow.id=springDataflowJob
dataflow.class=com.aric.elastic.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=3
dataflow.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
dataflow.maxTimeDiffSeconds=-1
dataflow.monitorExecution=true
dataflow.failover=true
dataflow.streamingProcess=true
dataflow.description=Example streaming job that runs continuously, in order
dataflow.disabled=false
dataflow.overwrite=true
#script.id=springScriptJob
# need absolute path
#script.scriptCommandLine=your_path/elastic-job/elastic-job-example/elastic-job-example-lite-spring/src/main/resources/script/demo.sh
#script.cron=0/5 * * * * ?
#script.shardingTotalCount=3
#script.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
#script.description=Script Job
#script.overwrite=true
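Note that the listener.* entries above are never referenced by the XML shown earlier. If you want to use them, listener classes are declared as nested elements of a job tag; a sketch under that assumption (remaining job attributes elided):

<job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter"
            sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}">
    <job:listener class="${listener.simple}"/>
    <job:distributed-listener class="${listener.distributed}"
                              started-timeout-milliseconds="${listener.distributed.startedTimeoutMilliseconds}"
                              completed-timeout-milliseconds="${listener.distributed.completedTimeoutMilliseconds}"/>
</job:simple>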
reg.properties
serverLists=192.168.134.10:2182,192.168.134.10:2183,192.168.134.10:2184
namespace=spring-elastic-job
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
The Spring Boot integration configures the same jobs through Java config:
@SpringBootApplication
public class SpringBootMain {

    public static void main(final String[] args) {
        SpringApplication.run(SpringBootMain.class, args);
    }
}
@Configuration
public class JobEventConfig {

    @Resource
    private DataSource dataSource;

    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        // Persist job execution and status-trace events to the injected DataSource.
        return new JobEventRdbConfiguration(dataSource);
    }
}
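This is the Java-config counterpart of the event-trace-rdb-data-source attribute that is commented out in the XML sample; the framework writes execution and status-trace records to this DataSource, creating its trace tables on first use. The XML form, for comparison (other job attributes elided):

<job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter"
            sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}"
            event-trace-rdb-data-source="elasticJobLog" />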
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                             @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
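ZookeeperConfiguration also exposes the retry settings that the XML sample reads from reg.properties. A sketch of the equivalent tuning inside the regCenter factory method above (setter names assumed from the 2.1.5 API, values copied from reg.properties):

ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(serverList, namespace);
zkConfig.setBaseSleepTimeMilliseconds(1000); // initial retry backoff
zkConfig.setMaxSleepTimeMilliseconds(3000);  // backoff ceiling
zkConfig.setMaxRetries(3);                   // connection retry attempts
return new ZookeeperRegistryCenter(zkConfig);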
@Configuration
public class DataflowJobConfig {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;

    @Bean
    public DataflowJob dataflowJob() {
        return new SpringDataflowJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob,
                                             @Value("${dataflowJob.cron}") final String cron,
                                             @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                             @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(dataflowJob, regCenter,
                getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters),
                jobEventConfiguration);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass,
                                                         final String cron, final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build(),
                jobClass.getCanonicalName(), true)).overwrite(true).build();
    }
}
@Configuration
public class SimpleJobConfig {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;

    @Bean
    public SimpleJob simpleJob() {
        return new SpringSimpleJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                           @Value("${simpleJob.cron}") final String cron,
                                           @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters),
                jobEventConfiguration);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron, final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build(),
                jobClass.getCanonicalName())).overwrite(true).build();
    }
}
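Note that both configuration classes use SpringJobScheduler rather than the plain JobScheduler from the first example: it hands the already-managed Spring bean to the framework, so @Resource injection inside the job (e.g. fooRepository) keeps working. The difference in a nutshell (variable names reused from the samples above):

// Plain Java API: the framework instantiates the job class reflectively,
// so Spring field injection never happens.
new JobScheduler(regCenter, liteJobConfig).init();

// Spring integration: pass the Spring-managed job bean (plus the optional
// event configuration) instead.
new SpringJobScheduler(simpleJob, regCenter, liteJobConfig, jobEventConfiguration).init();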
Configuration files
database.yml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
    driver-class-name: com.mysql.jdbc.Driver
    username: test
    password: test
    tomcat:
      max-wait: 10000
      min-idle: 0
      initial-size: 25
      validation-query: SELECT 1
      test-on-borrow: false
      test-while-idle: true
      time-between-eviction-runs-millis: 18800
      remove-abandoned: true
      remove-abandoned-timeout: 180
job.yml
regCenter:
  serverList: 192.168.40.10:2181,192.168.40.11:2181,192.168.40.12:2181
  namespace: elastic-job-lite-springboot
simpleJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
dataflowJob:
  cron: 0/5 * * * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
spring:
  profiles:
    active: dev
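One caveat: Spring Boot only loads application.yml (plus profile variants) by default, so database.yml and job.yml will not be picked up under those names unless you either merge their contents into application.yml or extend spring.config.name at launch. A possible launch command under that assumption (the jar name is hypothetical):

java -jar your-app.jar --spring.config.name=application,database,job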