通用批处理模式 - 添加一个Footer记录

优质
小牛编辑
136浏览
2023-12-01

11.3 添加一个Footer记录

经常写文本文件时,在所有处理都已经完成,一个”footer” 记录必须附加到文件的末尾.这也可以通过使用由Spring Batch提供的FlatFileFooterCallback接口,FlatFileItemWriter的FlatFileFooterCallback(和与之对应的FlatFileHeaderCallback)是可选的属性.

  1. <bean id="itemWriter" class="org.spr...FlatFileItemWriter">
  2. <property name="resource" ref="outputResource" />
  3. <property name="lineAggregator" ref="lineAggregator"/>
  4. <property name="headerCallback" ref="headerCallback" />
  5. <property name="footerCallback" ref="footerCallback" />
  6. </bean>

footer回调接口非常简单,它只有一个方法调用.

  1. public interface FlatFileFooterCallback {
  2. void writeFooter(Writer writer) throws IOException;
  3. }

11.3.1 写一个简单Footer

是一个非常常见的需求涉及到footer记录,在输出过程中总计信息然后把这信息附加到文件末尾.这footer作为文件的总结或提供了一个校验和。

例如,如果一个批处理作业是flat文件写贸易记录,所有交易的totalAmount需要放置在footer,然后可以使用followingItemWriter实现:

  1. public class TradeItemWriter implements ItemWriter<Trade>,
  2. FlatFileFooterCallback {
  3. private ItemWriter<Trade> delegate;
  4. private BigDecimal totalAmount = BigDecimal.ZERO;
  5. public void write(List<? extends Trade> items) {
  6. BigDecimal chunkTotal = BigDecimal.ZERO;
  7. for (Trade trade : items) {
  8. chunkTotal = chunkTotal.add(trade.getAmount());
  9. }
  10. delegate.write(items);
  11. // After successfully writing all items
  12. totalAmount = totalAmount.add(chunkTotal);
  13. }
  14. public void writeFooter(Writer writer) throws IOException {
  15. writer.write("Total Amount Processed: " + totalAmount);
  16. }
  17. public void setDelegate(ItemWriter delegate) {...}
  18. }

TradeItemWriter存储的totalAmount值随着每笔交易Amount写出而增长,在交易处理完成后,框架将调用writeFooter,将总金额加入到文件中.注意,写方法使用一个临时变量,chunkTotalAmount,存储交易总数到chunk.这样做是为了确保如果发生跳过写出的方法,totalAmount将保持不变.只有在写出方法结束时,不抛出异常,我们将更新totalAmount.

为了writeFooter被调用,TradeItemWriter(因为实现了FlatFileFooterCallback接口)必须以footerCallback为属性名注入到FlatFileItemWriter 中

  1. <bean id="tradeItemWriter" class="..TradeItemWriter">
  2. <property name="delegate" ref="flatFileItemWriter" />
  3. </bean>
  4. <bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
  5. <property name="resource" ref="outputResource" />
  6. <property name="lineAggregator" ref="lineAggregator"/>
  7. <property name="footerCallback" ref="tradeItemWriter" />
  8. </bean>

如果step是不可重新开始的,TradeItemWriter只会正常运行.这是因为类是有状态(因为它存储了totalAmount),但totalAmount不是持久化到数据库中,因此,它不能被重启.为了使这个类可重新开始,ItemStream接口应该实现的open和update方法.

  1. public void open(ExecutionContext executionContext) {
  2. if (executionContext.containsKey("total.amount") {
  3. totalAmount = (BigDecimal) executionContext.get("total.amount");
  4. }
  5. }
  6. public void update(ExecutionContext executionContext) {
  7. executionContext.put("total.amount", totalAmount);
  8. }

ExecutionContext持久化到数据库之前,update方法将存储totalAmount最新的值.open方法根据ExecutionContext中记录的处理起始点恢复totalAmount.在Step 中断执行之前,允许TradeItemWriter 重新启动