这是我现有的代码
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')
function prepareText(title, content, summary) {
let description
if (content && content.length) {
description = content
} else if (summary && summary.length) {
description = summary
} else {
description = ''
}
return title.toLowerCase() + ' ' + description.toLowerCase()
}
async function tagAll({ db, logger, tagger }) {
// you can also use pgp.as.format(query, values, options)
// to format queries properly, via pg-promise;
const qs = new QueryStream(
'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
)
try {
const result = await db.stream(qs, (s) => {
// initiate streaming into the console:
s.pipe(JSONStream.stringify())
s.on('data', async (item) => {
try {
s.pause()
// eslint-disable-next-line camelcase
const { feed_item_id, title, summary, content } = item
// Process text to be tagged
const text = prepareText(title, summary, content)
const tags = tagger.tag(text)
// Update tags per post
await db.query(
'UPDATE feed_items SET tags=$1 WHERE feed_item_id=$2',
// eslint-disable-next-line camelcase
[tags, feed_item_id]
)
} catch (error) {
logger.error(error)
} finally {
s.resume()
}
})
})
logger.info(
'Total rows processed:',
result.processed,
'Duration in milliseconds:',
result.duration
)
} catch (error) {
logger.error(error)
}
}
module.exports = tagAll
这是我能想到的最好的批处理流中的查询,这样我们就不需要加载内存中的所有数据或运行太多查询。如果有人知道更好的批处理方法,特别是使用t.sequence,请随时添加另一个答案
const BATCH_SIZE = 5000
async function batchInsert({ db, pgp, logger, data }) {
try {
// https://vitaly-t.github.io/pg-promise/helpers.ColumnSet.html
const cs = new pgp.helpers.ColumnSet(
[
{ name: 'feed_item_id', cast: 'uuid' },
{ name: 'tags', cast: 'varchar(64)[]' },
],
{
table: 'feed_items',
}
)
const query =
pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
await db.none(query)
} catch (error) {
logger.error(error)
}
}
async function tagAll({ db, pgp, logger, tagger }) {
// you can also use pgp.as.format(query, values, options)
// to format queries properly, via pg-promise;
const qs = new QueryStream(
'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
)
try {
const queryValues = []
const result = await db.stream(qs, (s) => {
// initiate streaming into the console:
s.pipe(JSONStream.stringify())
s.on('data', async (item) => {
try {
s.pause()
// eslint-disable-next-line camelcase
const { feed_item_id, title, summary, content } = item
// Process text to be tagged
const text = prepareText(title, summary, content)
const tags = tagger.tag(text)
queryValues.push({ feed_item_id, tags })
if (queryValues.length >= BATCH_SIZE) {
const data = queryValues.splice(0, queryValues.length)
await batchInsert({ db, pgp, logger, data })
}
} catch (error) {
logger.error(error)
} finally {
s.resume()
}
})
})
await batchInsert({ db, pgp, logger, data: queryValues })
return result
} catch (error) {
logger.error(error)
}
}
如果您可以用一条sql语句完成所有事情,那么您应该这样做!在这里,您要为表中的每一行支付node和DB之间的往返费用,这将花费您查询的大部分时间。
您的请求可以用纯 sql 实现:
update feed_items set tags=case
when (content = '') is false then lower(title) || ' ' || lower(content)
when (summary = '') is false then lower(title) || ' ' || lower(summary)
else title end;
这个请求将一次更新所有表。我肯定这比你的方法快几个数量级。在我的机器上,一个包含100000行的表,更新时间大约是600毫秒。
一些评论:
限制
部分是因为它太慢了?如果是这样,那么您可以删除它,50000行对于postgres来说不是一个大表pg-stream
东西并不是真的从数据库中流式输出,它只允许您使用它之前收集的结果中的流式api……这没问题,但我认为这里可能有一个误解问题内容: 我在Postgres上使用pg-promise的节点上建立了一个API,效果很好,但是我正在考虑如何修改PUT语句以更好地处理输入中的NULLS。 以下是PUT语句的代码: 现在,此语句有效,但是如果我将NULLS传递给下一个更新,它也会删除现有值。例如,如果我只想更新string1和date2,则必须发送整个json对象,否则所有其他值都将设置为NULL。 有没有更好的方法来解决这个
我想使用单个查询插入多行,例如: 是否有一种方法可以轻松做到这一点,最好是针对如下对象数组: 我最终可能会在一个块中有500条记录,因此不希望运行多个查询。 到目前为止,我只能为单个对象执行此操作: 顺便问一个问题:使用< code>${}符号的插入是否可以防止SQL注入?
问题内容: 我想将超时添加到pg- promise查询,以便如果数据库尚未响应,则在一段时间后它们将失败。有什么建议的方法吗?还是我应该做一个可以处理定时器并拒绝诺言的自定义包装器(如果为时已晚)? 问题答案: 来自pg-promise的作者… pg-promise不支持查询取消,因为它可以解决数据库设计错误或查询执行不正确的问题。 PostgreSQL支持在执行耗时的查询时应使用的事件,因此无需
问题内容: 我想用一个查询插入多个行,例如: 有没有一种方法可以轻松地做到这一点,最好是针对这样的对象数组: 我可能最终将500条记录放在一个块中,因此运行多个查询将是不可取的。 到目前为止,我只能对单个对象执行此操作: 附带的问题:使用符号插入是否可以防止SQL注入? 问题答案: 我是pg-promise的作者。 在旧版本的库中,Performance Boost文章中的简化示例涵盖了这一点,在
Concepts Peering Interval See PG::start_peering_interval. See PG::acting_up_affected See PG::RecoveryState::Reset A peering interval is a maximal set of contiguous map epochs in which the up and actin
pg 是一个用于golang database/sql 的 PostgreSQL 驱动。 安装 go get github.com/blusewang/pg 使用 db, err := sql.Open("pg", "pg://user:password@dbhost.yourdomain.com/database_name?application_name=app_name&sslmode=v