当前位置: 首页 > 知识库问答 >
问题:

从CSV加载时出现PostgreSQL/JooQ大容量插入性能问题;我如何改进流程?

闾丘选
2023-03-14

对于这个项目,我打算制作一个Web版本,现在正在制作一个PostgreSQL(9. x)后端,webapp将从中查询。

现在,跟踪器生成一个包含两个CSV的zip文件,在运行时将其加载到H2数据库中,其模式是这样的(是的,我知道SQL可以写得更好一点):

create table matchers (
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null
);

alter table matchers add primary key(id);

create table nodes (
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null
);

alter table nodes add primary key(id);
alter table nodes add foreign key (matcher_id) references matchers(id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

现在,由于PostgreSQL数据库将能够处理多个跟踪,我必须添加另一个表;PostgreSQL后端上的模式看起来像这样(低于平均水平SQL警报;此外,在parse_info表中,content列包含解析文件的全文,在zip文件中,它单独存储):

create table parse_info (
    id uuid primary key,
    date timestamp not null,
    content text not null
);

create table matchers (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null,
    unique (parse_info_id, id)
);

create table nodes (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null,
    unique (parse_info_id, id)
);

alter table nodes add foreign key (parse_info_id, matcher_id)
    references matchers(parse_info_id, id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

现在,我目前正在做的是获取现有的zip文件并将它们插入postgresql数据库;我正在使用JooQ及其CSV加载API。

这个过程有点复杂...以下是当前的步骤:

  • 生成UUID;
  • 我从zip中读取必要的信息(解析日期,输入文本)并将记录写入parse_info表中;
  • 我创建CSV的临时副本,以便JooQ加载API能够使用它(请参阅代码摘录后了解原因);
  • 我插入所有匹配器,然后是所有节点。

代码如下:

public final class Zip2Db2
{
    private static final Pattern SEMICOLON = Pattern.compile(";");
    private static final Function<String, String> CSV_ESCAPE
        = TraceCsvEscaper.ESCAPER::apply;

    // Paths in the zip to the different components
    private static final String INFO_PATH = "/info.csv";
    private static final String INPUT_PATH = "/input.txt";
    private static final String MATCHERS_PATH = "/matchers.csv";
    private static final String NODES_PATH = "/nodes.csv";

    // Fields to use for matchers zip insertion
    private static final List<Field<?>> MATCHERS_FIELDS = Arrays.asList(
        MATCHERS.PARSE_INFO_ID, MATCHERS.ID, MATCHERS.CLASS_NAME,
        MATCHERS.MATCHER_TYPE, MATCHERS.NAME
    );

    // Fields to use for nodes zip insertion
    private static final List<Field<?>> NODES_FIELDS = Arrays.asList(
        NODES.PARSE_INFO_ID, NODES.PARENT_ID, NODES.ID, NODES.LEVEL,
        NODES.SUCCESS, NODES.MATCHER_ID, NODES.START_INDEX, NODES.END_INDEX,
        NODES.TIME
    );

    private final FileSystem fs;
    private final DSLContext jooq;
    private final UUID uuid;

    private final Path tmpdir;

    public Zip2Db2(final FileSystem fs, final DSLContext jooq, final UUID uuid)
        throws IOException
    {
        this.fs = fs;
        this.jooq = jooq;
        this.uuid = uuid;

        tmpdir = Files.createTempDirectory("zip2db");
    }

    public void removeTmpdir()
        throws IOException
    {
        // From java7-fs-more (https://github.com/fge/java7-fs-more)
        MoreFiles.deleteRecursive(tmpdir, RecursionMode.KEEP_GOING);
    }

    public void run()
    {
        time(this::generateMatchersCsv, "Generate matchers CSV");
        time(this::generateNodesCsv, "Generate nodes CSV");
        time(this::writeInfo, "Write info record");
        time(this::writeMatchers, "Write matchers");
        time(this::writeNodes, "Write nodes");
    }

    private void generateMatchersCsv()
        throws IOException
    {
        final Path src = fs.getPath(MATCHERS_PATH);
        final Path dst = tmpdir.resolve("matchers.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            // Throwing below is from throwing-lambdas
            // (https://github.com/fge/throwing-lambdas)
            lines.map(this::toMatchersLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toMatchersLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        Arrays.stream(SEMICOLON.split(input, 4))
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void generateNodesCsv()
        throws IOException
    {
        final Path src = fs.getPath(NODES_PATH);
        final Path dst = tmpdir.resolve("nodes.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            lines.map(this::toNodesLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toNodesLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        SEMICOLON.splitAsStream(input)
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void writeInfo()
        throws IOException
    {
        final Path path = fs.getPath(INFO_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            final String[] elements = SEMICOLON.split(reader.readLine());

            final long epoch = Long.parseLong(elements[0]);
            final Instant instant = Instant.ofEpochMilli(epoch);
            final ZoneId zone = ZoneId.systemDefault();
            final LocalDateTime time = LocalDateTime.ofInstant(instant, zone);

            final ParseInfoRecord record = jooq.newRecord(PARSE_INFO);

            record.setId(uuid);
            record.setContent(loadText());
            record.setDate(Timestamp.valueOf(time));

            record.insert();
        }
    }

    private String loadText()
        throws IOException
    {
        final Path path = fs.getPath(INPUT_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            return CharStreams.toString(reader);
        }
    }

    private void writeMatchers()
        throws IOException
    {
        final Path path = tmpdir.resolve("matchers.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(MATCHERS)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(MATCHERS_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void writeNodes()
        throws IOException
    {
        final Path path = tmpdir.resolve("nodes.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(NODES)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(NODES_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void time(final ThrowingRunnable runnable, final String description)
    {
        System.out.println(description + ": start");
        final Stopwatch stopwatch = Stopwatch.createStarted();
        runnable.run();
        System.out.println(description + ": done (" + stopwatch.stop() + ')');
    }

    public static void main(final String... args)
        throws IOException
    {
        if (args.length != 1) {
            System.err.println("missing zip argument");
            System.exit(2);
        }

        final Path zip = Paths.get(args[0]).toRealPath();

        final UUID uuid = UUID.randomUUID();
        final DSLContext jooq = PostgresqlTraceDbFactory.defaultFactory()
            .getJooq();

        try (
            final FileSystem fs = MoreFileSystems.openZip(zip, true);
        ) {
            final Zip2Db2 zip2Db = new Zip2Db2(fs, jooq, uuid);
            try {
                zip2Db.run();
            } finally {
                zip2Db.removeTmpdir();
            }
        }
    }
}

现在,这是我的第一个问题...这比在H2装船要慢得多。以下是包含620个匹配器和45746个节点的CSV的时序:

Generate matchers CSV: start
Generate matchers CSV: done (45.26 ms)
Generate nodes CSV: start
Generate nodes CSV: done (573.2 ms)
Write info record: start
Write info record: done (311.1 ms)
Write matchers: start
Write matchers: done (4.192 s)
Write nodes: start
Write nodes: done (22.64 s)

给予或接受,忘记编写专门的CSV部分(见下文),即25秒。将其加载到动态的、基于磁盘的H2数据库中只需不到5秒!

我遇到的另一个问题是我必须编写专用的CSV;CSV加载API似乎在它接受的内容上并不灵活,例如,我必须转过这一行:

328;SequenceMatcher;COMPOSITE;token

进入这个:

"some-randome-uuid-here";"328";"SequenceMatcher";"COMPOSITE";"token"

但我最大的问题是,事实上这个拉链很小。例如,我有一个zip,不是620,而是1532个匹配器,不是45746个节点,而是超过3400万个节点;即使我们忽略CSV生成时间(原始节点CSV为1.2 GiB),因为H2注入需要20分钟,将其乘以5会得到1h30mn以南的某个时间点,这是很多!

总而言之,该过程目前效率非常低下...

现在,为了捍卫后格雷SQL:

    < li >对PostgreSQL实例的约束远远高于对H2实例的约束:在生成的zip文件中,我不需要UUID; < li>H2针对写入进行了“不安全”调整:< code > JDBC:H2:/path/to/db;LOG = 0;LOCK _ MODE = 0;UNDO _ LOG = 0;CACHE_SIZE=131072。

尽管如此,这种插入时间的差异似乎有点过分,我很确定它可以更好。但是我不知道从哪里开始。

此外,我知道PostgreSQL有一个从CSV加载的专用机制,但这里的CSV是在一个zip文件中开始的,我真的很想避免像我目前所做的那样创建一个专用的CSV…理想情况下,我想直接从zip中逐行读取(这是我对H2注入所做的),转换行并写入PostgreSQLschema。

最后,我还意识到,我目前没有在插入之前禁用PostgreSQL模式的约束;我还没有尝试过这个(会有什么不同吗?)。

那么,你建议我怎么做才能提高性能呢?

共有2个答案

茅慈
2023-03-14

以下是您可以采取的一些措施

在jOOQ 3.6中,LoaderAPI中有两种新模式:

  • 散装货物(https://github.com/jOOQ/jOOQ/pull/3975)
  • 批量装载(https://github.com/jOOQ/jOOQ/issues/2664)

使用这些技术已被观察到显著加速加载,数量级。另请参阅这篇关于JDBC批处理加载性能的文章。

您当前在一个巨大的事务中加载所有内容(或者使用自动提交,但这也不好)。这对于大负载是不利的,因为数据库需要跟踪插入会话中的所有插入,以便能够在需要时回滚它们。

当您在实时系统上执行此操作时,情况会变得更糟,因为在实时系统上,如此大的负载会产生大量争用。

jOOQ的<code>Loader</code>API允许您通过<code>LoaderOptionsStep.commitAfter(int)</code>指定“提交”大小

这只有在离线加载的情况下才有可能,但是如果完全关闭数据库中的日志记录(对于该表),并且在加载时关闭约束,在加载后再次打开约束,就可以大大加快加载速度。

最后,我还意识到,我目前没有在插入之前禁用PostgreSQL模式的约束;我还没有尝试过这个(会有什么不同吗?)。

哦,是的,它会的。具体来说,每次插入时,独特的约束成本都很高,因为它必须始终保持。

这段代码在这里:

final List<String> parts = new ArrayList<>();
parts.add('"' + uuid.toString() + '"');
Arrays.stream(SEMICOLON.split(input, 4))
      .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
      .forEach(parts::add);
return String.join(";", parts) + '\n';

当您隐式创建并丢弃大量StringBuilder对象时,会给垃圾收集器带来很大的压力(有关这一点的一些背景信息可以在本文中找到)。通常情况下,这很好,不应过早优化,但在大批量生产过程中,如果您将上述内容转化为更低的水平,您肯定可以获得几个百分点的速度:

StringBuilder result = new StringBuilder();
result.append('"').append(uuid.toString()).append('"');

for (String s : SEMICOLON.split(input, 4))
    result.append('"').append(CSV_ESCAPE.apply(s)).append('"');

...

当然,您仍然可以实现以函数式风格编写相同的东西,但是我发现使用经典的Java8习惯用法优化这些低级字符串操作要容易得多。

袁安志
2023-03-14

将CSV文件批量插入PostgreSQL的最快方法是使用Copy。COPY命令针对插入大量行进行了优化。

使用Java,您可以使用PostgreSQL JDBC驱动程序的Copy实现

这里有一个关于如何使用它的很好的小例子:如何使用JDBC将数据从文件复制到PostgreSQL?

如果您有一个带头文件的CSV,您可能希望运行类似以下的命令:

\从“/tmp/mydata.csv”分隔符“;”复制我的表CSV 标头

向现有表中添加大量数据时,另一个性能提升是删除索引,插入数据,然后重新创建索引。

 类似资料:
  • 问题内容: 我正在测试Postgres插入性能。我有一张表,其中一列以数字作为其数据类型。也有一个索引。我使用以下查询填充数据库: 通过上面的查询,我一次非常快地插入了4百万行10,000。数据库达到600万行后,性能每15分钟急剧下降到100万行。有什么技巧可以提高插入性能?我需要此项目的最佳插入性能。 在具有5 GB RAM的计算机上使用Windows 7 Pro。 问题答案: 请参阅Post

  • 我正在测试博士后的插入性能。我有一个表,其中一列的数据类型是数字。上面还有一个索引。我使用以下查询填充了数据库: 通过上面的查询,我很快插入了400万行,一次插入10000行。在数据库达到600万行后,性能急剧下降到每15分钟100万行。有什么技巧可以提高插入性能吗?我需要这个项目的最佳插入性能。 在内存为5 GB的计算机上使用Windows 7 Pro。

  • 我在PostgreSQL中有以下结构: 这三个表(简化)如下: 我需要在PostgreSQL上运行jOOQ,选择,但它也应该从和加载任何相关的行。有没有办法运行单个jOOQ语句来加载这棵树?我知道我可以使用s来执行此操作,但我正在尝试避免Java中的任何循环。

  • 我正在尝试将文件插入到现有表中。现有的表有3列,这些列是ID(在表中分配)、学生号和组号。 在我的中,我有下面的格式,但是每次插入它时,我都会得到一个错误

  • 当从服务层调用我的的方法时,使用一个长的时,Hibernate的跟踪日志记录会显示每个实体发出的单个SQL语句。 我是否可以强制它进行大容量插入(即多行),而不需要手动摆弄、事务等,甚至不需要原始SQL语句字符串? 致: 而是改为: 在PROD中,我使用的是CockroachDB,性能上的差异是显著的。 日志输出: