如果你对elastic-job还不了解的话,建议你去看看我的《elastic-job分布式作业调度框架简介》,以下涉及到的参数配置请查看我的《elastic-job之Simple类型作业实现》
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。
4.1、要实现Dataflow当然要实现它的接口DataflowJob<T>, 我们这里定义了Foo的一个普通类
package com.lwl.boot.job.dataflow; import java.io.Serializable; public final class Foo implements Serializable { private static final long serialVersionUID = 2706842871078949451L; private final long id; private final String location; private Status status; public Foo(final long id, final String location, final Status status) { this.id = id; this.location = location; this.status = status; } public long getId() { return id; } public String getLocation() { return location; } public Status getStatus() { return status; } public void setStatus(final Status status) { this.status = status; } public String toString() { return String.format("id: %s, location: %s, status: %s", id, location, status); } public enum Status { TODO, COMPLETED } }
4.2、由于我们未采用数据库,所以我们模拟了一个数据集合类和一个工厂类
package com.lwl.boot.job.dataflow; import org.springframework.stereotype.Repository; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Repository public class FooRepository { private Map<Long, Foo> data = new ConcurrentHashMap<>(300, 1); public FooRepository() { init(); } private void init() { addData(0L, 100L, "Beijing"); addData(100L, 200L, "Shanghai"); addData(200L, 300L, "Guangzhou"); } private void addData(final long idFrom, final long idTo, final String location) { for (long i = idFrom; i < idTo; i++) { data.put(i, new Foo(i, location, Foo.Status.TODO)); } } public List<Foo> findTodoData(final String location, final int limit) { List<Foo> result = new ArrayList<>(limit); int count = 0; for (Map.Entry<Long, Foo> each : data.entrySet()) { Foo foo = each.getValue(); if (foo.getLocation().equals(location) && foo.getStatus() == Foo.Status.TODO) { result.add(foo); count++; if (count == limit) { break; } } } return result; } public void setCompleted(final long id) { //设置状态,不让他一直抓取 data.get(id).setStatus(Foo.Status.COMPLETED); } }
package com.lwl.boot.job.dataflow; public class FooRepositoryFactory { private static FooRepository fooRepository = new FooRepository(); public static FooRepository repository() { return fooRepository; } }
DataflowJob的实现类
package com.lwl.boot.job.dataflow; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.springframework.util.CollectionUtils; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.dataflow.DataflowJob; /** * Dataflow类型用于处理数据流, * 需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。 * */ public class ApiMyElasticJobDataflow implements DataflowJob<Foo>{ private FooRepository fooRepository = FooRepositoryFactory.repository(); @Override public List<Foo> fetchData(ShardingContext context) { System.out.println("-------------------------------------fetchData: "+context.getShardingParameter()+"---------------------------------------------"); List<Foo> result = fooRepository.findTodoData(context.getShardingParameter(), 10); System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s | count: %d", context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH", CollectionUtils.isEmpty(result)?0:result.size())); return result; } @Override public void processData(ShardingContext shardingContext, List<Foo> data) { System.out.println("-------------------------------------processData: "+shardingContext.getShardingParameter()+"---------------------------------------------"); System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s | count: %d", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS", CollectionUtils.isEmpty(data)?0:data.size())); for (Foo each : data) { fooRepository.setCompleted(each.getId()); } } }
最后就是我们的测试类:
这个类中和simple类型最重要的区别就是在于作业类型DataflowJobConfiguration,它的主要参数有package com.lwl.boot.job.dataflow; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; /** * 可通过DataflowJobConfiguration配置是否流式处理。 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。 如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。 * */ public class ApiJobDataflow { public static void main(String[] args) { new JobScheduler(registryCenter(),configuration()).init(); } private static CoordinatorRegistryCenter registryCenter() { //配置zookeeper CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo")); registryCenter.init(); return registryCenter; } private static LiteJobConfiguration configuration() { // 定义作业核心配置 String shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou"; JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("dataflowJob", "0/20 * * * * ?", 3).shardingItemParameters(shardingItemParameters).build(); // 定义DATAFLOW类型配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, ApiMyElasticJobDataflow.class.getCanonicalName(), true); // 定义Lite作业根配置 String jobShardingStrategyClass = null; LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build(); return dataflowJobRootConfig; } }
coreConfig:作业核心配置
jobClass: 作业实现类streamingProcess: 是否流式处理数据,如果流式处理数据, 则fetchData不返回空结果将持续执行作业,如果非流式处理数据, 则处理数据完成后作业结束
其他的参数配置请参考simple类型
最后的启动方式也参考Simple类型,通过启动几次main方法,可以看到各个控制台输出的日志信息