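对于Dataflow类型作业，官方文档给的解释大意是：
Dataflow类型用于处理数据流，需实现DataflowJob接口。该接口提供2个方法可供覆盖，分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理：流式处理时，只有fetchData方法的返回值为null或集合长度为空，作业才停止抓取，否则作业将一直运行下去；非流式处理时，每次作业触发只执行一次fetchData和processData，随即完成本次作业。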
elastic-job配置:
package com.lucifer.config;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.lucifer.job.MyDataFlowJob;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that wires the elastic-job Dataflow demo job to a
 * ZooKeeper registry center and starts its scheduler.
 *
 * @author lucifer
 * @date 2020/5/4 19:50
 */
@Configuration
public class LiteJobConfig {

    /** ZooKeeper connection string (host:port) used by the registry center. */
    private static final String SERVER_LISTS = "192.168.24.128:2181";

    /** ZooKeeper namespace under which the job stores its coordination data. */
    private static final String NAMESPACE = "myDataFlowJob";

    /** Name the job is registered under. */
    private static final String JOB_NAME = "myDataFlowJob";

    /** Cron expression for the job trigger; kept exactly as originally configured. */
    private static final String CRON = "0/10 * * * * ? ";

    /** Total number of sharding items the job is split into. */
    private static final int SHARDING_TOTAL_COUNT = 2;

    /**
     * Creates and starts the scheduler for the Dataflow job.
     *
     * <p>The original method was declared {@code void}, so Spring had no object
     * to register as a bean (NOTE(review): newer Spring versions are believed to
     * reject void {@code @Bean} methods outright - confirm against the version in
     * use). Returning the initialized scheduler keeps the start-up side effect
     * and makes the bean injectable. The capitalised method name is kept so that
     * existing callers and the derived bean name do not change.
     *
     * @return the scheduler, already initialized
     */
    @Bean
    public static JobScheduler JobScheduler() {
        JobScheduler scheduler = new JobScheduler(zkCenter(), dataFlowJobConfiguration());
        scheduler.init();
        return scheduler;
    }

    /**
     * Builds a ZooKeeper-backed registry center and initializes it.
     *
     * <p>A new registry center is constructed and initialized on every call.
     *
     * @return the initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        CoordinatorRegistryCenter registryCenter =
                new ZookeeperRegistryCenter(new ZookeeperConfiguration(SERVER_LISTS, NAMESPACE));
        // The registry center must be initialized before it is handed to a scheduler.
        registryCenter.init();
        return registryCenter;
    }

    /**
     * Assembles the job configuration: core settings, then the Dataflow type
     * settings, then the Lite root configuration.
     *
     * @return the Lite job configuration for {@link MyDataFlowJob}
     */
    public static LiteJobConfiguration dataFlowJobConfiguration() {
        // Core configuration: job name, cron trigger and sharding count.
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder(JOB_NAME, CRON, SHARDING_TOTAL_COUNT)
                .build();
        // Type configuration: Dataflow job; the trailing 'true' is the
        // streaming-process flag of DataflowJobConfiguration.
        JobTypeConfiguration typeConfig = new DataflowJobConfiguration(
                coreConfig, MyDataFlowJob.class.getCanonicalName(), true);
        // Root (Lite) configuration wrapping the type configuration.
        return LiteJobConfiguration.newBuilder(typeConfig).build();
    }
}
MyDataFlowJob:
package com.lucifer.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.lucifer.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Demo Dataflow job: each shard repeatedly fetches a small batch of
 * unprocessed orders that belong to it and marks them as processed.
 *
 * @author lucifer
 * @date 2020/5/4 21:10
 */
@Slf4j
public class MyDataFlowJob implements DataflowJob<Order> {

    /** Maximum number of orders handed out per fetch. */
    private static final int BATCH_SIZE = 10;

    /** Simulated backlog of 100 unprocessed orders, shared by all shards. */
    private static final List<Order> orders = new ArrayList<>();

    // Static initializer: the list is static, so it must be filled exactly once
    // per class load. The original instance initializer appended another 100
    // orders every time an instance was created.
    static {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i);
            order.setStatus(0);
            orders.add(order);
        }
    }

    /**
     * Fetches the next batch of unprocessed orders for the current shard.
     *
     * <p>An order belongs to a shard when
     * {@code orderId % shardingTotalCount == shardingItem}.
     *
     * @param shardingContext sharding information for the current execution
     * @return up to {@link #BATCH_SIZE} pending orders of this shard, or
     *         {@code null} when the shard has none left
     */
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        int total = shardingContext.getShardingTotalCount();
        int item = shardingContext.getShardingItem();

        List<Order> pending = orders.stream()
                .filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % total == item)
                .collect(Collectors.toList());

        // Stays null when nothing is pending, as in the original.
        // NOTE(review): per the elastic-job docs a null or empty result is what
        // ends streaming for this trigger - confirm for the version in use.
        List<Order> batch = null;
        if (!pending.isEmpty()) {
            // Clamp the upper bound: the final batch may hold fewer than
            // BATCH_SIZE orders, and subList(0, 10) would throw in that case.
            batch = pending.subList(0, Math.min(BATCH_SIZE, pending.size()));
        }

        // Fetching is too fast to observe; slow it down for the demo.
        pause(3, item);
        log.info("分片项:{},我抓取的数据:{}", item, batch);
        return batch;
    }

    /**
     * Marks every order in the fetched batch as processed (status 1).
     *
     * @param shardingContext sharding information for the current execution
     * @param list            the batch previously returned by {@link #fetchData}
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Order> list) {
        list.forEach(o -> o.setStatus(1));
        pause(5, shardingContext.getShardingItem());
        log.info("分片项:{},处理中.....", shardingContext.getShardingItem());
    }

    /**
     * Sleeps for the given number of seconds. If interrupted, restores the
     * thread's interrupt flag and returns early instead of swallowing the
     * interruption.
     */
    private static void pause(long seconds, int shardingItem) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("分片项:{},休眠被中断", shardingItem, e);
        }
    }
}
实体类:
package com.lucifer.pojo;
import lombok.Data;
/**
* Order entity used by the demo job. No accessors are written by hand here;
* the getters and setters that MyDataFlowJob calls come from Lombok's
* {@code @Data} annotation.
*
* @author lucifer
* @date 2020/5/4 21:10
* @description Order entity class
*/
@Data
public class Order {
// Order id
private Integer orderId;
// Order status; 0: unprocessed, 1: processed
private Integer status;
}
控制台打印:
2020-05-04 23:20:36.958 INFO 19320 --- [ main] com.lucifer.ElasticJobApplication : Started ElasticJobApplication in 2.223 seconds (JVM running for 3.034)
2020-05-04 23:20:43.075 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:[Order(orderId=0, status=0), Order(orderId=2, status=0), Order(orderId=4, status=0), Order(orderId=6, status=0), Order(orderId=8, status=0), Order(orderId=10, status=0), Order(orderId=12, status=0), Order(orderId=14, status=0), Order(orderId=16, status=0), Order(orderId=18, status=0)]
2020-05-04 23:20:43.075 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:[Order(orderId=1, status=0), Order(orderId=3, status=0), Order(orderId=5, status=0), Order(orderId=7, status=0), Order(orderId=9, status=0), Order(orderId=11, status=0), Order(orderId=13, status=0), Order(orderId=15, status=0), Order(orderId=17, status=0), Order(orderId=19, status=0)]
2020-05-04 23:20:48.082 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,处理中.....
2020-05-04 23:20:48.083 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,处理中.....
2020-05-04 23:20:51.084 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:[Order(orderId=20, status=0), Order(orderId=22, status=0), Order(orderId=24, status=0), Order(orderId=26, status=0), Order(orderId=28, status=0), Order(orderId=30, status=0), Order(orderId=32, status=0), Order(orderId=34, status=0), Order(orderId=36, status=0), Order(orderId=38, status=0)]
2020-05-04 23:20:51.084 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:[Order(orderId=21, status=0), Order(orderId=23, status=0), Order(orderId=25, status=0), Order(orderId=27, status=0), Order(orderId=29, status=0), Order(orderId=31, status=0), Order(orderId=33, status=0), Order(orderId=35, status=0), Order(orderId=37, status=0), Order(orderId=39, status=0)]
2020-05-04 23:20:56.085 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,处理中.....
2020-05-04 23:20:56.085 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,处理中.....
2020-05-04 23:20:59.086 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:[Order(orderId=41, status=0), Order(orderId=43, status=0), Order(orderId=45, status=0), Order(orderId=47, status=0), Order(orderId=49, status=0), Order(orderId=51, status=0), Order(orderId=53, status=0), Order(orderId=55, status=0), Order(orderId=57, status=0), Order(orderId=59, status=0)]
2020-05-04 23:20:59.087 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:[Order(orderId=40, status=0), Order(orderId=42, status=0), Order(orderId=44, status=0), Order(orderId=46, status=0), Order(orderId=48, status=0), Order(orderId=50, status=0), Order(orderId=52, status=0), Order(orderId=54, status=0), Order(orderId=56, status=0), Order(orderId=58, status=0)]
2020-05-04 23:21:04.086 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,处理中.....
2020-05-04 23:21:04.088 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,处理中.....
2020-05-04 23:21:07.088 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:[Order(orderId=61, status=0), Order(orderId=63, status=0), Order(orderId=65, status=0), Order(orderId=67, status=0), Order(orderId=69, status=0), Order(orderId=71, status=0), Order(orderId=73, status=0), Order(orderId=75, status=0), Order(orderId=77, status=0), Order(orderId=79, status=0)]
2020-05-04 23:21:07.090 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:[Order(orderId=60, status=0), Order(orderId=62, status=0), Order(orderId=64, status=0), Order(orderId=66, status=0), Order(orderId=68, status=0), Order(orderId=70, status=0), Order(orderId=72, status=0), Order(orderId=74, status=0), Order(orderId=76, status=0), Order(orderId=78, status=0)]
2020-05-04 23:21:12.089 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,处理中.....
2020-05-04 23:21:12.091 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,处理中.....
2020-05-04 23:21:15.092 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:[Order(orderId=81, status=0), Order(orderId=83, status=0), Order(orderId=85, status=0), Order(orderId=87, status=0), Order(orderId=89, status=0), Order(orderId=91, status=0), Order(orderId=93, status=0), Order(orderId=95, status=0), Order(orderId=97, status=0), Order(orderId=99, status=0)]
2020-05-04 23:21:15.093 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:[Order(orderId=80, status=0), Order(orderId=82, status=0), Order(orderId=84, status=0), Order(orderId=86, status=0), Order(orderId=88, status=0), Order(orderId=90, status=0), Order(orderId=92, status=0), Order(orderId=94, status=0), Order(orderId=96, status=0), Order(orderId=98, status=0)]
2020-05-04 23:21:20.092 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,处理中.....
2020-05-04 23:21:20.093 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,处理中.....
2020-05-04 23:21:23.095 INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob : 分片项:1,我抓取的数据:null
2020-05-04 23:21:23.096 INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob : 分片项:0,我抓取的数据:null