我正面临着Spring批处理的问题。我们在作业中使用了一个任务执行器(simpleAsyncTaskExecutor),它处理两个并行步骤的流程。
谢谢你。
下面是我的工作的配置(不包括我们的自定义编写器):
<batch:job id="job">
<batch:split id="split" task-executor="taskExecutor">
<batch:flow>
<batch:step id="step1">
<batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
<batch:chunk reader="reader1" writer="writer1" commit-interval="24000" />
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="step2">
<batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
<batch:chunk reader="reader2" writer="writer2" commit-interval="24000" />
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
</batch:job>
<bean id="reader1" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
<property name="dataSource" ref="postgresql_1" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
<property name="selectClause" value="
SELECT name
" />
<property name="fromClause" value="
FROM database.people
" />
<property name="whereClause" value="
WHERE age > 30
" />
<property name="sortKeys">
<map>
<entry key="people_id" value="ASCENDING"/>
</map>
</property>
</bean>
</property>
<property name="saveState" value="false" />
<property name="rowMapper">
<bean class="fr.myapp.PeopleRowMapper" />
</property>
</bean>
<bean id="reader2" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
<property name="dataSource" ref="postgresql_1" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
<property name="selectClause" value="
SELECT product_name
" />
<property name="fromClause" value="
FROM database.products
" />
<property name="whereClause" value="
WHERE product_order_date <= '01/11/2017'
" />
<property name="sortKeys">
<map>
<entry key="product_id" value="ASCENDING"/>
</map>
</property>
</bean>
</property>
<property name="saveState" value="false" />
<property name="rowMapper">
<bean class="fr.myapp.ProductsRowMapper" />
</property>
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="8" />
</bean>
在要写入的挂起记录数达到commit-interval(或块大小)之前,不应调用编写器的write()方法。在此之前调用它的唯一时间是读取器中的read()返回null,这表明没有更多的结果了。
当你看到较小的块是在工作的中间吗?行映射器中是否有任何逻辑或您省略的任何内容来滚动读取结果?
我有怪异的工作行为,不明白为什么会这样。我有以下spring批处理配置: 因此,我有一个step:throttle-limit=5和commit-interval=“100”,我假设writer将接收100个项目的块来编写,所有块都将被逐个处理。 但我有以下流程: 如果作业在流中有4个项目,那么我会看到writer收到1个项目并调用4次,而不是单次调用4个项目。此外,所有这4个调用都是并发执行的,
我有一个非常简单的spring批处理,它从一个表中更新了一百万条记录。因为它非常简单,所以我尝试只实现一个更新表的Tasklet。 但我想用10个记录的步骤来promise。是可以在tasklet中实现这一点,还是我必须将itemReader/ItemWriter与块一起使用? 提前谢谢。
1)读取器读取的任何记录都应通过处理器的处理传递给写入器 2)我的阅读器通过SQL查询读取记录,所以如果阅读器读取了100条记录,那么所有记录都应该一次传递给writer 3)如果读取1000条记录,则应同时通过所有1000条记录 4)所以从本质上说,提交间隔在这里是动态的,而不是固定的。 5)我们有什么办法可以做到这一点吗? 编辑: 现在我们想要的是动态提交间隔。读者正在阅读的任何内容,都将立即
如何使用Java配置实现这一点?
Spring批处理读取器的“pageSize”属性和写入器的“提交间隔”之间有什么关系/区别。 我可能错了,但我在我的应用程序中看到了一种模式,即每超过一个页面大小,我就会看到一个提交。这是真的吗? 谢谢
此场景的解决方案-提交间隔为10,跳过限制为10,总输入记录为20,前9个记录有效,其余无效。 当已读取9条记录时,第10条无效。块大小是10,跳过限制是10,那么Spring批处理会在输出文件中写入那9条记录吗?如果不是,则继续读取剩余的记录,当读取第20条记录时,错误记录的计数为11,定义的跳过限制为10。因此该过程将立即停止。首先读取的有效记录的命运如何。 前9条记录是否写入输出文件。 请让