当前位置: 首页 > 知识库问答 >
问题:

处理多个StoredProcedureItemReader调用

壤驷睿
2023-03-14

我必须在Spring批处理作业中实现以下用例:

  1. 通过StoredProcedureItemReader
  2. 读取提供程序列表
  3. 遍历列表,并为步骤1中找到的每个提供程序(作为输入参数)调用另一个StoredProcedureItemReader
  4. 第二个SP的输出将写入CSV。

我提出了以下策略:

  1. 第1步开始
  2. SP ItemReader返回提供程序列表。
  3. 在ItemWriter中,将提供程序保存到ExecutionContext
  4. 步骤1结束
  5. 第2步开始
  6. 另一个SP项目读取器从ExecutionContext
  7. 访问提供程序
  8. 另一个ItemWriter使用FlatFileItemWriter将响应写入CSV
@Scope("step")
public class FetchReportFromProviderProcessor implements
        ItemProcessor<RiscProvider, List<SogReportRecord>>, ItemStream {

    StoredProcedureItemReader<SogReportRecord> reader = new StoredProcedureItemReader<SogReportRecord>();
    private DataSource dataSource;

    @Value("#{jobParameters['date']}")
    private String date;

    @Override
    public List<SogReportRecord> process(final RiscProvider item) throws Exception {

        SogReportRecord record = null;
        List<SogReportRecord> records = new ArrayList<SogReportRecord>();

        SqlParameter[] sqlParameters = new SqlParameter[] {new SqlParameter(OracleTypes.CURSOR)};           

        reader.setParameters(sqlParameters);
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {

            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setString(0, item.getPrefix());
                ps.setString(1, date);
            }
        });

        while( (record = reader.read()) != null ) {
            records.add(record);
        }

        return records;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.setDataSource(dataSource);
        reader.setProcedureName("RISC_GET_DAYMOVEINOUT");
        reader.open(executionContext);      
    }

    @Override
    public void update(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.update(executionContext);        
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();     
    }

}

和XML部分:

<batch:job id="SOG_MOVEINOUT_REPORT_GENERATOR">
    <batch:step id="GET_REPORTS">
        <batch:tasklet>
            <batch:chunk reader="getProviders"
                processor="fetchRecordsFromProvider"
                writer="sogReportWriter" commit-interval="500" />
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader to fetch list of providers -->
<bean id="getProviders" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="procedureName" value="RISC_GET_PROVIDER" />
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="providers" />
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR" />
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1" />
    <property name="rowMapper">
        <bean class="com.kpn.risc.ProviderRowMapper" />
    </property>
</bean>

<bean id="fetchRecordsFromProvider" class="com.kpn.risc.FetchReportFromProviderProcessor">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="sogReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:///${batch.job.report.dir}/report-#{stepExecutionContext['provider']}.csv" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />
            </property>
        </bean>
    </property>
</bean>

在上面的代码中,处理器将其工作委托给SP项目读取器。但是在调用read()方法之前,读取器无法正确初始化。是否可以或建议在ItemProcessor内部调用ItemReader

共有1个答案

华温书
2023-03-14

您所描述的直接属于批处理的驱动查询模式。本质上,一个查询定义了ids(或者在您的情况下是提供程序),另一个查询使用它来驱动它的查询。通常,ItemReader读取ID并将每个ID传递给ItemProcessor以进行丰富(“其他”查询)。然后将结果传递给ItemWriter

您可以在Spring Batch文档的Common Batch Patterns一节中阅读更多关于驱动查询模式的信息:http://docs.Spring.io/spring-batch/trunk/reference/html/Patterns.html

 类似资料:
  • 我正在尝试使用StoredProcedureItemReader for Cursor读取spring批处理中的一个DB2存储过程。sql字符串未被执行,默认sql被传递给jdbctemplate 我正在使用由作业调用的批处理步骤: 为什么我没有得到结果集或者无法执行查询。我是个新手,有点卡住了。调试显示,数据源配置正确。 谢谢!

  • 我得到了非法状态例外。我试图通过设置以下参数来解决它 有人知道怎么回事吗? 我在Application.Properties中的配置如下: @计划参数 ETL.Scheduler.Frequency=3600000 这是ItemReader实现的重要部分。如果需要更多的信息,请告诉我。 编辑 我所有的bean都在配置文件中,所以我一开始就初始化它们。并使用排定程序运行作业。为什么作业执行再次尝试重

  • 问题内容: 在运行Linux 2.6.35+的系统中,我的程序创建了许多子进程并对其进行监视。如果子进程死了,我会进行一些清理并再次产生该进程。我经常在过程中获取信号。与异步使用。 当将信号处理程序用于非实时信号时,当信号处理程序针对特定信号运行时,必须阻止同一信号的进一步出现,以避免进入递归处理程序。如果此时有多个信号到达,则内核仅调用一次处理程序(当信号被解除阻塞时)。 使用时是否具有相同的行

  • 问题内容: 我是新手,发现错误处理非常冗长。我已经读过它的理由并大体上同意,但是似乎在某些地方似乎有更多代码来处理错误而不是实际工作。这是一个(人为的)示例,我在其中传送“ Hello world!”。进入cat并读取并打印输出。基本上,每一行都可以再处理三个错误,而我什至没有处理任何事情。 有没有惯用的,干净的方法来处理此问题?我只是觉得我在想什么。 问题答案: 显然,我们必须处理任何错误。我们

  • 问题 你有一个代码片段可能会抛出多个不同的异常,怎样才能不创建大量重复代码就能处理所有的可能异常呢? 解决方案 如果你可以用单个代码块处理不同的异常,可以将它们放入一个元组中,如下所示: try: client_obj.get_url(url) except (URLError, ValueError, SocketTimeout): client_obj.remove_url(u