当前位置: 首页 > 知识库问答 >
问题:

Spring批处理与BeanIO的集成

廖琨
2023-03-14

我正在尝试将BeanIO与spring Batch集成。使用BeanIO,我正在读取一个固定长度的流文件。我已经测试并验证了使用独立类读取平面文件的代码,它可以无缝地工作,但是当我试图将它与Spring Batch集成时,BeanIOFlatFileItemReader的doRead()方法没有被调用,而且我编写的RedemptionEventCustomProcessor是如何直接被调用的。

我在控制台的stacktrace下面。:

  • Spring-batch-core版本2.1.9.发布
  • spring-batch-infrastructure 2.1.9.发布
  • Beanio-2.1.0.m2
  • earn-api-batch-spring批处理xml文件
  • earn-api-batch-context-spring批处理上下文xml文件
  • earn-api-Mapping.xml-BeanIO映射xml文件

earn-api-batch.xml:

    <import resource="classpath:earn-api-batch-context.xml" />

<batch:job id="processEventJob">
    <batch:step id="step">
        <batch:tasklet>
            <batch:chunk reader="RedemptionFileReader" writer="RedemptionEventCustomWriter" processor="RedemptionEventCustomProcessor" commit-interval="1"></batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<bean id="RedemptionFileReader" class="org.beanio.spring.BeanIOFlatFileItemReader">
    <property name="streamMapping" value="classpath:/earn-api-mapping.xml" />
    <property name="streamName" value="redemptionFile" />
    <property name="resource" value="classpath:/RedemptionTest" />
</bean>

<bean id="RedemptionEventCustomWriter" class="org.beanio.spring.BeanIOFlatFileItemWriter">
    <property name="streamMapping" value="classpath:/earn-api-mapping.xml" />
    <property name="streamName" value="redemptionFile" />
    <property name="resource" value="file:Redemption.txt" />
</bean>

earn-api-batch-context:

<context:component-scan base-package="com.aexp.earn.api.batch" />

<bean id="jobLauncher"  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>
 <bean id="jobRepository" class="org.springframework.batch.core.repository.support.SimpleJobRepository">
    <constructor-arg>
        <bean class="org.springframework.batch.core.repository.dao.MapJobInstanceDao"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.batch.core.repository.dao.MapJobExecutionDao" />
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.batch.core.repository.dao.MapStepExecutionDao"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.batch.core.repository.dao.MapExecutionContextDao"/>
   </constructor-arg>
</bean>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
 

earn-api-Mapping.xml:

<beanio xmlns="http://www.beanio.org/2012/03" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.beanio.org/2012/03 http://www.beanio.org /2012/03/mapping.xsd">

<stream name="redemptionFile" format="fixedlength">
<!-- 'class' binds the header record to a java.util.HashMap -->
<record name="header" class="map">
  <!-- 'rid' indicates this field is used to identify the record -->
  <field name="recordType" length="1" rid="true" literal="1" />
  <field name="fileSeqNo" length="10" padding=" " justify="right" />
  <!-- 'format' can be used to provide Date and Number formats -->
  <field name="fileDate" length="8" type="date" format="yyyyMMdd" />
</record> 

<record name="nar" class="com.aexp.earn.api.batch.vo.NonAirline">
  <field name="recordType" length="1" ignore="true" />
  <field name="redeemType" rid="true" literal="1" length="1" />
  <field name="mmNo" length="10" />
  <field name="cmNo" length="19" padding=" " justify="right" />
  <field name="rewardProgCode" length="2" padding="0" justify="right" />
  <field name="certNo" length="10"/>
  <field name="certFaceValue" length="13" padding=" " justify="right" />
  <field name="redeemTimeStamp" length="26" />
  <field name="statusCode" length="2" />
  <field name="statusTimeStamp" length="26" />
  <field name="rewardCode" length="4" />
  <field name="rewardSubCode" length="4" />
  <field name="transId" length="15" />
  <field name="transSrcId" length="3" />
  <field name="narRequestId" length="14" />
  <field name="narRequestLnId" length="5" padding=" " justify="right" />
</record> 

