当前位置: 首页 > 知识库问答 >
问题:

了解 Spring Batch 中的 AsyncItemWriter,以及何时使用 MyBatis 编写块?

杜弘光
2023-03-14

在下面,我的写入是否一次提交到数据库1000——因为我在Spring Batch作业中的提交间隔设置为1000?MyBatis SqlSessionFactory被定义为BATCH执行

<bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
    <constructor-arg index="0" ref="sqlSessionFactory" />
    <constructor-arg index="1" value="BATCH" />
</bean>
<!-- define the SqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="typeAliasesPackage" value="org.my.domain" />
</bean>

我在调试日志中注意到:

2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Fetched SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c] from current transaction

在上面的 DEBUG 语句中,我认为它正在打开连接。然后在插入语句下方插入到一批 1,000 个中 - 或者它是否为每个插入打开了与 MS SQL 服务器的新连接?

2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Fetched SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c] from current transaction
KickoutMapper.insertKickoutTbl - ==> Parameters: 12143(Long), 10039(Long), 0(Integer), SUBSCRIBER4998(String), .....
2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c]

最后它正在关闭事务(同样,我不确定它是否正在关闭与MS SQL服务器的连接,或者是否在每次插入后都这样做了)。同样,我想批量插入 1,000 条记录,并与数据库建立一条连接。

2015-08-21 22:58:55,376 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c]

此外,AsyncItemWriter是否可以改为委托给JdbcBatchItemWriter?它是否比我配置的MyBatis更快?在下面的代码中,我委托给CompositeItemWriter,它使用MyBatis映射器写入两个单独的表。

我的用例是这样的:我需要从一个mec_mdw数据库表中读取并验证700万条记录。

处理器认为无效的mec_mdw记录将被插入mec_kickout表。此外,该特定记录的验证失败原因将插入mec_kickout_reason表中。我最初从中读取的mec_mdw表也将在处理结束时更新一些列。

这是迄今为止我使用异步处理和写入所做的,目前我只测试了2500个读取的mdw记录,在这个测试中,所有2500个记录都是故意无效的,因此被插入到mec_kickout表中,原始的mdw表2500行也在更新中。所有这些都在大约50秒内完成,在一台拥有16 Gb RAM的8核CPU笔记本电脑上,MS SQL服务器数据库需要网络呼叫。但我仍然不完全理解是否可以更快地完成。

    <job id="mecmdwvalidatorJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="mdwvalidatorStep1">
        <tasklet>
            <chunk reader="pageItemReader" processor="asyncItemProcessor"
                writer="asynchItemWriter" commit-interval="1000" skip-limit="2147483647">
                <skippable-exception-classes> <!-- TODO -->
                    <include class="java.lang.Exception" />
                </skippable-exception-classes>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="pageItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause"
                value="select MDW_ID,FK_LOG_FILE_ID,TAX_YEAR,SUBS_TYPE_CD,SUB_FIRST_NM,SUB_MIDDLE_NM,SUB_LAST_NM,SUB_SUFFIX,SUB_DOB,SUB_ADDR1,SUB_ADDR2,SUB_CITY,SUB_STATE,SUB_PROVINCE,SUB_ZIP,SUB_ZIP4,SUB_COUNTRY_CD,SUB_COUNTRY,SUB_F_POSTAL_CD,LOB,SUB_SSN,GRP_EMP_NAME1,GRP_EMP_NAME2,GRP_EIN,GRP_ADDR1,GRP_ADDR2,GRP_CITY,GRP_STATE,GRP_PROVINCE,GRP_ZIP,GRP_ZIP4,GRP_COUNTRY_CD,GRP_COUNTRY,GRP_F_POSTAL_CD,ISSUER_NAME1,ISSUER_NAME2,ISSUER_PHONE,ISSUER_ADDR1,ISSUER_ADDR2,ISSUER_CITY,ISSUER_PROVINCE,ISSUER_ZIP,ISSUER_ZIP4,ISSUER_COUNTRY_CD,ISSUER_COUNTRY,ISSUER_F_POSTAL_CD,MEM_FIRST_NM,MEM_MIDDLE_NM,MEM_LAST_NM,MEM_SUFFIX,MEM_SSN,MEM_DOB,MEM_START_DATE,MEM_END_DATE,REGION_CD,SUB_MRN,SUB_MRN_PREFIX,MEM_MRN,MRN_PREFIX,PID,SUB_GRP_ID,SUB_GRP_NAME,INVALID_ADDR_FL" />
            <property name="fromClause"
                value="from MEC_MDW JOIN MEC_FILE_LOG on MEC_FILE_LOG.LOG_FILE_ID=MEC_MDW.FK_LOG_FILE_ID  " />
            <property name="whereClause" value="where MEC_FILE_LOG.STATUS=:status" />
            <property name="sortKey" value="MDW_ID" />
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="READY TO VALIDATE" />
        </map>
    </property>
    <property name="pageSize" value="1000" />
    <property name="rowMapper" ref="mdwRowMapper" />
</bean>
<bean id="mdwRowMapper" class="org.my.rowmapper.MdwRowMapper" />

<bean id="asyncItemProcessor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
    <property name="delegate">
        <bean
            class="org.my.itemprocessor.MdwValidatingItemProcessor">
            <property name="validator">
                <bean
                    class="org.springframework.validation.beanvalidation.LocalValidatorFactoryBean" />
            </property>
        </bean>
    </property>
    <property name="taskExecutor" ref="taskExecutor" />
    <!-- <property name="taskExecutor"> -->
    <!-- <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" 
        /> -->
    <!-- </property> -->
</bean>

<task:executor id="taskExecutor" pool-size="10" />

<bean id="asynchItemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
    <property name="delegate" ref="customerCompositeWriter">
    </property>
</bean>

