当前位置: 首页 > 知识库问答 >
问题:

带有ResultSet的java.util.Stream

辛成周
2023-03-14

我有几个大数据量的表(大约1亿条记录)。因此,我不能将这些数据存储在内存中,但我希望使用java.util.stream类对此结果集进行流式处理,并将此流传递给另一个类。我读过stream.ofstream.builder运算符,但它们是内存中的缓冲流。那么有没有办法解决这个问题呢?提前道谢。

更新#1

好吧,我在谷歌上找到了jooq图书馆。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有很少的表格,但数据量很大。我想对resultset进行流式处理,并将此流传输到另一个方法。类似这样的事情:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

我在jooq上做了一些实验,现在可以说上面决定不适合我。此代码记录=dsl.using(connection).fetch(resultSet).stream();占用太多时间

共有1个答案

牟嘉
2023-03-14

您必须理解的第一件事是,像这样的代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

不起作用,因为离开try块时,资源已关闭,而的处理甚至还没有开始。

资源管理结构“try with resources”适用于方法中块范围内使用的资源,但您创建的是返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,调用方负责关闭stream

Record createRecord(ResultSet rs) {
    …
}

您可以创建一个

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但是要正确地执行,您必须合并异常处理和关闭资源。您可以使用stream.onclose注册将在stream关闭时执行的操作,但它必须是不能抛出检查异常的runnable。类似地,TryAdvance方法不允许抛出检查异常。由于我们不能在这里简单地嵌套try(…)块,所以当已经存在挂起的异常时,在close中抛出的抑制异常的程序逻辑不会免费出现。

为了在这里提供帮助,我们引入了一个新的类型,它可以包装关闭操作,这些操作可能抛出检查的异常,并将它们包装在未检查的异常中交付。通过实现autocloseable本身,它可以利用try(…)构造来安全地关闭操作:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作就变成了:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法将所有资源、连接语句结果集的必要关闭操作包装在上面描述的实用工具类的一个实例中。如果在初始化过程中发生异常,则立即执行关闭操作,并将异常传递给调用方。如果流构造成功,则通过onclose注册关闭操作。

因此,调用方必须确保正确关闭,如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}
 类似资料:
  • 问题内容: 以下代码使用和将转换为JSON字符串。 有没有更快的方法? 有没有一种方法可以使用更少的内存? 问题答案: 由于JIT编译器只是分支和基本测试,因此它可能会使其变得非常快。您可以通过对回调进行HashMap查找来使其更加优雅,但我怀疑这样做会更快。至于记忆,这是非常苗条的。 我以某种方式怀疑此代码实际上是内存或性能的关键瓶颈。您有真正的理由尝试对其进行优化吗?

  • 问题内容: 我在Java中使用ResultSet,但不确定如何正确关闭它。我正在考虑使用ResultSet构造一个HashMap,然后在那之后关闭ResultSet。这种HashMap技术是有效的还是有更有效的方法来处理这种情况?我需要键和值,因此使用HashMap似乎是一个合理的选择。 如果使用HashMap是最有效的方法,如何在代码中构造和使用HashMap? 这是我尝试过的: 问题答案: 遍

  • 问题内容: 当executeQuery函数运行,sql语句正常工作并在sql编辑器上运行时,给出正确的结果时,会发生问题。当它在jdbc上运行时,不会执行。该连接接受多个查询。 java.sql.SQLException:ResultSet来自UPDATE。没有数据。 问题答案: 这是不可能的,您必须将查询分开,以获得可以使用过程或函数的最佳解决方案。 程序应采取 返回结果,在这种情况下,它应该是

  • 问题内容: 假设我有一个类似的查询 也许两个表都具有相同的列名。所以我虽然很高兴通过访问数据 但这事与愿违,我一无所获。我阅读了API,但是他们并没有真正谈论这种情况。这样的功能供应商依赖吗? 问题答案: JDBC将仅通过查询中指定的名称来命名列-它不知道表名等。 您有两种选择: 选项1: 在查询中使用不同的名称命名列,即 然后在您的Java代码中参考列别名: 选项2: 在对JDBC API的调用

  • 现在我的问题是,如何在自定义的而不是自定义的中重写方法?我没有在这里公布我的代码,因为它与链接的代码本质上是相同的,只是我需要为子创建一个自定义的来代替,这样它就可以按照“pptang”的答案所述进行正确的度量。 否则,有没有比在第二个RecyclerView中使用1个RecyclerView更好的方法?只能有1个RecyclerView使用上述列表和每个中唯一项的网格填充活动/片段吗?

  • 我想把表抓成CSV文件。怎么往前走? 这是表: