【Spring Data JPA+EclipseLink+Stream】使用Repository实现“真实”的流式查询&在JpaSpecificationExecutor中使用Stream

叶稳
2023-12-01

前言

发现问题

  下载报表是各种办公类应用会提供的普遍功能,项目老代码在实现下载的时候,还是采用了最为简单的“读数据库+写文件”的模式。果不其然,当我在测试环境偶然进行记录条数打到10W+以上的下载时,发生了OutOfMemoryException,俗称内存爆了。
  但要解决这个问题其实并不复杂,在JDBC的时代,ResultSet天然就不存在这样的问题,原因就是它是在需要读记录的时候才去数据库里拿数据的。那只要如法炮制,问题不就解决了吗?而Java 8的Stream似乎又具有这样的性质,恰好,Spring Data Jpa又支持在Repository中返回“Stream”,似乎一切都已经解决了。却不料项目当前正在使用的持久化框架是EclipseLink,一切似乎又没有解决……

背景

  在说具体的解决方案前,先聊一下“流”从何而来,以及为什么号称实现了Jpa 2.2标准的EclipseLink的“流”又不是“真实”的流。

JPA 2.2

  这里说的JPA和之前说的Spring Data JPA又不完全是一回事。JPA本身也只是J2EE发布的一个持久化标准,学名Java Persistence API。所以理论上说它只是一系列接口,等着别人来实现,如果实现它的人按要求实现了其全部的功能,那便组成了一套完整的Java持久化体系,这样的实现提供方主要有两个,一个是被Springboot收编为默认的Hibernate,另一个则是发源自Oracle Toplink的EclipseLink。
  如果说Java 8是Java史上的一座里程碑,那么使用JDK8编译并且提供了Java 8部分特性的JPA2.2也应该是JPA史上的里程碑。Java 8最突出的新特性Stream和Function,以及接口里的default方法,在JPA2.2中浓缩在了Query类的一个方法上,那便是提供了流式返回查询结果的API:Stream<X> getResultStream()。上文提到了Spring Data JPA的Repository支持Stream的返回类型,看一下源码,就可以看到在AbstractJpaQuery中,StreamExecution调用的方法就是Query类的getResultStream

		private static Method streamMethod = ReflectionUtils.findMethod(Query.class, "getResultStream");
		...
		//StreamExecution#doExecute
		@Override
		protected Object doExecute(final AbstractJpaQuery query, JpaParametersParameterAccessor accessor) {

			if (!SurroundingTransactionDetectorMethodInterceptor.INSTANCE.isSurroundingTransactionActive()) {
				throw new InvalidDataAccessApiUsageException(NO_SURROUNDING_TRANSACTION);
			}

			Query jpaQuery = query.createQuery(accessor);

			// JPA 2.2 on the classpath
			if (streamMethod != null) { //streamMethod
				return ReflectionUtils.invokeMethod(streamMethod, jpaQuery);
			}

			// Fall back to legacy stream execution
			PersistenceProvider persistenceProvider = PersistenceProvider.fromEntityManager(query.getEntityManager());
			CloseableIterator<Object> iter = persistenceProvider.executeQueryWithResultStream(jpaQuery);

			return StreamUtils.createStreamFromIterator(iter);
		}

  可是为什么实现了JPA2.2标准的EclipseLink给出的却是“假”的流呢?这需要端详一下JPA中的这个方法:

    /**
     * Execute a SELECT query and return the query results
     * as a typed <code>java.util.stream.Stream</code>.
     * By default this method delegates to <code>getResultList().stream()</code>,
     * however persistence provider may choose to override this method
     * to provide additional capabilities.
     * 
     * @since 2.2
     */
    default Stream<X> getResultStream() {
        return getResultList().stream();
    }

  这是一个default方法,注释上也说了“may”,也就是说,到底如何实现取决于实现者,如果不重写,那这个方法就和原来一样,还是要把数据全部装载进内存,而恰恰EclipseLink就没有对此提供实现,所以想要内存不爆掉,还得采取一些其他办法。

解决方案

方案一 分页

  既然“全部”装进内存不行,那就分批来呗?只要把查询结果分页,一页页加载进内存,用完再查下一页。这个方法确实对症下药,但撇开分批次查询的性能问题,这样的实现势必将会对业务代码造成入侵,而且以后新增类似的功能还需要复刻同样的做法,属实不明智,因此并不推荐使用。