<bean id="customerCompositeWriter"
    class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
        <list>
            <ref bean="itemWriter1" />
            <ref bean="itemWriter2" />
        </list>
    </property>
</bean>

<bean id="itemWriter1" class="org.my.writer.MdwWriter" />
<bean id="itemWriter2" class="org.my.writer.KickoutWriter" />

</beans>

处理器将具有业务逻辑,现在它只有 Bean 验证,在 MecMdw 域对象中查找空属性。最终,我还需要适配器代码来查找其他表中的帐户 ID(因此更多的数据库连接乐趣!我认为这个适配器DAO逻辑将进入处理器

public class MdwValidatingItemProcessor implements ItemProcessor<MecMdw, MecMdw> {  

private Validator validator;  

public void setValidator(Validator validator) {  
     this.validator = validator;  
}  

public MecMdw process(MecMdw item) throws Exception {  

     BindingResult results = BindAndValidate(item);

     if (results.hasErrors())  {

         item.setKick_out_fl('Y');

         buildValidationException(results,item);

         return item;  

     }

     return item;  
}  

private BindingResult BindAndValidate(MecMdw item) {

     DataBinder binder = new DataBinder(item);

     binder.setValidator(validator);  

     binder.validate();  

     return binder.getBindingResult();  

}  

private void buildValidationException(BindingResult results, MecMdw item) {  

     List<String> listOfErrors = new ArrayList<String>();

     for (ObjectError error : results.getAllErrors()) {  
          listOfErrors.add(error.toString());  
     }  

     item.setValidationErrors(listOfErrors);
}  

MdwWriter和KickoutWriter正在使用MyBatis DAO写入数据库。

public class MdwWriter<MecMdw> implements ItemWriter<MecMdw> {

@Autowired
MdwMapper mdwMapper;


@Override
public void write(List<? extends MecMdw> items) throws Exception {
    for(MecMdw item : items){
        mdwMapper.setMecMdwRecordAsKickOut((org.my.domain.MecMdw) item);

    }
}

这是KickoutWriter.java

public class KickoutWriter<MecMdw> implements ItemWriter<MecMdw> {

@Autowired
KickoutMapper kickoutMapper;

@Override
public void write(List<? extends MecMdw> items) throws Exception {
    for(MecMdw item : items){

        kickoutMapper.insertKickoutTbl((org.my.domain.MecMdw) item);
    }

}

共有1个答案

后树
2023-03-14

1)这里的博客帮助我了解了Spring Batch中的交易。

2)关于性能,您是否尝试过使用Spring Batch的多线程功能?使用线程向上扩展可能足以满足性能要求。

 类似资料:
  • 我需要在没有中间存储的情况下读写压缩(GZIP)流。目前,我使用Spring

  • 我正在尝试对作业使用Spring批处理。我有两个作业tempJob和tempJob2在两个单独的配置中。当尝试使用命令行参数(-dspring.batch.job.names=tempJob)运行tempJob时,SpringBatch尝试运行tempJob两次,我得到以下错误 2018-06-15 11:36:37.956信息14436---[main]O.S.B.C.L.Support.Sim

  • 我和我的团队一直在使用Spring boot开发一系列微服务。由于服务经历了JUnit和Spring Boot升级(我们现在使用的是Spring Boot 2和JUnit 5),不同开发人员实现的不同JUnit现在使用不同的模式: @扩展为 今天,它们之间的区别是什么?我们真的需要它们来进行单元测试还是嵌入到一些新的Spring Boot注释中?

  • 本文向大家介绍简述 Mybatis 的插件运行原理,以及如何编写一个插件相关面试题,主要包含被问及简述 Mybatis 的插件运行原理,以及如何编写一个插件时的应答技巧和注意事项,需要的朋友参考一下 答:Mybatis 仅可以编写针对 ParameterHandler、ResultSetHandler、StatementHandler、Executor 这 4 种接口的插件,Mybatis 使用

  • 我有一个Employee类,它有如下3个字段。 为此,我希望根据员工姓名(empName)排序,如果多个员工的姓名相同,则根据员工id(empId)排序。 为此,我编写了一个自定义比较器,使用java.util.比较器如下所示。 我已经创建了8个Employee对象并添加到ArrayList中,如下所示。 并使用上述比较器对列表进行如下排序。 它工作得非常好。但这可以使用类似的方法来完成,如下所示

  • 问题内容: 我正在使用播放框架来生成分块响应。代码是: 当我使用浏览器访问时,可以看到显示的数据每秒增加。但是,当我编写一个JavaScript函数来接收和处理数据时,发现它将阻塞直到接收到所有数据。 代码是: 当收到所有数据后,我可以看到一个消息框在10秒钟后弹出。 如何获取流并及时处理数据? 问题答案: jQuery不支持该功能,但是您可以使用普通XHR来实现: 这适用于所有现代浏览器,包括此

  • 问题内容: 在大多数情况下,我将使用异常来检查代码中的条件,我想知道何时才是使用断言的适当时间? 例如, 您能指出断言如何适合这里吗?我应该使用断言吗? 似乎我从未在生产代码中使用断言,而仅在单元测试中看到断言。我确实知道,在大多数情况下,我可以像上面一样使用异常来进行检查,但是我想知道“专业”地执行异常的适当方法。 问题答案: 断言应用于检查不应发生的事情,而异常应用于检查可能发生的事情。 例如

  • 我试图理解Docker Compose和Docker Swarm之间的区别或相似之处。 通过阅读留档,我明白docker-compose提供了一种将不同容器绑定在一起并协同工作的机制,作为一个单一的服务(我猜它使用的功能与用于链接两个容器的--link命令相同) 此外,我对docker-swarm的理解是,它允许您管理不同docker主机的集群,每个主机都运行一些docker-image的几个容器