Here we integrate Spring Batch with Elastic-Job. For the basics of each framework, see the earlier articles; the focus of the integration is how business work is distributed across sharding items. The pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.gs</groupId>
    <artifactId>spring-job</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-job</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- elastic-job-lite core module -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <!-- needed for the Spring integration / custom namespace -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-lifecycle</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties:
server.port=9087
# Zookeeper connection for Elastic-Job
elaticjob.zookeeper.server-lists=127.0.0.1:2181
elaticjob.zookeeper.namespace=my-project
# Datasource used mainly to store job execution logs
spring.datasource.druid.log.url=jdbc:mysql://127.0.0.1:3306/gsong?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8
spring.datasource.druid.log.username=root
spring.datasource.druid.log.password=123456
spring.datasource.druid.log.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.druid.log.maxActive=16
# Create/update/validate the database schema automatically
spring.jpa.hibernate.ddl-auto=update
spring.jpa.database=mysql
spring.jpa.show-sql=true
# When false, batch jobs are not launched automatically at startup
spring.batch.job.enabled=false
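The ZookeeperRegistryCenter that the Elastic-Job scheduler further below depends on is not listed in the post. A minimal sketch, assuming it is wired from the elaticjob.zookeeper.* properties above:

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

@Bean(initMethod = "init") // init() opens the Zookeeper connection
public ZookeeperRegistryCenter zookeeperRegistryCenter(
        @Value("${elaticjob.zookeeper.server-lists}") String serverLists,
        @Value("${elaticjob.zookeeper.namespace}") String namespace) {
    return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverLists, namespace));
}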
reader code
@Bean("reader")
public FlatFileItemReader<User> reader()throws Exception {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setEncoding("GBK");
reader.setLineMapper(new LineMapper<User>() {
@Override
public User mapLine(String s, int i) throws Exception {
if(s==null||"".equals(s)){
return new User();
}
List<String> collect = Arrays.stream(s.split(" ")).filter(a -> !a.trim().equals("")).collect(Collectors.toList());
String s1 =collect.size()>=1? collect.get(0):" ";
String s2 =collect.size()>=2? collect.get(1):" ";
String s3 =collect.size()>=3? collect.get(2):" ";
User user =new User(){{
setName(s1);
setPassword(s2);
setEmail(s3);
}};
return user;
}
});
return reader;
}
@Bean("listResourceMq")
public MultiResourceItemReader<User> listResourceMq(@Qualifier("reader") FlatFileItemReader<User> reader ){
MultiResourceItemReader<User> multiResourceItemReader =new MultiResourceItemReader<User>();
multiResourceItemReader.setDelegate(reader);
List<Resource> list =new ArrayList<>();
for (int a =1;a<51;a++) {
if(a%2==1){
if(a==35) continue;
list.add(new ClassPathResource("tianya_" + a + ".txt"));
}
}
int start=list.size();
Object[] obj= list.toArray();
Resource[] qiye=new Resource[start];
for(int i=0;i<obj.length;i++){
qiye[i]=(Resource) obj[i];
}
multiResourceItemReader.setResources(qiye);
return multiResourceItemReader;
}
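The User class itself is not shown in the post. A minimal sketch of what the reader above and the writer below imply; only the three fields that the reader populates and the writer binds (:name, :password, :email) are certain, everything else is an assumption:

import lombok.Data;

// Hypothetical reconstruction of the item bean (lombok is already a dependency)
@Data
public class User {
    private String name;
    private String password;
    private String email;
}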
processor code
@Bean
public ItemProcessor<User, User> processor() {
    // Use our custom ItemProcessor implementation, CsvItemProcessor
    CsvItemProcessor processor = new CsvItemProcessor();
    // Attach CsvBeanValidator as the processor's validator
    processor.setValidator(csvBeanValidator());
    return processor;
}
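CsvItemProcessor and csvBeanValidator() are used above but not listed in the post. A minimal sketch of the usual pattern, built on Spring Batch's ValidatingItemProcessor and JSR-303 bean validation; both class bodies are assumptions, not the author's originals (each class goes in its own file, and csvBeanValidator() would simply be a @Bean method returning new CsvBeanValidator<User>()):

import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

// Delegates to the validator configured via setValidator(); any extra per-item
// transformation logic would go after the super.process(...) call.
public class CsvItemProcessor extends ValidatingItemProcessor<User> {
    @Override
    public User process(User item) throws ValidationException {
        super.process(item); // throws ValidationException for invalid items
        return item;
    }
}

// Bridges JSR-303 annotations on the item class to Spring Batch's Validator.
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    private javax.validation.Validator validator;

    @Override
    public void afterPropertiesSet() {
        validator = Validation.buildDefaultValidatorFactory().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        Set<ConstraintViolation<T>> violations = validator.validate(value);
        if (!violations.isEmpty()) {
            throw new ValidationException(violations.iterator().next().getMessage());
        }
    }
}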
writer code
@Bean
public ItemWriter<User> writer() {
    // JdbcBatchItemWriter writes to the database using JDBC batching
    JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
    // Named parameters (:name, :password, :email) are resolved from User's bean properties
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    // The batched insert statement; note the positional mapping in the author's
    // user3 table layout: User.name goes into the gamename column and
    // User.password into the name column
    String sql = "insert into user3 (gamename, name, email) values (:name, :password, :email)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}
batch step configuration
@Bean("step1")
public Step step1(StepBuilderFactory stepBuilderFactory, @Qualifier("listResourceMq") MultiResourceItemReader<User> listResourceMq, ItemWriter<User> writer,
ItemProcessor<User,User> processor){
return stepBuilderFactory
.get(step1)
.<User, User>chunk(65000)//批处理每次提交65000条数据
.reader(listResourceMq)//给step绑定reader
.faultTolerant()
.skip(NullPointerException.class)
.skip(Exception.class)
.skipLimit(10000)
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.taskExecutor(new SimpleAsyncTaskExecutor())
.throttleLimit(16)
.build();
}
batch job configuration
@Bean("importJob")
public Job importJob(JobBuilderFactory jobBuilderFactory, @Qualifier("step1") Step s1){
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(csvJobListener())//绑定监听器csvJobListener
.build();
}
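csvJobListener() is referenced above but not listed; a minimal sketch based on Spring Batch's JobExecutionListenerSupport that simply times the run (the log message is an assumption):

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

@Bean
public JobExecutionListenerSupport csvJobListener() {
    return new JobExecutionListenerSupport() {
        private long start;

        @Override
        public void beforeJob(JobExecution jobExecution) {
            start = System.currentTimeMillis();
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("importJob finished in " + (System.currentTimeMillis() - start) + " ms");
        }
    };
}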
elastic-job configuration
@Bean(initMethod = "init") // init() registers the job in Zookeeper and starts scheduling
public SpringJobScheduler instanceSimpleJob() {
    // Core job configuration: fire every 3 minutes with 2 sharding items
    JobCoreConfiguration simpleCoreConfig =
            JobCoreConfiguration
                    .newBuilder("demoSimpleJob", "0 */3 * * * ?", 2)
                    .shardingItemParameters("0=A,1=B")
                    .build();
    // SIMPLE job type configuration
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleJobImpl.class.getCanonicalName());
    // Lite job root configuration
    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    // A JobEventConfiguration can also be passed in so that event tracing writes job execution info to the database;
    // simpleJob, zookeeperRegistryCenter and customerMoreJobListener are injected fields of the surrounding configuration class
    return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, simpleJobRootConfig, customerMoreJobListener);
}
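The event-tracing comment above refers to elastic-job's JobEventConfiguration. A sketch of enabling it, assuming logDataSource is the Druid datasource built from the spring.datasource.druid.log.* properties (that variable name is hypothetical):

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;

// Replace the return statement above with the overload that takes a JobEventConfiguration:
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(logDataSource);
return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, simpleJobRootConfig,
        jobEventConfig, customerMoreJobListener);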
> The job implementation class. This is the core piece that connects Elastic-Job to Spring Batch.
/**
 * @author 高松
 * @date 2019/1/5 23:01
 * spring-job
 */
@Component
public class SimpleJobImpl implements SimpleJob {
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    @Qualifier("importJob")
    Job importJob;
    @Autowired
    @Qualifier("importJob1")
    Job importJob1; // a second batch job, configured like importJob (not shown in this excerpt)
    // Guards against launching the batch jobs again on later cron fires; it is
    // static and volatile, so it is shared by all sharding items in this JVM
    private static volatile boolean flag = false;

    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                try {
                    if (!flag) {
                        flag = true;
                        System.out.println("shard A executed");
                        // Run the batch job that belongs to this shard, i.e. your own business logic
                        /* JobParameters jobParameters = new JobParametersBuilder()
                                .addDate("start", new Date())
                                .toJobParameters();
                        jobLauncher.run(importJob1, jobParameters); */
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            case 1:
                if (!flag) {
                    try {
                        flag = true;
                        System.out.println("shard B executed");
                        // Run the batch job that belongs to this shard, i.e. your own business logic
                        /* JobParameters jobParameters = new JobParametersBuilder()
                                .addDate("start", new Date())
                                .toJobParameters();
                        jobLauncher.run(importJob, jobParameters); */
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                break;
            default:
                System.out.println("not yet sharded!");
                break;
        }
    }
}
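Note that flag is static: once either shard has fired on a JVM, no further batch launches happen there, so each node runs its job at most once per process lifetime. When the commented-out launch is enabled, the fresh timestamp (together with the job's RunIdIncrementer) makes every run's JobParameters unique, which is what lets Spring Batch start a new job instance each time:

// The commented-out launch from above, enabled; "start" is simply a parameter
// name used here to make each run's JobParameters unique.
JobParameters jobParameters = new JobParametersBuilder()
        .addDate("start", new Date())
        .toJobParameters();
jobLauncher.run(importJob, jobParameters);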
Source code: https://pan.baidu.com/s/1RYlQulxyyw2P5iNbtMaN_A
Extraction code: 2ay1