方案二 直接使用EclipseLink的API

  EclipseLink作为老牌的框架,虽然没有实现getResultStream方法,但是其本身相应的功能还是非常健全的。这个场景下一般使用的ReadAllQuery下有两个配置方法:useScrollableCursor()以及useCursoredStream(),它们分别对应的两种容器类ScrollableCursorCursoredStream都是Cursor的子类,Cursor就是EclipseLink(Toplink)给出的类似ResultSet的解决方案,允许用户像迭代器一样地将数据按行或按批加载进内存。

  但是既然使用了Spring Data JPA,再直接调用EclipseLink的接口总觉得不对味,是否有在Repository直接使用Stream的方法呢?

测试代码

  在说其它的解决方案以前,先看一下测试代码以及其如何证明其是否真的使用了“流”。

定义Repository

  选取一个数据库中数据量较大的实体,建立一个测试Repository,我这里选择的类所对应的表的数据量大约在30-40万,如果全部装入内存必然导致OOM。在其中加入一个方法,读取全部的JobOrder

interface TestRepository extends JpaRepository<JobOrder, Long> {
    @Query("select jo from JobOrder jo order by jo.oid desc")
    Stream<JobOrder> streamAll();
}

建立一个DAO

  建立一个DAO以模拟正常情况下使用Repository的场景,在使用流式返回时,查询必须包裹在一个事务中进行,以确保数据库连接打开。在以后续要对返回结果进行一些处理,因此创建一个包装类。

@Repository
class TestDAO {
    @Autowired
    private TestRepository repository;

    @Transactional(readOnly = true)
    public Stream<JobOrder> getAllByStream() {
        return repository.streamAll();
    }
}

创建测试类

  接下来,主要测试代码都会围绕这几个类和方法展开。这个测试类中现在有一个方法,这个方法中,将找出来的JobOrder(以下简称“JO”)按oid倒序排序,跳过oid最大的100个JO,剩余的JO中取出前10个,然后格式化并打印到标准输出流上。其中进行格式化的formatJO通常代表了我们对是数据实体所进行的一系列转换操作,打印到输出流在下载的场景下则类似于输出到文件。

@DataJpaTest
class CustomizedRepositoryTest {
    @Autowired
    private TestDAO testDAO;

    @Test
    void testGetAllByStream() {
        try (Stream<JobOrder> joStream = testDAO.getAllByStream()) {
            joStream.skip(100).limit(10).map(this::formatJO).forEach(System.out::println);
        }
    }

    private String formatJO(JobOrder jo) {
        return "Jo#oid=" + jo.getOid() + ";joNo=" + jo.getJoNo();
    }
}

如何判断是否使用了“流”

  想要判断这一点其实并不需要打断点去看具体返回的对象是什么。首先即使将40万条数据加载到内存几乎是不可能的,而即使你拥有超大的内存,传输也需要天量的时间。因此如果返回结果能够立即打印出来,则说明达到了“流”的效果。

方案三 降级JPA

  如果仔细看上面的AbstractJpaQuery源码,就会发现Springboot并不会总是去调用QuerygetResultStream方法,当类路径上的JPA版本低于2.2时,就会采用传统的方式去实现。而针对EclipseLink,Springboot也包装了相应的实现。
  因此只需要在依赖管理中,将EclipseLink自己引进来的JPA给排除掉,再引入一个低版本的JPA即可。Maven的pom.xml如示例。

        <dependency>
            <groupId>org.eclipse.persistence</groupId>
            <artifactId>org.eclipse.persistence.jpa</artifactId>
            <version>${eclipselink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>javax.persistence</artifactId>
                    <groupId>org.eclipse.persistence</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.eclipse.persistence</groupId>
            <artifactId>javax.persistence</artifactId>
            <version>2.1.1</version>
        </dependency>

  这似乎唯一能够在Repository中直接使用Stream类型的返回值并且不把所有查询数据装入内存的方法。但这个方法并不是没有缺陷,首先其降级使用了低版本的JPA,从功能性的角度以及字节码级别的角度,其兼容性都让人不怎么放心,并且随着版本更新,你的JPA版本或许就将永远停留在2.1.1,这可不是件好事。而事实上,Springboot的实现可以带给我们诸多的启示,如果不严格要求返回类型是Stream,那么就有另一个方案可供选择。

方案四 @QueryHint+Cursor/Iterator

  如果顺着AbstractJpaQuery.StreamExecution的源码顺藤摸瓜,就能看到Springboot在不依赖JPA2.2的getResultStream的时候的做法可以分为一下几步:

  1. 设置QueryHint,将eclipselink.cursor.scrollable设置为true
  2. 调用getSingleResult()方法获得一个ScrollableCursor
  3. 将其包装成一个CloseableIterator(Closeable是为了能够通过迭代器关闭底层流)
  4. 使用JDK的StreamSupportIterator转化为Steam

  从QueryHints类中,可以看到eclipselink.cursor.scrollable的用法(如下代码引用所示),对ReadAllQuery使用,通过getSingleResult()getResultCursor()获得返回的Cursor

    /**
     * <p>Configures the query to return a ScrollableCursor.
     * A cursor is a stream of the JDBC ResultSet.
     * ScrollableCursor implements ListIterator, when the each next() will fetch the next from the JDBC ResultSet,
     * and build the resulting Object or value.
     * ScrollableCursor can scroll forwards and backwards and position into the ResultSet.
     * A Cursor requires and will keep a live JDBC connection, close() must be called
     * to free the Cursor's resources.
     * A Cursor can be accessed from a JPA Query through getSingleResult(), or from JpaQuery using getResultCursor().
     * Cursors are useful for large results sets, and if only some of the results are desired.
     * MAX_ROWS and FIRST_RESULT can also be used instead of cursors to obtain a page of results.
     * Valid values are:  HintValues.FALSE, HintValues.TRUE,
     * "" could be used instead of default value HintValues.FALSE
     */
    public static final String SCROLLABLE_CURSOR = "eclipselink.cursor.scrollable";

  增加一个QueryHint是很简单的,重点是要让Springboot去调用getSingleResult()方法。关于Repository可以设置的返回类型的官方文档贴在这里:Spring Data JPA doc。这里我们从源码出发。在AbstractJpaQuery中,可以看到execution属性的构造方式,除了存储过程和集合类型的判断过程较为复杂(但也很好排除),其余的返回类型都是根据方法返回值类型直接判断的。

		this.execution = Lazy.of(() -> {
			if (method.isStreamQuery()) { //返回类型为Stream<T>
				return new StreamExecution();
			} else if (method.isProcedureQuery()) { //存储过程
				return new ProcedureExecution();
			} else if (method.isCollectionQuery()) { //集合或似集合类型
				return new CollectionExecution();
			} else if (method.isSliceQuery()) { //返回类型为Slice<T>
				return new SlicedExecution();
			} else if (method.isPageQuery()) { //返回类型为Page<T>
				return new PagedExecution();
			} else if (method.isModifyingQuery()) { //标注了@Modifying
				return null;
			} else {
				return new SingleEntityExecution(); //其余的
			}
		});

  剩下的只有一个选项,就是那个SingleEntityExecution,而它的实现正是调用了getSingleResult()方法。

	static class SingleEntityExecution extends JpaQueryExecution {
		@Override
		protected Object doExecute(AbstractJpaQuery query, JpaParametersParameterAccessor accessor) {
			return query.createQuery(accessor).getSingleResult();
		}
	}

  因此你正可以大发奇想,如果设置了QueryHint的话,ReadAllQuery的返回就应该是一个Cursor,而Repository最终组装出的Query实现就是由EclipseLink提供的,也就是说,可以在Repository中使用返回值为Cursor的方法。至于Springboot剩下的两步(上述3、4)我们通过一个工具类来实现就好了(当然你完全可以直接使用这个Cursor)。

public class StreamQueryHelper {
    public static <T> Stream<T> stream(Cursor cursor) {
        if (cursor == null || !cursor.hasNext()) {
            Optional.ofNullable(cursor).ifPresent(Cursor::close);
            return Stream.empty();
        }
        @SuppressWarnings("unchecked")
        Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(cursor, Spliterator.NONNULL);
        return StreamSupport.stream(spliterator, false).onClose(cursor::close);
    }
}

  为了测试这种做法的可行性,在上面的测试类中增加几个方法:

    //测试类中
    @Test
    void testGetCursorResult() {
        try (Stream<JobOrder> joStream = testDAO.getCursorResult()) {
            joStream.skip(100).limit(10).map(this::formatJO).forEach(System.out::println);
        }
    }
    ...
    //TestDAO中
    public Stream<JobOrder> getCursorResult() {
        Cursor cursor = repository.cursorAll();
        return StreamQueryHelper.stream(cursor);
    }
    ...
    //TestRepository中
    @Query("select jo from JobOrder jo order by jo.oid desc")
    @QueryHints(@QueryHint(name = "eclipselink.cursor.scrollable", value = "true"))
    Cursor cursorAll();

  Cursor在EclipseLink2.7中已经实现了Iterator,因此此处的Cursor也可以替换成Iterator<JobOrder>。这两种返回类型的本质上是一样的,很难说使用哪一种比较好,如果使用Cursor,似乎又倒退到了方案二直接使用EclipseLink的API的感觉;但即使使用Iterator,由于最终要关闭流以释放连接,我们似乎难以避免地要将其转换为Cursor以调用其close()方法(注:Cursor没有实现Closeable接口)。

还没完!!!

  到了这里问题似乎已经解决,但是不要忘了,在事务中(即使是设置成readonly的事务)中,所有被读出的对象都会存储在EclipseLink的IdentityMap(以及缓存)中,用于跟踪变化等,随着读出的数据越来越多,即使我们使用了Cursor作为容器,仍然有OOM的风险。

方法一 调用EntityManagerdetach方法

  当一个对象用完调用detach使对象脱管,这样EclipseLink就不会维护对其的强引用,当内存增加到一定级别后,GC就会将这些没有引用的对象回收掉。
  但是这个方法并不是最好的办法,首先就是其代码的侵入性,你必须记得要在每个对象用完后调用这个方法;而且往往根据ORMapping,会牵扯出各种其它的对象出来,这些对象在用完后也必须被释放,而有些多对一关系下,调用detach会导致相同的对象被反复地查询和构造,造成额外的内存和时间开销。

方法二 使用WeakIdentityMap

  EclipseLink提供了对象在Persistence Context中以何种方式的配置。其中ReferenceMode.WEAK表示对象从数据库中读取出来时,Persistence Context只会以虚引用的方式引用它(一旦GC就将被回收)直至其发生更改。在大批量读取数据时,这种方式可以防止大量的对象被强引用而无法被释放并导致OOM。(只要你不试图用一些很奇怪的方式试图去找到一个已经不被引用的对象进行修改,否则这种方式一般不会导致修改丢失)

@Configuration
class EclipseLinkJpaConfiguration extends JpaBaseConfiguration {
    ...
    @Override
    protected Map<String, Object> getVendorProperties() {
        Map<String, Object> map = new HashMap<>();
        ...
        map.put(PersistenceUnitProperties.PERSISTENCE_CONTEXT_REFERENCE_MODE, ReferenceMode.WEAK.toString()); //不要忘了toString
        ...
        return map;
    }
}

  讲完了注意点,关于Repository中要如何使用“真实的流”就讲到这里。但问题并没有完全解决,因为Repository中往往实现的是一些简单的查询,而较为复杂的动态查询使用Repository就没那么容易了。可是不幸的是,JpaSpecificationExecutor也没有提供对Stream的支持。

JpaSpecificationExecutor with Stream

自定义接口和实现

  要定义接口,就是要定义方法,如果使用类似JpaSpecificationExecutor的风格,就要先搞清楚findAll方法的参数的含义。JpaSpecificationExecutor原本的findAll方法的第一个参数是一个Specification,它其实就是一个Query的where子句。对于一个SQL来说,最基本的构成要件,首先方法的返回值就已经定义了SQL的select子句;构造方法时传入的root就是它最基本的from。JpaSpecificationExecutorfindAll的第二个重载允许将“order by”独立于where地传递进来,解耦了SpecificationOrder,只是传入的Sort并不属于JPA标准的接口,Springboot也用了专门的代码来处理它。所以基本的实现应该是这样的:

    @PersistenceContext
    private EntityManager em;

    @Override
    public Stream<T> streamAll(Specification<T> spec, Class<T> targetType) {
        CriteriaBuilder cb = em.getCriteriaBuilder();
        CriteriaQuery<S> query = cb.createQuery(targetType); //Type required for Selection
        Root<S> from = query.from(targetType); //from
        Optional.ofNullable(spec) // if user specified specification
                .map(s -> s.toPredicate(from, query, cb)) // build where
                .ifPresent(query::where); // attach where to query
        TypedQuery<S> typedQuery = em.createQuery(query); // to Query
        return executeQueryGetStream(typedQuery); // execute query and get stream
    }

  有了实现,那么接口的定义也呼之欲出。但是我们自定义的接口的实现是由Spring根据接口的定义而产生的代理,如何让它使用我们自己的实现呢?答案是实现类的名称只要在接口名后增加一个Impl(全名,所以包括包名),就可以让Springboot在为接口方法生成代理时,首先去看是否已经被实现(default或在接口名+Impl的类中),然后对于没有任何实现的方法,根据方法名产生代理实现。
  我在这里将其取名为StreamedJpaSpecificationExecutor,所以实现类类名就是StreamedJpaSpecificationExecutorImpl,其完整的实现如下所示。

class StreamedJpaSpecificationExecutorImpl<T> implements StreamedJpaSpecificationExecutor<T> {
    @PersistenceContext
    private EntityManager em;

    @Override
    public <S extends T> Stream<S> streamAll(Specification<S> spec, Class<S> targetType) {
        CriteriaBuilder cb = em.getCriteriaBuilder();
        CriteriaQuery<S> query = cb.createQuery(targetType);
        Root<S> from = query.from(targetType);
        Optional.ofNullable(spec)
                .map(s -> s.toPredicate(from, query, cb))
                .ifPresent(query::where);
        TypedQuery<S> typedQuery = em.createQuery(query);
        return executeQueryGetStream(typedQuery);
    }

    private <S extends T> Stream<S> executeQueryGetStream(TypedQuery<S> query) {
        JpaQuery<S> jpaQuery = (JpaQuery<S>) query;
        jpaQuery.setHint("eclipselink.cursor.scrollable", true);
        Cursor cursor = jpaQuery.getResultCursor();
        return StreamQueryHelper.stream(cursor);
    }
}

  这个方法虽然解决了问题,但可以发现,虽然美其名曰StreamedJpaSpecificationExecutor,但实际上它与JpaSpecificationExecutor并不兼容,也就是说两者如果要一起使用就必须实现两个名字贼长的接口,而且它不仅需要额外的参数,还没有完整地提供JpaSpecificationExecutor的所有功能。除非你只为某个类型提供流式的动态查询,否则这个方案还可以更进一步。

拓展SimpleJpaRepository

  SimpleJpaRepositoryJpaRepositoryJpaSpecificationExecutor的默认实现。如果StreamedJpaSpecificationExecutor需要拓展JpaSpecificationExecutor的话,我们只要在SimpleJpaRepository的基础上拓展StreamedJpaSpecificationExecutor新增方法的实现就好了。
  读SimpleJpaRepository的源码可以看到JpaSpecificationExecutor中,我们需要拓展的两个findAll方法的实现非常简单。

	@Override
	public List<T> findAll(@Nullable Specification<T> spec) {
		return getQuery(spec, Sort.unsorted()).getResultList();
	}

	@Override
	public List<T> findAll(@Nullable Specification<T> spec, Sort sort) {
		return getQuery(spec, sort).getResultList();
	}

  于是相应的,拓展的CustomizedRepository的实现也非常简单,没有过多需要赘述的了。

@Transactional(readOnly = true)
class CustomizedRepository<T, ID> extends SimpleJpaRepository<T, ID>
        implements StreamedJpaSpecificationExecutor<T> {
    public CustomizedRepository(JpaEntityInformation<T, ?> entityInformation, EntityManager entityManager) {
        super(entityInformation, entityManager);
    }

    @Override
    public Stream<T> streamAll(Specification<T> spec) {
        return streamAll(spec, Sort.unsorted());
    }

    @Override
    public Stream<T> streamAll(Specification<T> spec, Sort sort) {
        return executeQueryGetStream(getQuery(spec, sort));
    }

    private <S extends T> Stream<S> executeQueryGetStream(TypedQuery<S> query) {
        JpaQuery<S> jpaQuery = (JpaQuery<S>) query;
        jpaQuery.setHint("eclipselink.cursor.scrollable", true);
        Cursor cursor = jpaQuery.getResultCursor();
        return StreamQueryHelper.stream(cursor);
    }
}

  下一步就是要让Spring Data JPA使用我们拓展后的类型,只需要在任意一个配置类上使用@EnableJpaRepositories就可以了,需要注意的是,如果这个注解没有被标注在项目的启动类上,那么basePackage必须要囊括项目中所有的实体类和Repository

@Configuration
@EnableJpaRepositories(repositoryBaseClass = CustomizedRepository.class, basePackages = "xxx")
class RepositoryCustomizer {}

  自定义默认Repository实现的更多详情可参考官方文档。它的作用非常多,包括可以重写部分默认的实现:例如部分数据库的in表达式参数长度有限制,findAllById并没有对此进行限制,那么就可以重写这个方法的实现来提供相应的支持等。

结语

  到此为止,关于全部EclipseLink与Spring Data JPA实现流式查询的功能已经趋于完整了。本文中总结了我个人的发现和理解,如果有不正确或不到位的地方也希望各位指正,希望能够帮助到在这一行面有困扰的兄弟们。

 类似资料: