当前位置: 首页 > 面试题库 >

使用Mongoose模式导入CSV

辛才俊
2023-03-14
问题内容

当前,我需要将一个大型CSV文件推送到mongo数据库中,值的顺序需要确定数据库条目的键:

CSV文件示例:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

将其解析为数组的代码:

var fs = require("fs");

var csv = require("fast-csv");

fs.createReadStream("rank.txt")
    .pipe(csv())
    .on("data", function(data){
        console.log(data);
    })
    .on("end", function(data){
        console.log("Read Finished");
    });

代码输出:

[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]

如何将数组插入我的Mongoose模式中以进入mongo db?

架构:

var mongoose = require("mongoose");


var rankSchema = new mongoose.Schema({
   serverid: Number,
   resetid: Number,
   rank: Number,
   number: Number,
   name: String,
   land: Number,
   networth: Number,
   tag: String,
   gov: String,
   gdi: Number,
   protection: Number,
   vacation: Number,
   alive: Number,
   deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

数组的顺序需要与架构的顺序匹配,例如在数组中,第一个数字9始终需要保存,因为它们键为“ serverid”,依此类推。我正在使用Node.JS


问题答案:

您可以通过headers从模式定义获取来使用fast-
csv进行操作,该模式定义会将已解析的行作为“对象”返回。您实际上有一些不匹配,因此我已将它们标记为更正:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

只要该模式实际上与提供的CSV对齐,就可以了。这些是我可以看到的更正,但是如果您需要对实际字段名称进行不同的对齐,则需要进行调整。但是基本上Number在一个位置上有一个String,实际上是一个额外的字段,我认为这是CSV中的空白字段。

通常的事情是从架构中获取字段名称数组,并在制作csv解析器实例时将其传递给选项:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

实际执行此操作后,您将获得“对象”而不是数组:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

不要担心“类型”,因为Mongoose会根据模式强制转换值。

其余发生在data事件的处理程序中。为了获得最大的效率,我们insertMany()仅每10,000行写入一次数据库。它实际如何到达服务器和进程取决于MongoDB版本,但是根据内存使用情况的“权衡”并编写一个单据,您将为单个集合导入的平均字段数应该是10,000,这是相当合理的。合理的网络请求。如有必要,请减小数字。

重要的部分是将这些调用标记为async函数,然后await将其标记为结果insertMany()。另外,我们还需要进行pause()流处理,resume()否则需要对每个项目进行操作,否则就有可能buffer在实际发送文档之前覆盖要插入的文档。在pause()resume()有必要把“反压”在管道上,否则项目只保留“出山”和烧制的data事件。

自然地,对于10,000个条目的控件要求我们在每次迭代和流完成时都进行检查,以清空缓冲区并将所有剩余文档发送到服务器。

那确实是您要执行的操作,因为您当然不希望在整个data事件的“每次”迭代中或者基本上不等待每个请求完成就向服务器触发异步请求。您无需检查“非常小的文件”就可以摆脱困境,但是对于任何实际负载,由于“正在进行中”的异步调用尚未完成,因此您肯定会超出调用堆栈。

仅供参考-
package.json用过的。的mz,因为它只是一个现代化的可选Promise标准节点的启用库“内置”库,我只是习惯于使用。该代码当然可以与该fs模块完全互换。

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

实际上,使用Node v8.9.x及更高版本,我们甚至可以AsyncIterator通过stream-to- iterator模块的实现来简化此过程。它仍然处于Iterator<Promise<T>>模式下,但是应该在Node
v10.x变为稳定LTS之前执行:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

基本上,所有流“事件”的处理,暂停和恢复都将替换为一个简单的for循环:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

简单!for..await..of当它变得更稳定时,将在以后的节点实现中对此进行清理。但以上在指定版本及更高版本上运行良好。



 类似资料:
  • 选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用延伸插入语句 插入记录时使用延伸插入语法。 例如: INSERT INTO `僱员` VALUES ('1', '梁明洁'', '23'), ('2',

  • 选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用 NULL 取代空白字符串 如果源数据字段有空白字符串,就以 NULL 值导入。 忽略外键约束 在导入进程中忽略外键约束的检查。 遇到错误时继续 在导入进

  • 选择定义如何导入数据的导入模式。 【提示】若要启动余下的选项,你必须在上一步启用主键。 点击“高级”按钮来进行更多设置。下列选项是根据连接的服务器类型而有所不同。 在每个运行中运行多个查询 一次运行多句 SQL 语句,这将会使导入过程更快。 使用延伸插入语句 插入记录时使用延伸插入语法。 例如: INSERT INTO `僱员` VALUES ('1', '梁明洁'', '23'), ('2',

  • 问题内容: 我已尝试找到有关最佳使用还是全面的指南。我刚开始使用Python,并且正在尝试着眼于最佳实践。 基本上,我希望有人能分享他们的经验,其他开发人员有什么喜好,以及避免遇到麻烦的最佳方法是什么? 问题答案: 和之间的区别主要是主观的。选择最喜欢的一个,并在使用中保持一致。这里有一些要点可以帮助你做出决定。 优点: - 减少维护你的import报表。无需添加任何其他导入即可开始使用模块中的另

  • 问题内容: 我使用Mongoose.js,无法解决3级层次结构文档的问题。 有2种方法可以做到。 首先 -没有裁判。 我需要出示C记录。仅知道_id的C,如何填充/找到它? 我曾尝试使用: 但是我不知道如何从returnet得到一个对象,我只需要c对象。 其次,如果使用裁判: 如何填充所有B,C记录以获取层次结构? 我试图使用这样的东西: 但是它将为single_c.title返回undefine