当前位置: 首页 > 面试题库 >

将二进制COPY表FROM与psycopg2一起使用

关学
2023-03-14
问题内容

我有数千万行要从多维数组文件传输到PostgreSQL数据库。我的工具是Python和psycopg2。批量插入数据的最有效方法是使用copy_from。但是,我的数据主要是32位浮点数(实数或float4),所以我宁愿不从实数→文本→实数转换。这是一个示例数据库DDL:

CREATE TABLE num_data
(
  id serial PRIMARY KEY NOT NULL,
  node integer NOT NULL,
  ts smallint NOT NULL,
  val1 real,
  val2 double precision
);

这是我在Python中使用字符串(文本)的地方:

# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]

import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n')

# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2'))
conn.commit()

是否存在可以使用二进制模式运行的等效项?即,将浮点数保留为二进制?这样不仅可以保持浮点精度,而且可以更快。

(注意:要查看与示例相同的精度,请使用SET extra_float_digits='2'


问题答案:

这是Python 3的COPY FROM的二进制等效文件:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()

我改写了上面的方法来为COPY编写文件。我在Python中的数据位于NumPy数组中,因此使用它们很有意义。这是一个data具有1M行,7列的示例:

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

在我的数据库中,我有两个看起来像的表:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

另一个类似的表名为num_data_text

以下是一些简单的辅助函数,它们通过使用NumPy记录数组中的信息为COPY(文本和二进制格式)准备数据:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        cpy.write('\t'.join([repr(x) for x in row]) + '\n')
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].alignment
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tostring())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)

这就是我使用帮助程序函数对两种COPY格式方法进行基准测试的方式:

def time_pgcopy(dat, table, binary):
    print('Processing copy html" target="_blank">object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)

这是最后两个time_pgcopy命令的输出:

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

因此,使用二进制方法,NumPy→文件和File→数据库步骤都更快。明显的区别是Python如何准备COPY文件,这对于文本来说确实很慢。通常,二进制格式会以这种格式的文本格式在2/3的时间内将其加载到数据库中。

最后,我比较了数据库中两个表中的值,以查看数字是否不同。大约1.46%的行的column值不同s0,并且该比例的值增加到6.17%s6(可能与我使用的随机方法有关)。所有70M
32位浮点值之间的非零绝对差值介于9.3132257e-010和7.6293945e-006之间。文本和二进制加载方法之间的这些细微差别是由于文本格式方法所需的float→text→float转换而导致精度损失。



 类似资料:
  • 问题内容: 我正在尝试使用python从Lambda函数更新Redshift。为此,我尝试合并2个代码片段。当我分别运行它们时,这两个片段都起作用。 从PyDev for Eclipse更新Redshift 接收上传到S3存储桶的内容(Lambda上提供预构建的模板) 由于这两个段都起作用,因此我尝试将它们组合在一起,以便在将文件上传到s3时可以更新Redshift: 由于我使用的是外部库,因此需

  • 我试图使用二元运算符来比较两个值: 现在,我收到一条失败消息:二进制运算符“==”不能应用于unichar或String类型的操作数。我还尝试转换字符: 不工作...

  • 我正在使用一个自定义的可调用的熊猫。to_sql()。下面的代码片段来自pandas文档,用于使用它 但是在使用这个复制功能时,我得到了错误 当我习惯于使用_sql()函数而不使用自定义可调用的“psql_insert_copy()”时,作为此表模式和值的输入在最初工作时不会出现问题。我正在使用sqlalchemy引擎获取连接光标

  • 如何将Restrictions.in与列表字段一起使用? 这是我的模型: 我尝试只获取具有某些功能的组: 但这会返回一个错误: 引起原因:org.hibernate.exception.GenericJDBCExc0019:在索引:: 1缺少IN或OUT参数org.hibernate.exception.internal.在org.hibernate.engine.jdbc.spi.SqlExce

  • 问题内容: 我们的处理器将一个(有效地传递)返回给我们。 现在,我们观察到尚未编程处理。我们还观察到处理instanceInstanceof ;我们需要编写一个自定义。 但可悲的是,它返回时只能处理一个,又不能处理a 。 因此,有人可以帮助我们了解如何处理中的列表吗? 问题答案: 通常,设计模式为: 如果处理器返回,则需要Writer期望。 您可以通过将您的委托包装在看起来像这样的ItemWrit

  • 问题内容: 因此,我一直在为这个(应该是)简单的练习而绞尽脑汁,以使该程序将日期字符串转换为对象,对其进行格式化,并在完成后将其作为字符串再次返回。 这是程序的最后一点,它从文件中获取一小段文本,将其分解为单独的记录,然后将记录分解为单独的数据并将它们分配给个人对象。 我已经在多个位置检查了该代码,并且该代码完全执行了应该执行的操作,直到调用了format函数(该函数抛出)为止。为对象分配了应该分