sql >> Base de Datos >  >> NoSQL >> MongoDB

Importar CSV utilizando Mongoose Schema

Puede hacerlo con fast-csv obteniendo los headers de la definición del esquema que devolverá las líneas analizadas como "objetos". De hecho, tienes algunas discrepancias, así que las he marcado con correcciones:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Siempre que el esquema se alinee con el CSV proporcionado, está bien. Estas son las correcciones que puedo ver, pero si necesita que los nombres de los campos reales estén alineados de manera diferente, entonces debe ajustarlos. Pero básicamente había un Number en la posición donde hay un String y esencialmente un campo adicional, que supongo que es el espacio en blanco en el CSV.

Lo general es obtener la matriz de nombres de campo del esquema y pasarla a las opciones al crear la instancia del analizador csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Una vez que realmente haces eso, obtienes un "Objeto" en lugar de una matriz:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

No se preocupe por los "tipos" porque Mongoose emitirá los valores de acuerdo con el esquema.

El resto ocurre dentro del controlador de los data evento. Para una máxima eficiencia estamos usando insertMany() para escribir solo en la base de datos una vez cada 10,000 líneas. La forma en que eso realmente va al servidor y los procesos depende de la versión de MongoDB, pero 10,000 debería ser bastante razonable en función de la cantidad promedio de campos que importaría para una sola colección en términos de "compensación" para el uso de memoria y escribir un solicitud de red razonable. Haz el número más pequeño si es necesario.

Las partes importantes son marcar estas llamadas como async funciones y await el resultado de insertMany() Antes de continuar. También necesitamos pause() la transmisión y resume() en cada elemento, de lo contrario corremos el riesgo de sobrescribir el buffer de documentos a insertar antes de que sean realmente enviados. El pause() y resume() son necesarios para poner "contrapresión" en la tubería, de lo contrario, los elementos seguirán "saliendo" y disparando los data evento.

Naturalmente, el control de las 10 000 entradas requiere que verifiquemos eso tanto en cada iteración como en la finalización de la transmisión para vaciar el búfer y enviar los documentos restantes al servidor.

Eso es realmente lo que quiere hacer, ya que ciertamente no desea enviar una solicitud asíncrona al servidor en "cada" iteración a través de los data evento o esencialmente sin esperar a que se complete cada solicitud. Se saldrá con la suya si no revisa eso para "archivos muy pequeños", pero para cualquier carga del mundo real seguramente excederá la pila de llamadas debido a las llamadas asíncronas "en vuelo" que aún no se han completado.

FYI:un package.json utilizado. El mz es opcional ya que es solo una Promise modernizada biblioteca habilitada de bibliotecas "incorporadas" de nodo estándar que simplemente estoy acostumbrado a usar. El código es, por supuesto, completamente intercambiable con el fs módulo.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

En realidad, con Node v8.9.x y superior, incluso podemos hacer esto mucho más simple con una implementación de AsyncIterator a través del stream-to-iterator módulo. Todavía está en Iterator<Promise<T>> modo, pero debería funcionar hasta que Node v10.x se vuelva LTS estable:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Básicamente, todo el manejo de "eventos" de flujo y la pausa y reanudación se reemplazan por un simple for bucle:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

¡Fácil! Esto se limpia en la implementación posterior del nodo con for..await..of cuando se vuelve más estable. Pero lo anterior funciona bien en la versión especificada y superior.