数据库Database - 基于游标的ItemReaders

优质
小牛编辑
128浏览
2023-12-01

6.9.1 基于Cursor的ItemReaders

使用游标(cursor)是大多数批处理开发人员默认采用的方法, 因为它是处理有关系的数据“流”在数据库级别的解决方案。Java 的 ResultSet 类其本质就是用面向对象的游标处理机制。 ResultSet 维护着一个指向当前数据行的 cursor。调用 ResultSet 的 next 方法则将游标移到下一行。

Spring Batch 基于 cursor 的 ItemReaders 在初始化时打开游标, 每次调用 read 时则将游标向前移动一行, 返回一个可用于进行处理的映射对象。最好将会调用 close 方法, 以确保所有资源都被释放。

Spring 的 JdbcTemplate 的解决办法, 是通过回调模式将 ResultSet 中所有行映射之后,在返回调用方法前关闭结果集来处理的。

但是,在批处理的时候就不一样了, 必须得等 step 执行完成才能调用close。下图描绘了基于游标的ItemReader是如何处理的, 使用的SQL语句非常简单, 而且都是类似的实现方式:

游标示例

这个例子演示了基本的处理模式。 数据库中有一个 “FOO” 表,它有三个字段: ID, NAME, 以及 BAR , select 查询所有ID大于1但小于7的行。这样的话游标起始于 ID 为 2的行(第1行)。这一行的结果会被映射为一个Foo对象。再次调用read()则将光标移动到下一行, 也就是ID为3的Foo。 在所有行读取完毕之后这些结果将会被写出去, 然后这些对象就会被垃圾回收(假设没有其他引用指向他们)。

译注

Foo、Bar 都是英文中的任意代词,没有什么具体意义, 就如我们说的 张三,李四 一样

JdbcCursorItemReader

JdbcCursorItemReader 是基于 cursor 的Jdbc实现。它直接使用ResultSet,需要从数据库连接池中获取连接来执行SQL语句。我们的示例使用下面的数据库表:

  1. CREATE TABLE CUSTOMER (
  2. ID BIGINT IDENTITY PRIMARY KEY,
  3. NAME VARCHAR(45),
  4. CREDIT FLOAT
  5. );

我们一般使用领域对象来对应到每一行, 所以用 RowMapper 接口的实现来映射 CustomerCredit对象:

  1. public class CustomerCreditRowMapper implements RowMapper {
  2. public static final String ID_COLUMN = "id";
  3. public static final String NAME_COLUMN = "name";
  4. public static final String CREDIT_COLUMN = "credit";
  5. public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
  6. CustomerCredit customerCredit = new CustomerCredit();
  7. customerCredit.setId(rs.getInt(ID_COLUMN));
  8. customerCredit.setName(rs.getString(NAME_COLUMN));
  9. customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
  10. return customerCredit;
  11. }
  12. }

一般来说Spring的用户对 JdbcTemplate 都不陌生,而 JdbcCursorItemReader 使用其作为关键API接口, 我们一起来学习如何通过 JdbcTemplate 读取这一数据, 看看它与 ItemReader 有何区别。 为了演示方便, 我们假设CUSTOMER表有1000行数据。第一个例子将使用 JdbcTemplate:

  1. //For simplicity sake, assume a dataSource has already been obtained
  2. JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
  3. List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
  4. new CustomerCreditRowMapper());

当执行完上面的代码, customerCredits 这个 List 中将包含 1000 个 CustomerCredit 对象。 在 query 方法中, 先从 DataSource 获取一个连接, 然后用来执行给定的SQL, 获取结果后对 ResultSet 中的每一行调用一次 mapRow 方法。 让我们来对比一下 JdbcCursorItemReader 的实现:

  1. JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
  2. itemReader.setDataSource(dataSource);
  3. itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
  4. itemReader.setRowMapper(new CustomerCreditRowMapper());
  5. int counter = 0;
  6. ExecutionContext executionContext = new ExecutionContext();
  7. itemReader.open(executionContext);
  8. Object customerCredit = new Object();
  9. while(customerCredit != null){
  10. customerCredit = itemReader.read();
  11. counter++;
  12. }
  13. itemReader.close(executionContext);

