sql >> Base de Datos >  >> NoSQL >> MongoDB

Guarde un CSV muy grande en mongoDB usando mongoose

Bienvenido a la transmisión. Lo que realmente desea es un "flujo de eventos" que procese su entrada "un fragmento a la vez" y, por supuesto, idealmente por un delimitador común como el carácter de "nueva línea" que está utilizando actualmente.

Para cosas realmente eficientes, puede agregar el uso de MongoDB "Bulk API" inserciones para que su carga sea lo más rápida posible sin consumir toda la memoria de la máquina o los ciclos de CPU.

No abogo porque hay varias soluciones disponibles, pero aquí hay una lista que utiliza line- paquete de flujo de entrada para simplificar la parte del "terminador de línea".

Definiciones de esquema solo por "ejemplo":

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Por lo general, la interfaz de "flujo" allí "descompone la entrada" para procesar "una línea a la vez". Eso evita que cargues todo a la vez.

Las partes principales son la "API de operaciones masivas" de MongoDB. Esto le permite "poner en cola" muchas operaciones a la vez antes de enviarlas al servidor. Entonces, en este caso, con el uso de un "módulo", las escrituras solo se envían por cada 1000 entradas procesadas. Realmente puede hacer cualquier cosa hasta el límite de BSON de 16 MB, pero manténgalo manejable.

Además de las operaciones que se procesan de forma masiva, existe un "limitador" adicional en el lugar del async biblioteca. No es realmente necesario, pero esto garantiza que esencialmente no más que el "límite de módulo" de los documentos estén en proceso en cualquier momento. Las "inserciones" por lotes generales no tienen ningún costo de E/S aparte de la memoria, pero las llamadas "ejecutar" significan que se está procesando la E/S. Así que esperamos en lugar de hacer cola para hacer más cosas.

Seguramente hay mejores soluciones que puede encontrar para el "procesamiento de flujo" de datos de tipo CSV que parece ser. Pero, en general, esto le brinda los conceptos sobre cómo hacer esto de una manera eficiente en memoria sin consumir ciclos de CPU también.