<record name="ar" class="com.aexp.earn.api.batch.vo.Airline">
  <field name="recordType" length="1" ignore="true" />
  <field name="redeemType" rid="true" literal="2" length="1" />
  <field name="partnerTransferType" length="1" />
  <field name="mmNo" length="10" />
  <field name="cmNo" length="19" padding=" " justify="right" />
  <field name="rewardProgCode" length="2" />
  <field name="frequentFlyerNo" length="15" />
  <field name="partnerCode" length="2"/>
  <field name="redeemTimeStamp" length="26" />
  <field name="transferedMiles" length="12" padding=" " justify="right" />
  <field name="transferedStatus" length="1" />
  <field name="statusTimeStamp" length="26" />
  <field name="originSourceCode" length="3" />
  <field name="transId" length="15" />
  <field name="transSrcId" length="3" />
  <field name="arRequestId" length="14" />
  <field name="arRequestLnId" length="5" padding=" " justify="right" />
</record> 

<record name="pwp" class="com.aexp.earn.api.batch.vo.PWP">
  <field name="recordType" length="1" ignore="true" />
  <field name="redeemType" rid="true" literal="3" length="1" />
  <field name="mmNo" length="10" />
  <field name="cmNo" length="19" padding=" " justify="right" />
  <field name="rewardProgCode" length="2" />
  <field name="redeemTimeStamp" length="26" />
  <field name="adjustMiles" length="12" />
  <field name="adjustCD" length="4" />
  <field name="transId" length="15" />
  <field name="transSrcId" length="3" />
  <field name="pwpRequestId" length="14" padding=" " justify="right" />
  <field name="pwpRequestLnId" length="5" />
</record>

<record name="pap" class="com.aexp.earn.api.batch.vo.PAP">
  <field name="recordType" length="1" ignore="true" />
  <field name="redeemType" rid="true" literal="4" length="1" />
  <field name="mmNo" length="10" />
  <field name="cmNo" length="19" padding=" " justify="right" />
  <field name="rewardProgCode" length="2" />
  <field name="redeemTimeStamp" length="26" />
  <field name="chnlPtnrId" length="5" />
  <field name="orderId" length="32" />
  <field name="partnerReserId" length="32" />
  <field name="confirmId" length="16" padding=" " justify="right" />
  <field name="actPtCnt" length="12" />
  <field name="rewdActTypeCd" length="3" />
  <field name="seqNo" length="5" />
</record>

<!-- 'target' binds the trailer record to the Integer record count field -->
<record name="trailer" target="recordCount">
  <!-- 'literal' is used to define constant values -->
  <field name="recordType" rid="true" literal="9" length="2" />
  <!-- 'type' can be declared where bean introspection is not possible -->
  <field name="recordCount" length="7" type="int" />
</record> 

</stream>

</beanio>

主类:

public class EventStart {
@SuppressWarnings("resource")
public static void main(String[] args) {
    String[] springConfig  =
        {   
            "earn-api-batch.xml"
        };
    ClassPathXmlApplicationContext appContext = null;
     appContext = new ClassPathXmlApplicationContext(springConfig);
     
    JobLauncher jobLauncher = (JobLauncher) appContext.getBean("jobLauncher");
    Job job = (Job) appContext.getBean("processEventJob");
   
    try {

        JobExecution execution = jobLauncher.run(job, new JobParameters());
        System.out.println("Exit Status : " + execution.getStatus() + " job Id " + execution.getJobId() + " " + execution.getAllFailureExceptions());

    } catch (Exception e) {
        e.printStackTrace();
    }

    System.out.println("Done");

}
}
@Component(value = "RedemptionFileReader")
@Scope("step")
public class RedemptionFileReader extends BeanIOFlatFileItemReader<Map<String,Object>>{

/*    @Value("#{batchProps['redemption.server.file.path']}")
private Resource resource;*/

Map<String,Object> map = new HashMap<String,Object>();

/*@Override
public String read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {
    System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh");
    // TODO Auto-generated method stub
    return null;
}*/

@Override
public  Map<String,Object> doRead() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
    System.out.println("HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH");
    /*if (null != recordsCount) {
        return null;
    }*/
    map = identifyRecord();
    return map;
   
}


