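Background:
Recently I used POI to parse Excel files in production. Above 50,000 rows it became very slow, and it sometimes even hung because of memory pressure. So I decided to use spring-batch to read the data in batches. Spring-batch cannot read Excel files directly, though, so the Excel file is first converted to a CSV file (measured conversion speed: 80,000 rows in 40 s). Spring-batch then reads the CSV in chunks of 5,000 rows, and each 5,000-row chunk is processed with multiple threads (ForkJoin).
============ Below is a record of the demo project, for my own reference only ===========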
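To make the "read in batches of 5000" idea concrete, here is a minimal JDK-only model of a chunk-oriented step. It is a sketch under simplifying assumptions, not Spring Batch's implementation. The class `ChunkLoopSketch` is hypothetical, and it leaves out the processor, transactions, retry/skip and the restart state that the framework adds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a chunk-oriented step (no transactions, retry, skip or restart state):
// items are read one at a time, and every commitInterval items are written as one chunk.
public class ChunkLoopSketch {

    /** Feeds the reader's items to the writer in chunks; returns how many chunks were written. */
    public static <T> int run(Iterator<T> reader, int commitInterval, Consumer<List<T>> writer) {
        List<T> chunk = new ArrayList<>(commitInterval);
        int chunks = 0;
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == commitInterval) {
                writer.accept(new ArrayList<>(chunk)); // hand over a full chunk ("commit")
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // the last chunk may be smaller
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 12_000; i++) {
            rows.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        int chunks = run(rows.iterator(), 5000, chunk -> sizes.add(chunk.size()));
        System.out.println(chunks + " chunks, sizes " + sizes); // prints "3 chunks, sizes [5000, 5000, 2000]"
    }
}
```

With a commit interval of 5000, 12,000 rows become two full chunks and one partial chunk. In the real step, the writer therefore always receives at most 5,000 items at a time.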
1: spring-batch configuration ----- pom:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>3.0.8.RELEASE</version>
</dependency>
2: spring-batch configuration ----- batch-content.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManagerBatch"/>
</bean>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
<!-- Do not name this bean transactionManager: it would clash with the application's own Spring transactionManager and break Spring transactions -->
<bean id="transactionManagerBatch"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
</beans>
3: spring-batch ----- configuration: batch-job.xml:
<?xml version="1.0" encoding="UTF-8"?>
<bean:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:bean="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
<bean:import resource="classpath:META-INF/batch/fee-batch-context.xml"/>
<job id="analysisExcelJob">
<step id="listStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="redeemDataReader" writer="redeemDataWriter" processor="redeemDataProcessor"
commit-interval="5000"/>
</tasklet>
</step>
<listeners>
<listener ref="analysisExcelInterceptor"/>
</listeners>
</job>
<!-- Read the report file (CSV format) -->
<bean:bean id="redeemDataReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<bean:property name="resource"
value="file:#{jobParameters['file.data']}"/>
<bean:property name="linesToSkip" value="4"/>
<bean:property name="lineMapper">
<bean:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<bean:property name="lineTokenizer">
<!-- The mapped fields are listed in the names property below; it must cover every header column, separated by , -->
<bean:bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<bean:property name="names" value="计划回款时间,商户名称,父产品名称,标的名称,合同编号,进件编码,
标的募集金额,投资人利率,当前期数,总期数,应还金额,应还利息,罚息金额,还款总额,代扣实际到账,未到账金额,
商户分润金额,产品起息日,虚户时间,滞销天数,首次回款日,运营滞销贴息,商户线下应还,
当期是否提前回款,回款模式,是否是转非标,是否提现成功"/>
</bean:bean>
</bean:property>
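<!-- If the headers are English names that match the entity's properties, the default BeanWrapperFieldSetMapper works; with Chinese headers a custom mapper has to map the fields read onto the entity object -->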
<bean:property name="fieldSetMapper">
<bean:bean class="com.fee.batch.mapper.MerchantPayFieldSetMapper"></bean:bean>
</bean:property>
</bean:bean>
</bean:property>
</bean:bean>
<bean:bean id="analysisExcelInterceptor"
class="com.fee.batch.intercepter.AnalysisExcelInterceptor"/>
<bean:bean id="redeemDataProcessor" scope="prototype" class="com.fee.batch.process.RedeemDataProcessor"/>
<bean:bean id="redeemDataWriter" scope="step" class="com.fee.batch.writer.RedeemDataItemWriter"/>
</bean:beans>
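The reader configured above turns each CSV line into named fields. Below is a rough JDK-only model of that step. It is not Spring Batch's `DelimitedLineTokenizer` source, and `LineTokenizerSketch` and its English column names are hypothetical. The code splits on commas outside double quotes, turns a doubled quote into a single quote, and pairs the tokens with the names. Like the real tokenizer in its default strict mode, it rejects a line whose token count differs from the number of names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Rough model of a delimited-line tokenizer (hypothetical class, not Spring Batch code):
// split on commas outside double quotes, unescape "" to ", then pair tokens with names.
public class LineTokenizerSketch {

    public static List<String> split(String line) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // doubled quote inside a quoted field
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote
                }
            } else if (c == ',' && !inQuotes) {
                tokens.add(current.toString()); // a delimiter outside quotes ends a token
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        tokens.add(current.toString());
        return tokens;
    }

    // Strict behaviour: the number of tokens must equal the number of names
    public static Map<String, String> tokenize(String line, String... names) {
        List<String> tokens = split(line);
        if (tokens.size() != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " tokens but found " + tokens.size());
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens.get(i));
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> row = tokenize("\"ACME, Inc.\",3,\"say \"\"hi\"\"\"",
                "merchantName", "phase", "subjectName");
        System.out.println(row); // prints {merchantName=ACME, Inc., phase=3, subjectName=say "hi"}
    }
}
```

This is why the names list has to cover every column. Some rows may end up with fewer columns, for example when their trailing cells are empty in Excel. In that case, `DelimitedLineTokenizer` has a `strict` property that can be set to `false` to tolerate the mismatch; whether that is acceptable depends on the data.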
4: Add an interceptor for the before-job and after-job hooks: AnalysisExcelInterceptor
@Service
public class AnalysisExcelInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisExcelInterceptor.class);
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
LOGGER.info(">>> Batch job started! Running the pre-job task <<<");
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
// startTime is passed in as a String job parameter when the job is launched
long startTime = Long.parseLong(jobExecution.getJobParameters().getString("startTime"));
long endTime = System.currentTimeMillis();
LOGGER.info(">>> Batch job finished! Running the post-job task. Total time: " + (endTime - startTime) / 1000 + " s <<<");
// Delete the temporary directory that holds the intermediate csv file
String commonTempPath = jobExecution.getJobParameters().getString("commonTempPath");
try {
FileUtils.deleteDirectory(new File(commonTempPath));
LOGGER.info(">>> Temporary files deleted. Path: " + commonTempPath + " <<<");
} catch (IOException e) {
LOGGER.error(">>> Failed to delete temporary files. Path: " + commonTempPath + " <<<", e);
}
}
}
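The after-job hook depends on commons-io's `FileUtils.deleteDirectory`, and it parses the `startTime` job parameter from a String. If you prefer a JDK-only version, the sketch below deletes a directory tree by walking it deepest-first with `java.nio.file.Files.walk`. It also computes elapsed seconds the same way `afterJob` does. The class `TempDirCleaner` and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical JDK-only stand-in for FileUtils.deleteDirectory plus the elapsed-time calculation.
public class TempDirCleaner {

    // Deletes dir and everything below it; does nothing if dir does not exist
    public static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()) // children sort after parents, so reversed = deepest first
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
                 });
        } catch (UncheckedIOException e) {
            throw e.getCause(); // surface the original IOException
        }
    }

    // Whole seconds elapsed since a start time that was stored as a String job parameter
    public static long elapsedSeconds(String startTimeMillis, long nowMillis) {
        return (nowMillis - Long.parseLong(startTimeMillis)) / 1000;
    }
}
```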
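5: Add the header-mapping class that picks out the needed fields ---- MerchantPayFieldSetMapper. If the headers are English names that match the entity's properties, the default BeanWrapperFieldSetMapper configuration can be used instead.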
public class MerchantPayFieldSetMapper implements FieldSetMapper<TzRepayReportVo> {
@Override
public TzRepayReportVo mapFieldSet(FieldSet fieldSet) throws BindException {
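// The names passed to readString must match the names configured on the DelimitedLineTokenizer;
// the chained calls require TzRepayReportVo's setters to return this
TzRepayReportVo tzRepayReportVo = new TzRepayReportVo();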
return tzRepayReportVo.setProduceDebtsId(fieldSet.readString("进件编码"))
.setPhase(fieldSet.readString("当前期数"))
.setTotalPhase(fieldSet.readString("总期数"))
.setDelaySubsidy(fieldSet.readString("运营滞销贴息"))
.setOccurDate(fieldSet.readString("计划回款时间"))
.setMerchantName(fieldSet.readString("商户名称"))
.setProductName(fieldSet.readString("父产品名称"))
.setSubjectName(fieldSet.readString("标的名称"))
.setMerchantPayAmount(fieldSet.readString("商户线下应还"))
.setWithholdActualAmount(fieldSet.readString("代扣实际到账"))
.setMerchantShareProfitAmount(fieldSet.readString("商户分润金额"))
.setWithdrawSuccessDesc(fieldSet.readString("是否提现成功"));
}
}
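`MerchantPayFieldSetMapper` reads columns by name and chains its setter calls. That compiles only if every setter of `TzRepayReportVo` returns `this`. The class itself is not shown; Lombok's `@Accessors(chain = true)` is one common way to get such setters, but that is an assumption. The sketch below uses a hypothetical `RepayRowSketch` with three hand-written fluent setters, and a `Map` stands in for the `FieldSet`. Like Spring Batch's `DefaultFieldSet`, the lookup throws `IllegalArgumentException` for a name that was not configured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TzRepayReportVo (only three fields shown).
public class RepayRowSketch {
    private String produceDebtsId;
    private String phase;
    private String totalPhase;

    // Each setter returns this, so calls can be chained
    public RepayRowSketch setProduceDebtsId(String produceDebtsId) {
        this.produceDebtsId = produceDebtsId;
        return this;
    }

    public RepayRowSketch setPhase(String phase) {
        this.phase = phase;
        return this;
    }

    public RepayRowSketch setTotalPhase(String totalPhase) {
        this.totalPhase = totalPhase;
        return this;
    }

    public String getProduceDebtsId() { return produceDebtsId; }
    public String getPhase() { return phase; }
    public String getTotalPhase() { return totalPhase; }

    // Name-based lookup in the spirit of FieldSet.readString(name): unknown names fail fast
    static String readString(Map<String, String> fieldSet, String name) {
        if (!fieldSet.containsKey(name)) {
            throw new IllegalArgumentException("Cannot access column [" + name + "]");
        }
        return fieldSet.get(name);
    }

    public static RepayRowSketch map(Map<String, String> fieldSet) {
        return new RepayRowSketch()
                .setProduceDebtsId(readString(fieldSet, "produceDebtsId"))
                .setPhase(readString(fieldSet, "phase"))
                .setTotalPhase(readString(fieldSet, "totalPhase"));
    }

    public static void main(String[] args) {
        Map<String, String> fieldSet = new HashMap<>();
        fieldSet.put("produceDebtsId", "JJ0001");
        fieldSet.put("phase", "2");
        fieldSet.put("totalPhase", "12");
        RepayRowSketch row = map(fieldSet);
        System.out.println(row.getProduceDebtsId() + " " + row.getPhase() + "/" + row.getTotalPhase()); // prints "JJ0001 2/12"
    }
}
```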
6: Add the intermediate processor class ---- RedeemDataProcessor:
public class RedeemDataProcessor implements ItemProcessor<TzRepayReportVo, TzRepayReportVo> {
private static final Logger LOGGER = LoggerFactory.getLogger(RedeemDataProcessor.class);
@Override
public TzRepayReportVo process(TzRepayReportVo tzRepayReportVo) throws Exception {
// Pass-through: the item goes to the writer unchanged. Returning null here would filter it out.
return tzRepayReportVo;
}
}
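`RedeemDataProcessor` passes every item through unchanged. The processor step becomes useful for filtering: in Spring Batch, an `ItemProcessor` that returns `null` drops the item, so it never reaches the writer. The JDK-only model below shows that contract with a `Function`. `ProcessorFilterSketch` and the blank-id rule are hypothetical examples, not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Model of the ItemProcessor contract: a null result means "filter this item out".
public class ProcessorFilterSketch {

    // Hypothetical rule: drop rows whose id is blank, trim the rest
    public static final Function<String, String> DROP_BLANK =
            id -> (id == null || id.trim().isEmpty()) ? null : id.trim();

    // Applies the processor to one chunk and keeps only non-null results for the writer
    public static List<String> processChunk(List<String> chunk, Function<String, String> processor) {
        List<String> toWrite = new ArrayList<>();
        for (String item : chunk) {
            String result = processor.apply(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("A1", " ", "B2 ", null);
        System.out.println(processChunk(ids, DROP_BLANK)); // prints [A1, B2]
    }
}
```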
7: Add the writer class ----- RedeemDataItemWriter:
public class RedeemDataItemWriter implements ItemWriter<TzRepayReportVo> {
private static final Logger LOGGER = LoggerFactory.getLogger(RedeemDataItemWriter.class);
// Running totals across chunks; the writer is step-scoped, so a new instance starts at 0 for every step execution
private int countTotal = 0;
private int failTotal = 0;
private static final ForkJoinPool pool = new ForkJoinPool(4);
@Autowired
private MerchantPayManagerService merchantPayManagerService;
@Override
public void write(List<? extends TzRepayReportVo> list) throws Exception {
countTotal = countTotal + list.size();
LOGGER.info("Received merchantPayVoList of size: " + list.size() + ", records so far: " + countTotal);
// Copy instead of an unchecked cast from List<? extends TzRepayReportVo>
List<TzRepayReportVo> tzRepayReportVos = new ArrayList<>(list);
AtomicInteger failCount = pool.invoke(new DealMerchantPayReportsTask(merchantPayManagerService, tzRepayReportVos));
failTotal = failTotal + failCount.get();
if (failTotal > 0) {
// Throwing here fails the step, and with it the job, at the first chunk that has failed records
throw new ApplicationException("== Failed records found! Records processed so far: " + countTotal + ". Failures: " + failTotal + ". ==");
}
}
}
8: (This step has nothing to do with spring-batch) Split each 5,000-row chunk into sub-tasks ----- DealMerchantPayReportsTask:
public class DealMerchantPayReportsTask extends RecursiveTask<AtomicInteger> {
private static final Logger LOGGER = LoggerFactory.getLogger(DealMerchantPayReportsTask.class);
// Lists with at most this many rows are processed directly instead of being split further
private static final int THRESHOLD = 1000;
private final AtomicInteger failNum = new AtomicInteger(0);
// Each task receives its own sub-list, so it always works on the whole list it was given
private final List<TzRepayReportVo> tzRepayReportVos;
// Passed in through the constructor: this task is not a Spring bean, so @Autowired would have no effect
private final MerchantPayManagerService merchantPayManagerService;
public DealMerchantPayReportsTask(MerchantPayManagerService merchantPayManagerService, List<TzRepayReportVo> tzRepayReportVos) {
this.merchantPayManagerService = merchantPayManagerService;
this.tzRepayReportVos = tzRepayReportVos;
}
/**
* The main computation performed by this task.
*
* @return the number of records that failed, wrapped in an AtomicInteger
*/
@Override
protected AtomicInteger compute() {
int size = tzRepayReportVos.size();
if (size <= THRESHOLD) {
// Small enough: process every record, counting failures instead of aborting
for (TzRepayReportVo tzRepayReportVo : tzRepayReportVos) {
try {
merchantPayManagerService.dealMerchantPayReports(tzRepayReportVo);
} catch (Exception e) {
failNum.incrementAndGet();
LOGGER.error("Subject name: " + tzRepayReportVo.getSubjectName() + " failed to process! Cause:", e);
}
}
} else {
// Too large: split into two halves, run both, then add up their failure counts
int middle = size / 2;
DealMerchantPayReportsTask leftTask = new DealMerchantPayReportsTask(merchantPayManagerService, tzRepayReportVos.subList(0, middle));
DealMerchantPayReportsTask rightTask = new DealMerchantPayReportsTask(merchantPayManagerService, tzRepayReportVos.subList(middle, size));
invokeAll(leftTask, rightTask);
// Without these two lines the failures counted by the sub-tasks would be lost
failNum.addAndGet(leftTask.join().get());
failNum.addAndGet(rightTask.join().get());
}
LOGGER.info("Task finished! Failures: " + failNum + ".");
return failNum;
}
}
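Splitting the work only pays off if every parent task adds up the failure counts that its children return from `join()`; otherwise failures counted in sub-tasks are lost. Below is a minimal JDK-only sketch of that pattern. The `FailureCountTask` class and its `Consumer` handler are hypothetical stand-ins for `DealMerchantPayReportsTask` and `MerchantPayManagerService`. The sketch forks one half and computes the other in the current thread, which is a common alternative to `invokeAll`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Consumer;

// Hypothetical fork/join failure counter: split until a slice has at most THRESHOLD
// items, process each item, and return the number of items whose handler threw.
public class FailureCountTask<T> extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final List<T> items;
    private final Consumer<T> handler;

    public FailureCountTask(List<T> items, Consumer<T> handler) {
        this.items = items;
        this.handler = handler;
    }

    @Override
    protected Integer compute() {
        if (items.size() <= THRESHOLD) {
            int failures = 0;
            for (T item : items) {
                try {
                    handler.accept(item);
                } catch (RuntimeException e) {
                    failures++; // record the failure and keep going
                }
            }
            return failures;
        }
        int middle = items.size() / 2;
        FailureCountTask<T> left = new FailureCountTask<>(items.subList(0, middle), handler);
        FailureCountTask<T> right = new FailureCountTask<>(items.subList(middle, items.size()), handler);
        left.fork();                         // schedule the left half on the pool
        int rightFailures = right.compute(); // work on the right half in this thread
        return left.join() + rightFailures;  // combine the children's results
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 5000; i++) {
            rows.add(i);
        }
        ForkJoinPool pool = new ForkJoinPool(4);
        // Hypothetical handler: every 7th row "fails"
        int failures = pool.invoke(new FailureCountTask<Integer>(rows, n -> {
            if (n % 7 == 0) {
                throw new IllegalStateException("bad row " + n);
            }
        }));
        pool.shutdown();
        System.out.println("failures: " + failures); // prints "failures: 714"
    }
}
```

The handler runs on several pool threads at once, so whatever it calls (here, `dealMerchantPayReports`) must be safe to call concurrently. Also, Spring transactions are bound to a thread, so a transaction opened for the chunk on the batch thread does not extend to these worker threads.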
9: Add the initializing JobLaunchSupport base class:
public class JobLaunchSupport implements InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(JobLaunchSupport.class);
@Autowired
private JobLauncher jobLauncher;
/**
* Launch the job
*
* @param job job
* @param parameters job parameters
* @return the job execution
*/
public JobExecution run(Job job, JobParameters parameters) {
LOGGER.info("Start processing the report repayment parsing job");
try {
Assert.notNull(job, "job can not be null");
return jobLauncher.run(job, parameters);
} catch (Exception e) {
LOGGER.error("Report repayment job failed!", e);
throw new ApplicationException("Report repayment job failed! " + e);
}
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(jobLauncher, "jobLauncher can not be null");
}
}
10: Add the custom job launcher; custom job parameters can be set here:
@Component("redeemDataJobLaunch")
public class RedeemDataJobLaunch extends JobLaunchSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(RedeemDataJobLaunch.class);
@Resource(name = "analysisExcelJob")
private Job analysisExcelJob;
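/**
* Run the job
*
* @param path csv file path
* @param commonTempPath temporary directory, deleted by the after-job hook
*/
public void execute(String path, String commonTempPath) {
JobParameters parameters = genParameters(path, commonTempPath);
LOGGER.info("Start processing the deductible data!");
run(analysisExcelJob, parameters);
}
/**
* Build the job parameters
*
* @param path csv file path
* @param commonTempPath temporary directory
* @return job parameters
*/
private JobParameters genParameters(String path, String commonTempPath) {
JobParametersBuilder builder = new JobParametersBuilder();
builder.addString("file.data", path);
builder.addString("commonTempPath", commonTempPath);
// A fresh timestamp makes every parameter set unique, so the job can be launched again
// (relaunching a completed JobInstance with identical parameters is rejected); afterJob also uses it to compute the elapsed time
builder.addString("startTime", String.valueOf(System.currentTimeMillis()));
return builder.toJobParameters();
}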
}
11: Call site:
//1: excel-->csv
String csvFileName = "TzRepayReports" + excelDate + ".csv";
// Plain concatenation: commonTempPath must end with a path separator
String csvFileAbsolutePath = commonTempPath + csvFileName;
excelToCsv(targetExcelFile, csvFileAbsolutePath);
//2: spring-batch batch processing
redeemDataJobLaunch.execute(csvFileAbsolutePath, commonTempPath);
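The call site builds `csvFileAbsolutePath` by plain string concatenation, which only works when `commonTempPath` ends with a path separator. A JDK-only alternative is to let `java.nio.file.Paths.get` insert the separator. `CsvPathSketch.csvPath` is a hypothetical helper name.

```java
import java.nio.file.Paths;

// Hypothetical helper: Paths.get joins directory and file name with the platform separator.
public class CsvPathSketch {

    public static String csvPath(String commonTempPath, String excelDate) {
        return Paths.get(commonTempPath, "TzRepayReports" + excelDate + ".csv").toString();
    }

    public static void main(String[] args) {
        // Both calls give the same path, with or without a trailing separator
        System.out.println(csvPath("/tmp/fee", "20181110"));
        System.out.println(csvPath("/tmp/fee/", "20181110"));
    }
}
```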
The excel-->csv conversion method:
/**
* Convert an Excel file with the .xlsx extension to csv
*
* @param targetExcelFile the .xlsx file to convert
* @param csvFileAbsolutePath absolute path of the csv file to write
*/
private void excelToCsv(File targetExcelFile, String csvFileAbsolutePath) {
LOGGER.info("Temporary csv file path: " + csvFileAbsolutePath);
// -1: no minimum number of columns per row
int minColumns = -1;
// try-with-resources closes the csv stream and the package even if an exception is thrown
try (OPCPackage opcPackage = OPCPackage.open(targetExcelFile.getPath(), PackageAccess.READ);
// BufferedOutputStream avoids one file write per print/append call
PrintStream printStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(csvFileAbsolutePath)))) {
Excel2Csv excel2Csv = new Excel2Csv(opcPackage, printStream, minColumns);
excel2Csv.process();
printStream.flush();
// PrintStream swallows IOExceptions, so check its error flag explicitly
if (printStream.checkError()) {
throw new ApplicationException("Failed to write the csv file!");
}
} catch (FileNotFoundException e) {
// Thrown by FileOutputStream: the csv file cannot be created (a missing Excel file fails in OPCPackage.open instead)
LOGGER.error("FileNotFoundException - cannot create csv file: " + csvFileAbsolutePath, e);
throw new ApplicationException("Cannot create the csv file!");
} catch (InvalidFormatException | InvalidOperationException e) {
LOGGER.error(e.getClass().getSimpleName() + " - failed to convert Excel to csv: ", e);
throw new ApplicationException("Failed to convert Excel to csv!");
} catch (IOException | OpenXML4JException | SAXException e) {
LOGGER.error(e.getClass().getSimpleName() + " - failed to write the csv file: ", e);
throw new ApplicationException("Failed to write the csv file!");
}
}
12: Finally, the pom dependencies and the utility class for the Excel-to-CSV conversion:
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.0.0</version>
</dependency>
/* ====================================================================
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================== */
package com.fee.batch;
import org.apache.poi.ooxml.util.SAXHelper;
import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.openxml4j.opc.PackageAccess;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.util.CellAddress;
import org.apache.poi.ss.util.CellReference;
import org.apache.poi.xssf.eventusermodel.ReadOnlySharedStringsTable;
import org.apache.poi.xssf.eventusermodel.XSSFReader;
import org.apache.poi.xssf.eventusermodel.XSSFSheetXMLHandler;
import org.apache.poi.xssf.eventusermodel.XSSFSheetXMLHandler.SheetContentsHandler;
import org.apache.poi.xssf.model.SharedStrings;
import org.apache.poi.xssf.model.Styles;
import org.apache.poi.xssf.model.StylesTable;
import org.apache.poi.xssf.usermodel.XSSFComment;
import org.xml.sax.ContentHandler;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import javax.xml.parsers.ParserConfigurationException;
import java.io.*;
/**
* A rudimentary XLSX -> CSV processor modeled on the
* POI sample program XLS2CSVmra from the package
* org.apache.poi.hssf.eventusermodel.examples.
* As with the HSSF version, this tries to spot missing
* rows and cells, and output empty entries for them.
* <p>
* Data sheets are read using a SAX parser to keep the
* memory footprint relatively small, so this should be
* able to read enormous workbooks. The styles table and
* the shared-string table must be kept in memory. The
* standard POI styles table class is used, but a custom
* (read-only) class is used for the shared string table
* because the standard POI SharedStringsTable grows very
* quickly with the number of unique strings.
* <p>
* For a more advanced implementation of SAX event parsing
* of XLSX files, see {@link XSSFEventBasedExcelExtractor}
* and {@link XSSFSheetXMLHandler}. Note that for many cases,
* it may be possible to simply use those with a custom
* {@link SheetContentsHandler} and no SAX code needed of
* your own!
*/
/**
* @Author: leBang
* @Date: 18/11/16 11:10
* @Description: *
*/
public class Excel2Csv {
/**
* Uses the XSSF Event SAX helpers to do most of the work
* of parsing the Sheet XML, and outputs the contents
* as a (basic) CSV.
*/
private class SheetToCSV implements SheetContentsHandler {
private boolean firstCellOfRow;
private int currentRow = -1;
private int currentCol = -1;
private void outputMissingRows(int number) {
for (int i = 0; i < number; i++) {
for (int j = 0; j < minColumns; j++) {
output.append(',');
}
output.append('\n');
}
}
@Override
public void startRow(int rowNum) {
// If there were gaps, output the missing rows
outputMissingRows(rowNum - currentRow - 1);
// Prepare for this row
firstCellOfRow = true;
currentRow = rowNum;
currentCol = -1;
}
@Override
public void endRow(int rowNum) {
// Ensure the minimum number of columns
for (int i = currentCol; i < minColumns; i++) {
output.append(',');
}
output.append('\n');
}
@Override
public void cell(String cellReference, String formattedValue,
XSSFComment comment) {
if (firstCellOfRow) {
firstCellOfRow = false;
} else {
output.append(',');
}
// gracefully handle missing CellRef here in a similar way as XSSFCell does
if (cellReference == null) {
cellReference = new CellAddress(currentRow, currentCol).formatAsString();
}
// Did we miss any cells?
int thisCol = (new CellReference(cellReference)).getCol();
int missedCols = thisCol - currentCol - 1;
for (int i = 0; i < missedCols; i++) {
output.append(',');
}
currentCol = thisCol;
// Number or string? Strings are quoted, with embedded quotes doubled so the csv stays parseable
try {
//noinspection ResultOfMethodCallIgnored
Double.parseDouble(formattedValue);
output.append(formattedValue);
} catch (NumberFormatException e) {
output.append('"');
output.append(formattedValue.replace("\"", "\"\""));
output.append('"');
}
}
}
///
private final OPCPackage xlsxPackage;
/**
* Number of columns to read starting with leftmost
*/
private final int minColumns;
/**
* Destination for data
*/
private final PrintStream output;
/**
* Creates a new XLSX -> CSV examples
*
* @param pkg The XLSX package to process
* @param output The PrintStream to output the CSV to
* @param minColumns The minimum number of columns to output, or -1 for no minimum
*/
public Excel2Csv(OPCPackage pkg, PrintStream output, int minColumns) {
this.xlsxPackage = pkg;
this.output = output;
this.minColumns = minColumns;
}
/**
* Parses and shows the content of one sheet
* using the specified styles and shared-strings tables.
*
* @param styles The table of styles that may be referenced by cells in the sheet
* @param strings The table of strings that may be referenced by cells in the sheet
* @param sheetInputStream The stream to read the sheet-data from.
* @throws IOException An IO exception from the parser,
* possibly from a byte stream or character stream
* supplied by the application.
* @throws SAXException if parsing the XML data fails.
*/
public void processSheet(
Styles styles,
SharedStrings strings,
SheetContentsHandler sheetHandler,
InputStream sheetInputStream) throws IOException, SAXException {
DataFormatter formatter = new DataFormatter();
InputSource sheetSource = new InputSource(sheetInputStream);
try {
XMLReader sheetParser = SAXHelper.newXMLReader();
ContentHandler handler = new XSSFSheetXMLHandler(styles, null, strings, sheetHandler, formatter, false);
sheetParser.setContentHandler(handler);
sheetParser.parse(sheetSource);
} catch (ParserConfigurationException e) {
throw new RuntimeException("SAX parser appears to be broken - " + e.getMessage());
}
}
/**
* Initiates the processing of the XLS workbook file to CSV.
*
* @throws IOException If reading the data from the package fails.
* @throws SAXException if parsing the XML data fails.
*/
public void process() throws IOException, OpenXML4JException, SAXException {
ReadOnlySharedStringsTable strings = new ReadOnlySharedStringsTable(this.xlsxPackage);
XSSFReader xssfReader = new XSSFReader(this.xlsxPackage);
StylesTable styles = xssfReader.getStylesTable();
XSSFReader.SheetIterator iter = (XSSFReader.SheetIterator) xssfReader.getSheetsData();
int index = 0;
while (iter.hasNext()) {
try (InputStream stream = iter.next()) {
String sheetName = iter.getSheetName();
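// These two marker lines are written to the csv before each sheet's rows,
// so the reader's linesToSkip has to account for them
this.output.println();
this.output.println(sheetName + " [index=" + index + "]:");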
processSheet(styles, strings, new SheetToCSV(), stream);
}
++index;
}
}
public static void main(String[] args) throws Exception {
// Sample paths from the author's machine; change them before running
File xlsxFile = new File("/Users/lebang/Desktop/TzRepayReports20181110.xlsx");
if (!xlsxFile.exists()) {
System.err.println("Not found or not a file: " + xlsxFile.getPath());
return;
}
int minColumns = -1;
if (args.length >= 2) {
minColumns = Integer.parseInt(args[1]);
}
String out = "/Users/lebang/Desktop/TzRepayReports20181203.csv";
long start = System.currentTimeMillis();
// FileOutputStream creates the file if needed; try-with-resources closes both resources
try (PrintStream o = new PrintStream(new BufferedOutputStream(new FileOutputStream(out)));
// The package open is instantaneous, as it should be.
OPCPackage p = OPCPackage.open(xlsxFile.getPath(), PackageAccess.READ)) {
Excel2Csv excel2Csv = new Excel2Csv(p, o, minColumns);
excel2Csv.process();
o.flush();
}
long end = System.currentTimeMillis();
System.out.println("Elapsed: " + (end - start) / 1000 + "s");
}
}
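In `SheetToCSV.cell()`, a value that does not parse as a number is wrapped in double quotes. When the cell value itself contains `"`, the usual CSV convention (RFC 4180) is to double the embedded quote, and Spring Batch's `DelimitedLineTokenizer` turns the doubled quote back into a single one when it strips the surrounding quotes. The quoting rule is isolated below as a pure function so it can be checked on its own. `CsvCellSketch` is a hypothetical name, and the null handling is an assumption.

```java
// The quoting step of SheetToCSV.cell() as a pure function (hypothetical helper):
// values that parse as a double are written bare; everything else is wrapped in
// double quotes with embedded quotes doubled, so the field survives a CSV tokenizer.
public class CsvCellSketch {

    public static String toCsvField(String formattedValue) {
        if (formattedValue == null) {
            return ""; // assumption: treat a missing value as an empty field
        }
        try {
            Double.parseDouble(formattedValue);
            return formattedValue;
        } catch (NumberFormatException e) {
            return "\"" + formattedValue.replace("\"", "\"\"") + "\"";
        }
    }

    public static void main(String[] args) {
        System.out.println(toCsvField("12.50"));      // prints 12.50
        System.out.println(toCsvField("ACME, Inc.")); // prints "ACME, Inc."
        System.out.println(toCsvField("say \"hi\"")); // prints "say ""hi"""
    }
}
```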