官网介绍:SpringBatch是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮的批处理应用程序。
Spring Batch 提供了处理大量记录所必需的可重用功能,包括日志记录/跟踪、事务管理、作业处理统计、作业重启、跳过和资源管理。它还提供更高级的技术服务和功能,将通过优化和分区技术实现极高容量和高性能的批处理作业。简单和复杂的大批量批处理作业都可以以高度可扩展的方式利用框架来处理大量信息。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/sms?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
username: root
password: 123456
batch:
initialize-schema: always #允许springboot操作数据库创建默认数据表
@Data
public class SeaDove {
private String send_time;
private String phone;
private String msg;
private String id;
private String business;
private String source;
}
-- sms.sea_dove3 definition
CREATE TABLE `sea_dove3` (
`id` bigint NOT NULL AUTO_INCREMENT,
`send_time` varchar(100) DEFAULT NULL,
`phone` varchar(100) DEFAULT NULL,
`msg` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
`business` varchar(100) DEFAULT NULL,
`source` varchar(100) DEFAULT NULL,
KEY `sea_dove_id_IDX` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=94308865453199388 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
@Configuration
@EnableBatchProcessing
public class DataJob {
private static final Logger log = LoggerFactory.getLogger(DataJob.class);
@Resource
private JobBuilderFactory jobBuilderFactory; //用于构建JOB
@Resource
private StepBuilderFactory stepBuilderFactory; //用于构建Step
@Resource
private JobListener jobListener; //简单的JOB listener
@Resource
private SetpListenter stepListener; //简单的Step listener
@Autowired
DataSource dataSource;
/**
* 一个简单基础的Job通常由一个或者多个Step组成
*/
@Bean
public Job DataJob() {
return jobBuilderFactory.get("DataJob").
incrementer(new RunIdIncrementer()). //设置Job的唯一标识
start(handleDataStep()). //start是JOB执行的第一个step
listener(jobListener). //设置了一个简单JobListener
build();
}
/**
* 一个简单基础的Step主要分为三个部分
* ItemReader : 用于读取数据
* ItemProcessor : 用于处理数据
* ItemWriter : 用于写数据
*/
@Bean
public Step handleDataStep() {
return stepBuilderFactory.get("getData").
<SeaDove, SeaDove>chunk(1000). // <输入,输出> 。chunk通俗的讲类似于SQL的commit; 这里表示处理(processor)100条后写入(writer)一次。
faultTolerant().retryLimit(3). //容错处理,重试3次
retry(Exception.class).
skipLimit(100).skip(Exception.class). //捕捉到异常就重试,重试10次还是异常,JOB就停止并标志失败
reader(getDataReader()). //指定ItemReader
processor(getDataProcessor()). //指定ItemProcessor
writer(writer()). //指定ItemWriter
listener(stepListener).
build();
}
@Bean
public ItemReader<? extends SeaDove> getDataReader() {
//这里可以指定从数据库读取数据,也可以指定从文件读取数据
FlatFileItemReader<SeaDove> reader=new FlatFileItemReader<>();
File file = new File("D:\\sendlog\\seadove\\seadove\\8yue.json");
reader.setResource(new FileSystemResource(file)) ;
reader.setLineMapper(new SeaDoveReadLineMapper());
return reader;
}
@Bean
public ItemProcessor<SeaDove, SeaDove> getDataProcessor() {
return new ItemProcessor<SeaDove, SeaDove>() {
@Override
public SeaDove process(SeaDove seaDove) throws Exception {
if (StrUtil.isNotEmpty(seaDove.getMsg())) {
return seaDove;
}
return null;
}
};
}
@Bean
public JdbcBatchItemWriter<SeaDove> writer() {
return new JdbcBatchItemWriterBuilder<SeaDove>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("insert into sea_dove3 (send_time,phone,msg,business,source) values(:send_time,:phone,:msg,:business,:source)")
.dataSource(dataSource)
.build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);//核心线程数
executor.setMaxPoolSize(20);//设置最大线程数
executor.setQueueCapacity(100);//队列大小
executor.setThreadNamePrefix("batch-"); //线程名称前缀
executor.initialize();//初始化
return executor;
}
}
public class SeaDoveReadLineMapper implements LineMapper<SeaDove> {
@Override
public SeaDove mapLine(String s, int i) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
String content = jsonObject.getString("content");
if (content.contains("step=0")) {
SeaDove seaDove = new SeaDove();
final String[] split = content.split("\\|");
seaDove.setSend_time(split[0]);
String collect = StreamUtils.handle5(split[1]);
String[] split1 = collect.split("\\|");
seaDove.setPhone(split1[0]);
String msg = URLUtil.decode(split1[1]);
seaDove.setBusiness(getBusiness(msg));
seaDove.setMsg(msg);
seaDove.setSource(split1[2]);
return seaDove;
}
return new SeaDove();
}
public String getBusiness(String msg) {
final List<String> collect = SeaDoveConstant.list.stream().filter(s -> {
if (msg.contains(s)) {
return true;
}
return false;
}).collect(Collectors.toList());
if (!collect.isEmpty()) {
return String.join("|", collect);
} else {
return "other";
}
}
}
@Component
@Slf4j
public class SetpListenter implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("beforeStep");
log.info("beforeStep : {}", JSONObject.toJSONString(stepExecution));
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("afterStep : {}", JSONObject.toJSONString(stepExecution));
System.out.println("afterStep");
if (stepExecution.getReadSkipCount() > 0 || stepExecution.getProcessSkipCount() > 0 || stepExecution.getWriteSkipCount() > 0) {
return new ExitStatus(ExitStatus.FAILED.getExitCode());
} else {
return ExitStatus.COMPLETED;
}
}
}
@Component
public class JobListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobListener.class);
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private long startTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
log.info( jobExecution.getJobInstance().getJobName() +" Job 任务处理开始 " + jobExecution.getJobParameters());
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("JOB STATUS : {}", jobExecution.getStatus());
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("JOB FINISHED");
threadPoolTaskExecutor.destroy();
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
log.info("JOB FAILED");
}
log.info("Job Cost Time : {}ms" , (System.currentTimeMillis() - startTime));
}
}
@Slf4j
@RestController
public class TestController {
@Autowired
DataJob dataJob;
@Autowired
private JobLauncher jobLauncher;
@RequestMapping("/data")
public String data() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
// 根据时间配置每个job完成的使用参数,因为时间不一样所以参数不一样,所以任务可以重复执行
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date())
.addString("data", "data") // 传入参数 可以带入到job中
.toJobParameters();
JobExecution run = jobLauncher.run(dataJob.DataJob(), jobParameters);
log.info("job执行结果:{}", JSONObject.toJSONString(run));
BatchStatus status = run.getStatus();
return JSONObject.toJSONString(status);
}
}
文件数据处理后入库数据 | springcatch | flink |
---|---|---|
343行数据入库93行耗时 | 234ms | 3026ms |
1102000行数据入库275500行耗时 | 45927ms | 184978ms |
6362288行数据入库1590572行耗时 | 329079ms | 1283280ms |
注:flink相关程序见flink批处理入库mysql