当前位置: 首页 > 知识库问答 >
问题:

用于动态块大小的Spring批处理自定义完成策略

刁星渊
2023-03-14

上下文

我们有一个批处理作业,可以将本地化的国家名称(即将国家名称翻译成不同的语言)从外部复制到我们的数据库中。其想法是将单个国家的所有本地化国家名称处理为一个区块(即第一区块-安道尔的所有翻译,下一区块-阿联酋的所有翻译,等等)。我们使用jdbcursoritemreader读取外部数据,一些oracle分析函数提供国家可用的翻译总数:比如

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.countty_code, c_lng.language_code

问题

因此,按块剪切这个输入看起来很简单:当您读取了lng_count中指定的确切行数时,停止块,并用下一个读取行开始一个新的输入,但实际上它似乎并不那么简单:(

首先要尝试的是自定义完成策略。但问题是,它无法访问最后一项,即ItemReader——您应该在reader中显式地将其放到上下文中,并将其返回到策略中。不喜欢它,因为它需要额外的读卡器修改/添加读卡器侦听器。此外,我不喜欢同一个项目被前后序列化/反序列化。我觉得JobContext/StepContext不是存放此类数据的好地方。

还有RepeatContext,它看起来是存放此类数据的更好地方,但我无法轻松找到它。。。

最后,我们得出如下解决方案

@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) {
    /* Use the same fixed-commit policy, but update it's chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() {

                @Override
                public void beforeRead() {
                    // do nothing
                }

                @Override
                public void afterRead(final ExtCountryLng item) {
                    /* Update the cunk size after every read: consequent reads 
                    inside the same country = same chunk do nothing since lngCount is always the same there */
                    policy.setChunkSize(item.getLngCount());
                }

                @Override
                public void onReadError(final Exception ex) {
                    // do nothing
                }
            })
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
            .retryLimit(0) // this solution disables only retry, but not recover
            .build();
}

它在工作,它需要最少的代码更改,但对我来说还是有点难看。所以我想知道,当所有必需的信息都已经在ItemReader中可用时,是否有另一种优雅的方法来在Spring Batch中执行动态块大小?

共有1个答案

景鸿才
2023-03-14

最简单的方法是简单地按国家划分。这样,每个国家都会有自己的步骤,你也可以在不同的国家穿行,以提高绩效。

如果需要一个读卡器,您可以包装一个委托PeekableItemReader,并扩展SimpleCompletionPolicy,以实现您的目标。

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    @Override
    public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        currentReadItem = delegate.read();
        return currentReadItem;
    }

    @Override
    public RepeatContext start(final RepeatContext context) {
        return new ComparisonPolicyTerminationContext(context);
    }

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

        public ComparisonPolicyTerminationContext(final RepeatContext context) {
            super(context);
        }

        @Override
        public boolean isComplete() {
            final CountrySpecificItem nextReadItem = delegate.peek();

            // logic to check if same country
            if (currentReadItem.isSameCountry(nextReadItem)) {
                return false;
            }

            return true;
        }
    }
}

然后在你的上下文中,你会定义:

<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
     <property name="delegate" ref="peekableReader" />
</bean>


<bean id="peekableReader" class="YourPeekableItemReader" />

编辑:回想一下你的问题,分区在我看来是最干净的方法。使用分区步骤,每个ItemReader(确保scope=“step”)将从步骤执行上下文传递一个countryName。是的,你需要一个自定义的分区器类来建立你的执行上下文地图(每个国家一个条目)和一个足够大的硬编码提交间隔,以适应你最大的工作单元,但在那之后,一切都是样板,因为每个从属步骤都只是一个单独的块,重启对任何可能遇到问题的国家来说都应该是一件轻而易举的事。

 类似资料:
  • 问题内容: 语境 我们有一个批处理作业,可将本地化的国家名称(即国家名称翻译为不同语言)从外部复制到我们的数据库中。这个想法是在1个块中处理单个国家/地区的所有本地化的国家名称(即第一个块- 安道尔的所有翻译,下一个块- 阿联酋的所有翻译,等等)。我们使用读取外部数据和一些oracle分析功能来提供该国家/地区可用的翻译总数: 问题 因此,按块分割此输入看起来很简单:在读取了其中指定的确切行数后停

  • 我正在处理包含数据库中多个记录列表的。 我应该如何指定处理每个子列表的块大小? 感谢您的帮助,提前谢谢。

  • 我得到以下查询:hibernate:select CHARGE.nextval from dual hibernate:insert into CHARGE(ACCOUNTNUMBER CHARGE_ID)值(?,?)Hibernate:插入到CHARGE(ACCOUNTNUMBER CHARGE_ID)值(?,?)Hibernate:插入到CHARGE(ACCOUNTNUMBER CHARGE_

  • 我们目前正在将一个复杂的spring boot batch+admin UI系统迁移到一个spring-cloud-task基础设施中,该基础设施将被管理云数据流。 作为POC的第一阶段,我们必须能够将所有Spring批处理作业打包在同一个部署JAR下,并且能够使用自定义作业参数一个接一个地运行它们,并且支持某种REST API远程执行作业/任务。 我们删除了所有spring-batch管理依赖项

  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。

  • 我在XML中定义了注入的流,如下所示: 因此,正如您所看到的,我实际上是从的方法启动方法(用于动态创建这些作业定义)。我不确定这是对的。它正在运行,但我不确定是否有一个不同的入口点更适合于这个目的。老实说,我也不知道注释的意义是什么。 当前遇到的问题是,当我从调用时,它会引发以下: 若要使用默认的BatchConfigurer,上下文必须包含一个以上的数据源(found 2)。 注意:我的项目实际