当前位置: 首页 > 知识库问答 >
问题:

如何使用pg-query-stream和pg-Promise在流式传输时运行更新?

贲言
2023-03-14
  • 我正在尝试从数据库中加载50000个包含文本的项目,标记它们并更新标签
  • 我使用pg-Promise和pg-query-stream来达到这个目的
  • 我能够让流部分正常工作,但是更新变得有问题,有这么多更新语句

这是我现有的代码

const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

function prepareText(title, content, summary) {
  let description
  if (content && content.length) {
    description = content
  } else if (summary && summary.length) {
    description = summary
  } else {
    description = ''
  }
  return title.toLowerCase() + ' ' + description.toLowerCase()
}

async function tagAll({ db, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, summary, content)
          const tags = tagger.tag(text)

          // Update tags per post
          await db.query(
            'UPDATE feed_items SET tags=$1 WHERE feed_item_id=$2',
            // eslint-disable-next-line camelcase
            [tags, feed_item_id]
          )
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    logger.info(
      'Total rows processed:',
      result.processed,
      'Duration in milliseconds:',
      result.duration
    )
  } catch (error) {
    logger.error(error)
  }
}

module.exports = tagAll
  • db对象是来自pg promise的对象,而tagger只是从变量标记中包含的文本中提取一个标记数组
  • 根据我在诊断中看到的内容,执行了太多更新语句,有没有办法对它们进行批处理

共有2个答案

井翰
2023-03-14

这是我能想到的最好的批处理流中的查询,这样我们就不需要加载内存中的所有数据或运行太多查询。如果有人知道更好的批处理方法,特别是使用t.sequence,请随时添加另一个答案

const BATCH_SIZE = 5000
async function batchInsert({ db, pgp, logger, data }) {
  try {
    // https://vitaly-t.github.io/pg-promise/helpers.ColumnSet.html
    const cs = new pgp.helpers.ColumnSet(
      [
        { name: 'feed_item_id', cast: 'uuid' },
        { name: 'tags', cast: 'varchar(64)[]' },
      ],
      {
        table: 'feed_items',
      }
    )
    const query =
      pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
    await db.none(query)
  } catch (error) {
    logger.error(error)
  }
}

async function tagAll({ db, pgp, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const queryValues = []
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, summary, content)
          const tags = tagger.tag(text)
          queryValues.push({ feed_item_id, tags })

          if (queryValues.length >= BATCH_SIZE) {
            const data = queryValues.splice(0, queryValues.length)
            await batchInsert({ db, pgp, logger, data })
          }
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    await batchInsert({ db, pgp, logger, data: queryValues })
    return result
  } catch (error) {
    logger.error(error)
  }
}
姜松
2023-03-14

如果您可以用一条sql语句完成所有事情,那么您应该这样做!在这里,您要为表中的每一行支付node和DB之间的往返费用,这将花费您查询的大部分时间。

您的请求可以用纯 sql 实现:

update feed_items set tags=case 
    when (content = '') is false then lower(title) || ' ' || lower(content) 
    when (summary = '') is  false then lower(title) || ' ' || lower(summary) 
    else title end;

这个请求将一次更新所有表。我肯定这比你的方法快几个数量级。在我的机器上,一个包含100000行的表,更新时间大约是600毫秒。

一些评论:

  • 您无需订购即可更新。由于订购速度很慢,最好不要订购。
  • 我想限制部分是因为它太慢了?如果是这样,那么您可以删除它,50000行对于postgres来说不是一个大表
  • 我打赌这个pg-stream东西并不是真的从数据库中流式输出,它只允许您使用它之前收集的结果中的流式api……这没问题,但我认为这里可能有一个误解
 类似资料:
  • 问题内容: 我在Postgres上使用pg-promise的节点上建立了一个API,效果很好,但是我正在考虑如何修改PUT语句以更好地处理输入中的NULLS。 以下是PUT语句的代码: 现在,此语句有效,但是如果我将NULLS传递给下一个更新,它也会删除现有值。例如,如果我只想更新string1和date2,则必须发送整个json对象,否则所有其他值都将设置为NULL。 有没有更好的方法来解决这个

  • 我想使用单个查询插入多行,例如: 是否有一种方法可以轻松做到这一点,最好是针对如下对象数组: 我最终可能会在一个块中有500条记录,因此不希望运行多个查询。 到目前为止,我只能为单个对象执行此操作: 顺便问一个问题:使用< code>${}符号的插入是否可以防止SQL注入?

  • 问题内容: 我想将超时添加到pg- promise查询,以便如果数据库尚未响应,则在一段时间后它们将失败。有什么建议的方法吗?还是我应该做一个可以处理定时器并拒绝诺言的自定义包装器(如果为时已晚)? 问题答案: 来自pg-promise的作者… pg-promise不支持查询取消,因为它可以解决数据库设计错误或查询执行不正确的问题。 PostgreSQL支持在执行耗时的查询时应使用的事件,因此无需

  • 问题内容: 我想用一个查询插入多个行,例如: 有没有一种方法可以轻松地做到这一点,最好是针对这样的对象数组: 我可能最终将500条记录放在一个块中,因此运行多个查询将是不可取的。 到目前为止,我只能对单个对象执行此操作: 附带的问题:使用符号插入是否可以防止SQL注入? 问题答案: 我是pg-promise的作者。 在旧版本的库中,Performance Boost文章中的简化示例涵盖了这一点,在

  • PG

    Concepts Peering Interval See PG::start_peering_interval. See PG::acting_up_affected See PG::RecoveryState::Reset A peering interval is a maximal set of contiguous map epochs in which the up and actin

  • pg

    pg 是一个用于golang database/sql 的 PostgreSQL 驱动。 安装 go get github.com/blusewang/pg 使用 db, err := sql.Open("pg", "pg://user:password@dbhost.yourdomain.com/database_name?application_name=app_name&sslmode=v