运行这段代码后 counter 的值将变成 1000。如果上面的代码将返回的 customerCredit 放入 List, 则结果将和使用 JdbcTemplate 的例子完全一致。 但是呢, 使用 ItemReader 的强大优势在于, 它允许数据项变成 “流式(streamed)”。 调用一次read 方法, 通过ItemWriter写出数据对象, 然后再通过 read 获取下一项。 这使得 item 读取和写出可以进行 “分块(chunks)”, 并且周期性地提交, 这才是高性能批处理的本质。此外,它可以很容易地通过配置注入到某个 Spring Batch Step 中:

  1. <bean id="itemReader" class="org.spr...JdbcCursorItemReader">
  2. <property name="dataSource" ref="dataSource"/>
  3. <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
  4. <property name="rowMapper">
  5. <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
  6. </property>
  7. </bean>

附加属性" class="reference-link">附加属性

因为在Java中有很多种不同的方式来打开游标, 所以 JdbcCustorItemReader有许多可以设置的属性 :

需要整理

Table 6.2. JdbcCursorItemReader 的属性(Properties)
































ignoreWarnings决定 SQL警告(SQLWarnings)是否被日志记录,还是导致异常 - 默认值为 true
fetchSize给 Jdbc driver 一个提示, 当 ItemReader 对象需要从 ResultSet 中获取更多记录时, 每次应该取多少行数据. 默认没有给定 hint 值.
maxRows设置底层的 ResultSet 最多可以持有多少行记录
queryTimeout设置 driver 在执行 Statement 对象时应该在给定的时间(单位: 秒)内完成。 如果超过这个时间限制,就抛出一个 DataAccessEception 异常.(详细信息请参考/咨询具体数据库驱动的相关文档).
verifyCursorPosition因为 ItemReader 持有的同一个 ResultSet 会被传递给 RowMapper, 所以用户有可能会自己调用 ResultSet.next(), 这就有可能会影响到 reader 内部的计数状态. 将这个值设置为 true 时, 如果在调用 RowMapper 前后游标位置(cursor position)不一致,就会抛出一个异常.
saveState明确指定 reader 的状态是否应该保存在ItemStream#update(ExecutionContext) 提供的 ExecutionContext 中, 默认值为 true.
driverSupportsAbsolute默认值为 false. 指明 Jdbc 驱动是否支持在 ResultSet 中设置绝对行(absolute row). 官方建议,对于支持 ResultSet.absolute() 的 Jdbc drivers,应该设置为 true, 一般能提高效率和性能,特别是在某个 step 处理很大的数据集失败时.
setUseSharedExtendedConnection默认值为 false. 指明此 cursor 使用的数据库连接是否和其他处理过程共享连接,以便处于同一个事务中. 如果设置为 false, 也就是默认值, 那么游标会打开自己的数据库连接,也就不会参与到 step 处理中的其他事务. 如果要将标志位设置为 true, 则必须将 DataSource 包装在一个 ExtendedConnectionDataSourceProxy 中,以阻止每次提交之后关闭/释放连接. 如果此选项设置为 true ,则打开cursor的语句将会自动带上 ‘READ_ONLY’ 和 ‘HOLD_CUSORS_OVER_COMMIT’ 选项. 这样就允许在 step 处理过程中保持 cursor 跨越多个事务. 要使用这个特性,需要数据库服务器的支持,以及JDBC驱动符合 Jdbc 3.0 版本规范.

HibernateCursorItemReader

使用 Spring 的程序员需要作出一个重要的决策,即是否使用ORM解决方案,这决定了是否使用 JdbcTemplateHibernateTemplate , Spring Batch开发者也面临同样的选择。HibernateCursorItemReader 是 Hibernate 的游标实现。 其实在批处理中使用 Hibernate 那是相当有争议。这很大程度上是因为 Hibernate 最初就是设计了用来开发在线程序的。

但也不是说Hibernate就不能用来进行批处理。最简单的解决办法就是使用一个 StatelessSession (无状态会话), 而不使用标准 session 。这样就去掉了在批处理场景中 Hibernate 那些恼人的缓存、脏检查等等。

更多无状态会话与正常hibernate会话之间的差异, 请参考你使用的 hibernate 版本对应的文档。 HibernateCursorItemReader 允许您声明一个HQL语句, 并传入 SessionFactory , 然后每次调用 read 时就会返回一个对象, 和 JdbcCursorItemReader 一样。下面的示例配置也使用和 JDBC reader 相同的数据库表:

  1. HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
  2. itemReader.setQueryString("from CustomerCredit");
  3. //For simplicity sake, assume sessionFactory already obtained.
  4. itemReader.setSessionFactory(sessionFactory);
  5. itemReader.setUseStatelessSession(true);
  6. int counter = 0;
  7. ExecutionContext executionContext = new ExecutionContext();
  8. itemReader.open(executionContext);
  9. Object customerCredit = new Object();
  10. while(customerCredit != null){
  11. customerCredit = itemReader.read();
  12. counter++;
  13. }
  14. itemReader.close(executionContext);

这里配置的 ItemReader 将以完全相同的方式返回CustomerCredit对象,和 JdbcCursorItemReader 没有区别, 如果 hibernate 映射文件正确的话。useStatelessSession 属性的默认值为 true , 这里明确设置的目的只是为了引起你的注意,我们可以通过他来进行切换。 还值得注意的是 可以通过 setFetchSize 设置底层 cursor 的 fetchSize属性 。与JdbcCursorItemReader一样,配置很简单:

  1. <bean id="itemReader"
  2. class="org.springframework.batch.item.database.HibernateCursorItemReader">
  3. <property name="sessionFactory" ref="sessionFactory" />
  4. <property name="queryString" value="from CustomerCredit" />
  5. </bean>

StoredProcedureItemReader

有时候使用存储过程来获取游标数据是很有必要的。 StoredProcedureItemReaderJdbcCursorItemReader 其实差不多,只是不再执行一个查询来获取游标,而是执行一个存储过程, 由存储过程返回一个游标。 存储过程有三种返回游标的方式:

  1. 作为一个 ResultSet 返回(SQL Server, Sybase, DB2, Derby 以及 MySQL支持)
  2. 作为一个 out 参数返回 ref-cursor (Oracle和PostgreSQL使用这种方式)
  3. 作为存储函数(stored function)的返回值

下面是一个基本的配置示例, 还是使用上面 “客户信用” 的例子:

  1. <bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
  2. <property name="dataSource" ref="dataSource"/>
  3. <property name="procedureName" value="sp_customer_credit"/>
  4. <property name="rowMapper">
  5. <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
  6. </property>
  7. </bean>

这个例子依赖于存储过程提供一个 ResultSet 作为返回结果(方式1)。

如果存储过程返回一个ref-cursor(方式2),那么我们就需要提供返回的ref-cursor(out 参数)的位置。下面的示例中,第一个参数是返回的ref-cursor:

  1. <bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
  2. <property name="dataSource" ref="dataSource"/>
  3. <property name="procedureName" value="sp_customer_credit"/>
  4. <property name="refCursorPosition" value="1"/>
  5. <property name="rowMapper">
  6. <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
  7. </property>
  8. </bean>

如果存储函数的返回值是一个游标(方式 3), 则需要将 function 属性设置为 true, 默认为 false。如下面所示:

  1. <bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
  2. <property name="dataSource" ref="dataSource"/>
  3. <property name="procedureName" value="sp_customer_credit"/>
  4. <property name="function" value="true"/>
  5. <property name="rowMapper">
  6. <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
  7. </property>
  8. </bean>

在所有情况下,我们都需要定义 RowMapper 以及 DataSource, 还有存储过程的名字。

如果存储过程/函数需要传入参数, 那么必须声明并通过 parameters 属性来设置值。下面是一个关于 Oracle 的示例, 其中声明了三个参数。 第一个是 out 参数,用来返回 ref-cursor, 第二第三个参数是 in 型参数, 类型都是 INTEGER :

  1. <bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
  2. <property name="dataSource" ref="dataSource"/>
  3. <property name="procedureName" value="spring.cursor_func"/>
  4. <property name="parameters">
  5. <list>
  6. <bean class="org.springframework.jdbc.core.SqlOutParameter">
  7. <constructor-arg index="0" value="newid"/>
  8. <constructor-arg index="1">
  9. <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
  10. </constructor-arg>
  11. </bean>
  12. <bean class="org.springframework.jdbc.core.SqlParameter">
  13. <constructor-arg index="0" value="amount"/>
  14. <constructor-arg index="1">
  15. <util:constant static-field="java.sql.Types.INTEGER"/>
  16. </constructor-arg>
  17. </bean>
  18. <bean class="org.springframework.jdbc.core.SqlParameter">
  19. <constructor-arg index="0" value="custid"/>
  20. <constructor-arg index="1">
  21. <util:constant static-field="java.sql.Types.INTEGER"/>
  22. </constructor-arg>
  23. </bean>
  24. </list>
  25. </property>
  26. <property name="refCursorPosition" value="1"/>
  27. <property name="rowMapper" ref="rowMapper"/>
  28. <property name="preparedStatementSetter" ref="parameterSetter"/>
  29. </bean>

除了参数声明, 我们还需要指定一个 PreparedStatementSetter 实现来设置参数值。这和上面的 JdbcCursorItemReader 一样。查看全部附加属性请查看 附加属性, StoredProcedureItemReader 的附加属性也一样。