当前,我需要将一个大型CSV文件推送到mongo数据库中,值的顺序需要确定数据库条目的键:
CSV文件示例:
9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0
将其解析为数组的代码:
var fs = require("fs");
var csv = require("fast-csv");
fs.createReadStream("rank.txt")
.pipe(csv())
.on("data", function(data){
console.log(data);
})
.on("end", function(data){
console.log("Read Finished");
});
代码输出:
[ '9',
'1557',
'358',
'286',
'Mutantville',
'4368',
'2358026',
'',
'M',
'0',
'0',
'0',
'1',
'0' ]
[ '9',
'1557',
'359',
'147',
'Wroogny',
'4853',
'2356061',
'',
'D',
'0',
'0',
'0',
'1',
'0' ]
如何将数组插入我的Mongoose模式中以进入mongo db?
架构:
var mongoose = require("mongoose");
var rankSchema = new mongoose.Schema({
serverid: Number,
resetid: Number,
rank: Number,
number: Number,
name: String,
land: Number,
networth: Number,
tag: String,
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
module.exports = mongoose.model("Rank", rankSchema);
数组的顺序需要与架构的顺序匹配,例如在数组中,第一个数字9始终需要保存,因为它们键为“ serverid”,依此类推。我正在使用Node.JS
您可以通过headers
从模式定义获取来使用fast-
csv进行操作,该模式定义会将已解析的行作为“对象”返回。您实际上有一些不匹配,因此我已将它们标记为更正:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
只要该模式实际上与提供的CSV对齐,就可以了。这些是我可以看到的更正,但是如果您需要对实际字段名称进行不同的对齐,则需要进行调整。但是基本上Number
在一个位置上有一个String
,实际上是一个额外的字段,我认为这是CSV中的空白字段。
通常的事情是从架构中获取字段名称数组,并在制作csv解析器实例时将其传递给选项:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
实际执行此操作后,您将获得“对象”而不是数组:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
不要担心“类型”,因为Mongoose会根据模式强制转换值。
其余发生在data
事件的处理程序中。为了获得最大的效率,我们insertMany()
仅每10,000行写入一次数据库。它实际如何到达服务器和进程取决于MongoDB版本,但是根据内存使用情况的“权衡”并编写一个单据,您将为单个集合导入的平均字段数应该是10,000,这是相当合理的。合理的网络请求。如有必要,请减小数字。
重要的部分是将这些调用标记为async
函数,然后await
将其标记为结果insertMany()
。另外,我们还需要进行pause()
流处理,resume()
否则需要对每个项目进行操作,否则就有可能buffer
在实际发送文档之前覆盖要插入的文档。在pause()
和resume()
有必要把“反压”在管道上,否则项目只保留“出山”和烧制的data
事件。
自然地,对于10,000个条目的控件要求我们在每次迭代和流完成时都进行检查,以清空缓冲区并将所有剩余文档发送到服务器。
那确实是您要执行的操作,因为您当然不希望在整个data
事件的“每次”迭代中或者基本上不等待每个请求完成就向服务器触发异步请求。您无需检查“非常小的文件”就可以摆脱困境,但是对于任何实际负载,由于“正在进行中”的异步调用尚未完成,因此您肯定会超出调用堆栈。
仅供参考-
package.json
用过的。的mz
,因为它只是一个现代化的可选Promise
标准节点的启用库“内置”库,我只是习惯于使用。该代码当然可以与该fs
模块完全互换。
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
实际上,使用Node v8.9.x及更高版本,我们甚至可以AsyncIterator
通过stream-to- iterator
模块的实现来简化此过程。它仍然处于Iterator<Promise<T>>
模式下,但是应该在Node
v10.x变为稳定LTS之前执行:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
基本上,所有流“事件”的处理,暂停和恢复都将替换为一个简单的for
循环:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
简单!for..await..of
当它变得更稳定时,将在以后的节点实现中对此进行清理。但以上在指定版本及更高版本上运行良好。
选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用延伸插入语句 插入记录时使用延伸插入语法。 例如: INSERT INTO `僱员` VALUES ('1', '梁明洁'', '23'), ('2',
选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用 NULL 取代空白字符串 如果源数据字段有空白字符串,就以 NULL 值导入。 忽略外键约束 在导入进程中忽略外键约束的检查。 遇到错误时继续 在导入进
选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用延伸插入语句 插入记录时使用延伸插入语法。 例如: INSERT INTO `僱员` VALUES ('1', '梁明洁'', '23'), ('2',
问题内容: 我已尝试找到有关最佳使用还是全面的指南。我刚开始使用Python,并且正在尝试着眼于最佳实践。 基本上,我希望有人能分享他们的经验,其他开发人员有什么喜好,以及避免遇到麻烦的最佳方法是什么? 问题答案: 和之间的区别主要是主观的。选择最喜欢的一个,并在使用中保持一致。这里有一些要点可以帮助你做出决定。 优点: - 减少维护你的import报表。无需添加任何其他导入即可开始使用模块中的另
问题内容: 我使用Mongoose.js,无法解决3级层次结构文档的问题。 有2种方法可以做到。 首先 -没有裁判。 我需要出示C记录。仅知道_id的C,如何填充/找到它? 我曾尝试使用: 但是我不知道如何从returnet得到一个对象,我只需要c对象。 其次,如果使用裁判: 如何填充所有B,C记录以获取层次结构? 我试图使用这样的东西: 但是它将为single_c.title返回undefine