当前位置: 首页 > 知识库问答 >
问题:

将CompletionPolicy扩展为从MultiResourceItemReader完成/提交每个资源

寇景明
2023-03-14

我的问题是我不理解各种Spring批处理上下文。参考留档解释了如何将数据传递给未来的步骤。但是我如何在步骤内的读取器和写入器组件之间传递数据。步骤上下文。是否有块上下文?我以前在编写分区程序时在执行上下文之前使用过。但这些是并行执行的。

我现在需要做一个有序的手术。它基本上是一个jdbc导入作业,但每个文件都需要提交,否则它们就是外键约束。

我可以获得单个文件资源的行数的最简单的地方是在MultiResourceItemReader中,然后将其委托给ItemReader。但是在查看了各种CompletionPolicy实现之后,他们似乎只能访问RepeatContext。如何在MultiResourceItemReader的RepeatContext中存储一个值,以便我的CompletionPolicy可以访问它并在特定的文件资源行数之后提交。

举一个例子,说明如何扩展抽象计数完成策略,以及如何从MultiResourceItemReader存储数据。

或者也许有更好的方法来评估这类工作。

<!-- <job id="job" restartable="${restartable}" xmlns="http://www.springframework.org/schema/batch"> -->
    <batch:job id="job" restartable="true"
        xmlns="http://www.springframework.org/schema/batch">
        <batch:step id="step1-unzipFile">
            <batch:tasklet ref="unzipFileTasklet" />
            <batch:next on="COMPLETED" to="step2-import" />
        </batch:step>
        <batch:step id="step2-import"> <!-- we can't use a commit-interval="${commitInterval} cause it messes with 2nd pass import processing if the commit ends up being the middle of the file -->
            <batch:tasklet>
                <batch:chunk reader="multiResourceReader" writer="itemWriter" chunk-completion-policy="completionPolicy"/>  
            </batch:tasklet>
            <!-- <batch:next on="COMPLETED" to="step3-fileCleanUp" /> -->
        </batch:step>
        <!-- <batch:step id="step3-fileCleanUp">
            <batch:tasklet ref="fileCleanUpTasklet" />
        </batch:step> -->
    </batch:job>




    <bean id="multiResourceReader" class="springbatch.iimport.extended.SequentialLoaderMultiFileResourceItemReader" scope="step">
        <property name="resourceDirectoryPath" value="${importTempDirectoryBasePath}/#{jobParameters['jobKey']}/"/>
        <property name="delegate" ref="itemReader"/>
    </bean>

    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper" />
    </bean>

    <bean id="lineMapper" class="springbatch.iimport.extended.JsonToTupleLineMapper"/>

    <bean id="itemWriter" class="springbatch.iimport.extended.TupleJdbcBatchItemWriter" scope="step">
        <property name="moduleDataSource" ref="moduleDataSource"/>
        <property name="dataSource" ref="dataSource"/>
        <property name="jobKey" value="#{jobParameters['jobKey']}"/>
        <property name="jobDef" value="#{jobParameters['jobDef']}"/>
    </bean>

    <bean id="completionPolicy" class="?"/> 

    <!-- tasklets -->

    <bean id="unzipFileTasklet" class="springbatch.iimport.tasklets.UnZipFile" scope="step">
        <!-- the temp directory the files are unzipped to end up being #{jobParameters['importZipFileName']} -->
        <property name="importZipFileName" value="${uploadDir}/#{jobParameters['importZipFileName']}" />
        <property name="jobKey" value="#{jobParameters['jobKey']}"/>
        <property name="importTempDirectoryBasePath" value="${importTempDirectoryBasePath}" />
     </bean>

共有1个答案

慕容晔
2023-03-14

您可以编写自己的CompletionPolicy,它前瞻itemReader(使用peakableitemreader)并在下次调用itemReader时将当前区块返回为“已完成”。next()返回null(表示当前文件的EOF)
请记住:此解决方案可能会导致内存问题,因为内存中的文件内容已被完全读取。

public class EOFCompletionPolicy extends CompletionPolicySupport
{
    private EOFCompletionContext cc;
    private PeekableItemReader<Object> reader;

    public void setReader(PeekableItemReader<Object> forseeingReader)
    {
        this.reader = forseeingReader;
    }

    @Override
    public boolean isComplete(RepeatContext context)
    {
        return this.cc.isComplete();
    }

    @Override
    public RepeatContext start(RepeatContext context)
    {
        this.cc = new EOFCompletionContext(context);
        return cc;
    }

    @Override
    public void update(RepeatContext context)
    {
        this.cc.update();
    }

    protected class EOFCompletionContext extends RepeatContextSupport
    {
        boolean eof = false;
        public EOFCompletionContext (RepeatContext context)
        {
            super(context);
        }

        public void update()
        {
            final Object next;
            try
            {
                next = reader.peek();
            }
            catch (Exception e)
            {
                throw new NonTransientResourceException("Unable to peek", e);
            }
            // EOF?
            this.eof = (next == null);
        }

        public boolean isComplete() {
            return this.eof;
        }
    }
}
 类似资料:
  • 我目前有一个Spring批处理作业,它执行以下操作: 使用委托给FlatFileItemReader的MultiResourceItemReader读取csv文件列表。 将每个文件分成块,并将每个块写入为JMS消息,每个消息包含块中的行列表和JSON格式的底层资源的文件名。 是否有任何干净的方法来防止阅读器将来自不同文件资源的行包含在同一块中? 编辑:我认为解决方案需要使用自定义块完成策略来确定当

  • 说明 协议2.1中读取state=2,3的结账请求后,从业务系统完成结账,并提交结果到服务端 请求地址 http://api.dc78.cn/Api/cash_post_cash 请求方式 GET 请求参数 参数 参数名称 必填 描述 范例 id 请求编号 此编号为协议2.1中返回的结算单id bzid 结算业务单号 返回 {"status":1,"info":"提交成功"} 请求方式 INI 请

  • 秒付业务,下行接口收到cash-pay后(下行接口详见0.5),完成相应的结账业务流程,并上传确认支付订单处理完成。 请求参数说明 参数 描述 必填 示例值 类型 最大长度 action 接口参数组 是 object └action 需要调用的接口名称 是 cash_post_cash string get GET参数组,本组参数需要参与签名 是 object └id 支付单流水号payid 是

  • 我使用open-api-3标准创建了一个swagger.json,以便能够使用工具swagger-codegen生成客户端sdk。我的问题是,每个apiendpoint最终都得到自己的api对象,而不是一个api对象用于所有的APIendpoint。 预期: 实际: 问:如何配置swagger codegen或我的swagger.json来为所有实体/资源只创建一个api对象? 在此示例中,我使用

  • 是否可以将Gradle配置为构建几个android apk文件,每个文件只使用一个资源类型的文件夹? null 我知道我可以在构建之前简单地移除某些文件夹,但如果我可以“自动”地使它变得更好。 是否可以使用分级“口味”?

  • 问题内容: 我怎样才能做到这一点: 另外,我想知道这种方法是否可行,因为我已经在类中创建了菜单,即ControlMenu,并且在其他活动中继续进行扩展。 问题答案: 你只能扩展一个类。并从许多来源实现接口。 扩展多个类不可用。我能想到的唯一解决方案不是继承任何一个类,而是每个类都有一个内部变量,并通过将对你对象的请求重定向到你希望它们去的对象来做更多的代理。 这是我想出的最好的解决方案。你可以从这