private Map<String,Object> identifyRecord() throws FileNotFoundException {
   
    Map<String,Object> map=new HashMap<String,Object>();
    ArrayList<NonAirline> narList = new ArrayList<NonAirline>();
    ArrayList<Airline> arList = new ArrayList<Airline>();
    ArrayList<PWP> pwpList = new ArrayList<PWP>();
    ArrayList<PAP> papList = new ArrayList<PAP>();
     // create a StreamFactory
    StreamFactory factory = StreamFactory.newInstance();
    // load the mapping file
    factory.load("src/main/resources/earn-api-mapping.xml");
   
    // use a StreamFactory to create a BeanReader
    BeanReader in = factory.createReader("redemptionFile", new File("src/main/resources/RedemptionTest"));
    Object record = null;
    String recordCount = "";

    while ((record = in.read()) != null) {
       
        if ("header".equals(in.getRecordName())) {
            //Map<String,Object> header = (Map<String,Object>) record;
            //System.out.println(header.get("fileDate"));
            System.out.println("Header");
        }
        else if ("nar".equals(in.getRecordName())) {
            NonAirline nar = (NonAirline) record;
            System.out.println("NAR Redeem Type: " + nar.getRedeemType());
            System.out.println("NAR MM No.: " + nar.getMmNo());
            System.out.println("NAR CM No.: " + nar.getCmNo());
            narList.add(nar);
           
        }
        else if ("ar".equals(in.getRecordName())) {
            Airline ar = (Airline) record;
            System.out.println("AR Redeem Type: " + ar.getRedeemType());
            System.out.println("AR MM No.: " + ar.getMmNo());
            System.out.println("AR CM No.: " + ar.getCmNo());
            arList.add(ar);
        }
        else if ("pwp".equals(in.getRecordName())) {
            PWP pwp = (PWP) record;
            System.out.println("PWP Redeem Type: " + pwp.getRedeemType());
            System.out.println("PWP MM No.: " + pwp.getMmNo());
            System.out.println("PWP CM No.: " + pwp.getCmNo());
            pwpList.add(pwp);
        }
        else if ("pap".equals(in.getRecordName())) {
            PAP pap = (PAP) record;
            System.out.println("PAP Redeem Type: " + pap.getRedeemType());
            System.out.println("PAP MM No.: " + pap.getMmNo());
            System.out.println("PAP CM No.: " + pap.getCmNo());
            papList.add(pap);
        }
        else if ("trailer".equals(in.getRecordName())) {
            recordCount = (String) record;
            System.out.println("Trailer");
            System.out.println(recordCount + " contacts processed");
           
        }

    }
    map.put("NAR", narList);
    map.put("AR", arList);
    map.put("PWP", pwpList);
    map.put("PAP", papList);
    in.close();
    return map;
   
   
}

}
@Component(value = "RedemptionEventCustomProcessor")
public class RedemptionEventCustomProcessor implements ItemProcessor<Map<String,Object>, Map<String,Object>> {

ArrayList<NonAirline> narList = new ArrayList<NonAirline>();
ArrayList<Airline> arList = new ArrayList<Airline>();
ArrayList<PWP> pwpList = new ArrayList<PWP>();
ArrayList<PAP> papList = new ArrayList<PAP>();



@SuppressWarnings("unchecked")
@Override
public Map<String,Object> process(Map<String,Object> map) throws Exception {
    System.out.println(":::::::::::::In Processor::::::::::::::::");   
   
    narList = (ArrayList<NonAirline>) map.get("NAR");
   
    for(int i=0; i <= narList.size(); i++)
    {
       
        NonAirline nar     = narList.get(i);
        System.out.println("CM No. " + nar.getCmNo());
       
    }
    return map;
       
}

}

如果需要任何其他信息,请让我知道。任何帮助都将非常感谢,请考虑我对BeanIO和spring都是新的。如果有人可以分享一个使用Spring Batch的BeanIO工作示例,这也会有所帮助。

提前谢谢你。

共有1个答案

呼延河
2023-03-14

我也在spring Batch中使用beanio。您得到的错误可能是版本问题。以下是作业XML中的xsd配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:batch="http://www.springframework.org/schema/batch" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
 类似资料:
  • 我有一个spring批处理应用程序,它从文件中读取数据,进行一些处理,最后编写一个定制的输出。这一切都是一步到位的。在下一步中,我将使用一个tasklet来归档输入文件(移动到另一个文件夹)。这个应用程序运行良好。但是,现在我需要在远程服务器上对sftp输出文件进行进一步处理。我找到了一种使用spring integration实现sftp的方法,在这里我创建了一个输入通道,该通道将反馈给outb

  • 我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我试图使用Spring Boot、Spring Batch和Spring集成来配置一个RemoteChunking任务。 我已经配置了服务器,并按照官方文档https://docs.Spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-chunking开始配置Spring批处理。 我的主配置

  • 我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