
Elastic-Job (2): Implementing a Dataflow Job

吴均
2023-12-01

Note: see also Elastic-Job (1): Implementing a Simple Job

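For Dataflow jobs, the official documentation explains:

  1. Streaming processing can be switched on or off through DataflowJobConfiguration.
  2. With streaming processing, the job stops fetching only when fetchData returns null or an empty collection; otherwise it keeps running. Without streaming processing, each job execution calls fetchData and processData exactly once and then completes.
  3. When streaming processing is used, processData should update the status of the data it has handled so that fetchData does not pick it up again, which would keep the job from ever stopping. Streaming processing is modeled on TbSchedule and suits continuous, uninterrupted data processing.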
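
The difference between the two modes can be sketched in plain Java without the framework. This is a minimal simulation under stated assumptions, not Elastic-Job's actual executor: the class DataflowLoopSketch and its execute and demo methods are hypothetical names, and the loop mirrors only the behavior described in the list above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the framework's executor. It mirrors only the
// behavior described above and is not Elastic-Job's real implementation.
class DataflowLoopSketch {

    // Runs one job execution and returns how many times fetchData was called.
    static <T> int execute(Supplier<List<T>> fetchData,
                           Consumer<List<T>> processData,
                           boolean streaming) {
        int fetchCalls = 1;
        List<T> data = fetchData.get();
        while (data != null && !data.isEmpty()) {
            processData.accept(data);
            if (!streaming) {
                break; // non-streaming: one fetch and one process per execution
            }
            data = fetchData.get(); // streaming: keep fetching until nothing is left
            fetchCalls++;
        }
        return fetchCalls;
    }

    // Demo with 5 pending items fetched in batches of at most 2.
    // Returns {number of fetch calls, items still pending}.
    static int[] demo(boolean streaming) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            pending.add(i);
        }
        // Copy the batch so that removing processed items does not invalidate it.
        Supplier<List<Integer>> fetch =
                () -> new ArrayList<>(pending.subList(0, Math.min(2, pending.size())));
        // "Processing" removes the items, so the next fetch cannot see them again.
        Consumer<List<Integer>> process = batch -> pending.removeAll(batch);
        int fetchCalls = execute(fetch, process, streaming);
        return new int[] {fetchCalls, pending.size()};
    }

    public static void main(String[] args) {
        int[] s = demo(true);
        int[] n = demo(false);
        System.out.println("streaming:     fetch calls=" + s[0] + ", pending=" + s[1]);
        System.out.println("non-streaming: fetch calls=" + n[0] + ", pending=" + n[1]);
    }
}
```

With streaming enabled the sketch calls fetch four times (three batches plus the final empty fetch) and leaves nothing pending; without it, a single fetch handles one batch and three items wait for the next trigger. If process did not remove the items, the streaming loop would never end, which is the situation point 3 warns about.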

Elastic-Job configuration:

package com.lucifer.config;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.lucifer.job.MyDataFlowJob;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lucifer
 * @date 2020/5/4 19:50
 * @description Elastic-Job configuration
 */
@Configuration
public class LiteJobConfig {

    private static final String SERVER_LISTS = "192.168.24.128:2181";

    private static final String NAMES_SPACE = "myDataFlowJob";

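    // Return the scheduler as a bean and let Spring call init() on it,
    // instead of annotating a void method with @Bean.
    @Bean(initMethod = "init")
    public JobScheduler jobScheduler() {
        return new JobScheduler(zkCenter(), dataFlowJobConfiguration());
    }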

    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zookeeperConfiguration =
                new ZookeeperConfiguration(SERVER_LISTS, NAMES_SPACE);

        CoordinatorRegistryCenter coordinatorRegistryCenter =
                new ZookeeperRegistryCenter(zookeeperConfiguration);
        // Initialize the registry center
        coordinatorRegistryCenter.init();
        return coordinatorRegistryCenter;
    }


    /**
     * Job configuration.
     *
     * @return the Lite job configuration for the Dataflow job
     */
    public static LiteJobConfiguration dataFlowJobConfiguration() {
        // Core job configuration: job name, cron (every 10 seconds), 2 shards
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder("myDataFlowJob", "0/10 * * * * ?", 2)
                .build();

        // Job type configuration: the last argument (true) enables streaming processing
        JobTypeConfiguration jobTypeConfiguration =
                new DataflowJobConfiguration(jobCoreConfiguration, MyDataFlowJob.class.getCanonicalName(), true);

        // Root (Lite) job configuration
        return LiteJobConfiguration
                .newBuilder(jobTypeConfiguration)
                .build();
    }
}

MyDataFlowJob:

package com.lucifer.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.lucifer.pojo.Order;
import lombok.extern.slf4j.Slf4j;

import java.util.*;

import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author lucifer
 * @date 2020/5/4 21:10
 * @description Data-Flow
 */
@Slf4j
public class MyDataFlowJob implements DataflowJob<Order> {
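    // Simulate 100 unprocessed orders.
    // The list is static, so it is filled in a static initializer; an instance
    // initializer would add another 100 orders for every new job instance.
    private static final List<Order> orders = new ArrayList<>();

    static {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i);
            order.setStatus(0);
            orders.add(order);
        }
    }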


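    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // An order belongs to this shard when orderId % total shard count == current shard item
        List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(Collectors.toList());

        List<Order> subList = null;
        if (!orderList.isEmpty()) {
            // Take at most 10 orders per fetch; Math.min guards a final batch smaller than 10
            subList = orderList.subList(0, Math.min(10, orderList.size()));
        }
        // Fetching is very fast, so sleep for a while to make the effect easier to see
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
        }
        // Log message: "shard item: {}, data I fetched: {}"
        log.info("分片项:{},我抓取的数据:{}", shardingContext.getShardingItem(), subList);
        return subList;
    }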

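    @Override
    public void processData(ShardingContext shardingContext, List<Order> list) {
        // Mark the orders as processed so that fetchData does not return them again
        list.forEach(o -> o.setStatus(1));
        // Simulate a slow processing step
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
        }
        // Log message: "shard item: {}, processing....."
        log.info("分片项:{},处理中.....", shardingContext.getShardingItem());
    }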
}
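
The filter in fetchData assigns each order to exactly one shard by taking the order ID modulo the total shard count. The standalone sketch below illustrates that partitioning together with a bounded batch size. ShardBatchSketch, nextBatch, and the plain integer IDs are hypothetical simplifications introduced here for illustration; they are not part of the job above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper that reproduces only the "id % shard count == shard item" rule.
class ShardBatchSketch {

    // Returns up to batchSize IDs from [0, totalIds) that belong to the given shard.
    static List<Integer> nextBatch(int totalIds, int shardingTotalCount,
                                   int shardingItem, int batchSize) {
        return IntStream.range(0, totalIds)
                .filter(id -> id % shardingTotalCount == shardingItem)
                .limit(batchSize) // caps the batch even when fewer IDs remain
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nextBatch(100, 2, 0, 10)); // even IDs 0..18
        System.out.println(nextBatch(100, 2, 1, 10)); // odd IDs 1..19
        System.out.println(nextBatch(5, 2, 1, 10));   // only [1, 3] are available
    }
}
```

With 100 IDs and 2 shards, shard 0 receives the even IDs and shard 1 the odd ones, so no order is handled by both shards. Using limit caps the batch without a separate size check.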

Entity class:

package com.lucifer.pojo;

import lombok.Data;

/**
 * @author lucifer
 * @date 2020/5/4 21:10
 * @description Order entity class
 */
@Data
public class Order {

    // Order ID
    private Integer orderId;

    // Order status: 0 = unprocessed, 1 = processed
    private Integer status;

}

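Console output (in the log messages, 分片项 means "shard item", 我抓取的数据 means "the data I fetched", and 处理中 means "processing"):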

2020-05-04 23:20:36.958  INFO 19320 --- [           main] com.lucifer.ElasticJobApplication        : Started ElasticJobApplication in 2.223 seconds (JVM running for 3.034)
2020-05-04 23:20:43.075  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=0, status=0), Order(orderId=2, status=0), Order(orderId=4, status=0), Order(orderId=6, status=0), Order(orderId=8, status=0), Order(orderId=10, status=0), Order(orderId=12, status=0), Order(orderId=14, status=0), Order(orderId=16, status=0), Order(orderId=18, status=0)]
2020-05-04 23:20:43.075  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=1, status=0), Order(orderId=3, status=0), Order(orderId=5, status=0), Order(orderId=7, status=0), Order(orderId=9, status=0), Order(orderId=11, status=0), Order(orderId=13, status=0), Order(orderId=15, status=0), Order(orderId=17, status=0), Order(orderId=19, status=0)]
2020-05-04 23:20:48.082  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:20:48.083  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:20:51.084  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=20, status=0), Order(orderId=22, status=0), Order(orderId=24, status=0), Order(orderId=26, status=0), Order(orderId=28, status=0), Order(orderId=30, status=0), Order(orderId=32, status=0), Order(orderId=34, status=0), Order(orderId=36, status=0), Order(orderId=38, status=0)]
2020-05-04 23:20:51.084  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=21, status=0), Order(orderId=23, status=0), Order(orderId=25, status=0), Order(orderId=27, status=0), Order(orderId=29, status=0), Order(orderId=31, status=0), Order(orderId=33, status=0), Order(orderId=35, status=0), Order(orderId=37, status=0), Order(orderId=39, status=0)]
2020-05-04 23:20:56.085  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:20:56.085  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:20:59.086  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=41, status=0), Order(orderId=43, status=0), Order(orderId=45, status=0), Order(orderId=47, status=0), Order(orderId=49, status=0), Order(orderId=51, status=0), Order(orderId=53, status=0), Order(orderId=55, status=0), Order(orderId=57, status=0), Order(orderId=59, status=0)]
2020-05-04 23:20:59.087  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=40, status=0), Order(orderId=42, status=0), Order(orderId=44, status=0), Order(orderId=46, status=0), Order(orderId=48, status=0), Order(orderId=50, status=0), Order(orderId=52, status=0), Order(orderId=54, status=0), Order(orderId=56, status=0), Order(orderId=58, status=0)]
2020-05-04 23:21:04.086  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:04.088  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:07.088  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=61, status=0), Order(orderId=63, status=0), Order(orderId=65, status=0), Order(orderId=67, status=0), Order(orderId=69, status=0), Order(orderId=71, status=0), Order(orderId=73, status=0), Order(orderId=75, status=0), Order(orderId=77, status=0), Order(orderId=79, status=0)]
2020-05-04 23:21:07.090  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=60, status=0), Order(orderId=62, status=0), Order(orderId=64, status=0), Order(orderId=66, status=0), Order(orderId=68, status=0), Order(orderId=70, status=0), Order(orderId=72, status=0), Order(orderId=74, status=0), Order(orderId=76, status=0), Order(orderId=78, status=0)]
2020-05-04 23:21:12.089  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:12.091  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:15.092  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=81, status=0), Order(orderId=83, status=0), Order(orderId=85, status=0), Order(orderId=87, status=0), Order(orderId=89, status=0), Order(orderId=91, status=0), Order(orderId=93, status=0), Order(orderId=95, status=0), Order(orderId=97, status=0), Order(orderId=99, status=0)]
2020-05-04 23:21:15.093  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=80, status=0), Order(orderId=82, status=0), Order(orderId=84, status=0), Order(orderId=86, status=0), Order(orderId=88, status=0), Order(orderId=90, status=0), Order(orderId=92, status=0), Order(orderId=94, status=0), Order(orderId=96, status=0), Order(orderId=98, status=0)]
2020-05-04 23:21:20.092  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:20.093  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:23.095  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:null
2020-05-04 23:21:23.096  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:null
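
The timestamps are consistent with the sleeps in the job: each round costs about 3 seconds in fetchData plus 5 seconds in processData, and the streaming loop keeps running past later cron ticks until a fetch returns nothing. As a rough check, the arithmetic can be sketched as follows. TimelineSketch and finalFetchTime are hypothetical names, and the trigger time of 23:20:40 is an assumption inferred from the 0/10 cron expression and the first fetch log at 23:20:43; the log itself does not state it.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper for checking the log timeline by hand.
class TimelineSketch {

    // Estimates when the final, empty fetch is logged for one shard.
    static LocalTime finalFetchTime(LocalTime trigger, int ordersPerShard, int batchSize,
                                    Duration fetchCost, Duration processCost) {
        // Rounds that actually fetch data (ceiling division).
        int rounds = (ordersPerShard + batchSize - 1) / batchSize;
        // Each round is one fetch plus one process; the last fetch finds nothing.
        Duration total = fetchCost.plus(processCost).multipliedBy(rounds)
                .plus(fetchCost);
        return trigger.plus(total);
    }

    public static void main(String[] args) {
        // 100 orders over 2 shards = 50 per shard, fetched 10 at a time.
        LocalTime end = finalFetchTime(LocalTime.of(23, 20, 40), 50, 10,
                Duration.ofSeconds(3), Duration.ofSeconds(5));
        System.out.println(end); // 23:21:23
    }
}
```

Five rounds of 8 seconds plus one last 3-second fetch give 43 seconds, which lands on 23:21:23 and agrees with the two final log lines where both shards fetch null.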

 

 