sql >> Base de Datos >  >> NoSQL >> MongoDB

Streaming de datos en tiempo real con MongoDB Change Streams

Recientemente, MongoDB lanzó una nueva función a partir de la versión 3.6, Change Streams. Esto le brinda acceso instantáneo a sus datos, lo que lo ayuda a mantenerse actualizado con los cambios en sus datos. En el mundo de hoy, todos quieren notificaciones instantáneas en lugar de recibirlas después de algunas horas o minutos. Para algunas aplicaciones, es fundamental enviar notificaciones en tiempo real a todos los usuarios suscritos para todas y cada una de las actualizaciones. MongoDB facilitó mucho este proceso al introducir esta función. En este artículo, aprenderemos sobre el flujo de cambios de MongoDB y sus aplicaciones con algunos ejemplos.

Definición de corrientes de cambio

Los flujos de cambios no son más que el flujo en tiempo real de cualquier cambio que ocurra en la base de datos o la colección o incluso en las implementaciones. Por ejemplo, cada vez que se produce una actualización (Insertar, Actualizar o Eliminar) en una colección específica, MongoDB activa un evento de cambio con todos los datos que se han modificado.

Puede definir flujos de cambio en cualquier colección como cualquier otro operador de agregación normal usando el operador $changeStream y el método watch(). También puede definir el flujo de cambios utilizando el método MongoCollection.watch().

Ejemplo

db.myCollection.watch()

Cambiar funciones de transmisiones

  • Filtrado de cambios

    Puede filtrar los cambios para recibir notificaciones de eventos solo para algunos datos específicos.

    Ejemplo:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Este código se asegurará de que reciba actualizaciones solo para los registros cuyo nombre sea igual a Bob. De esta forma, puede escribir canalizaciones para filtrar los flujos de cambios.

  • Reanudación de secuencias de cambios

    Esta característica asegura que no haya pérdida de datos en caso de fallas. Cada respuesta en la transmisión contiene el token de reanudación que se puede usar para reiniciar la transmisión desde un punto específico. Para algunas fallas frecuentes de la red, el controlador mongodb intentará restablecer la conexión con los suscriptores utilizando el token de reanudación más reciente. Aunque, en caso de falla completa de la aplicación, los clientes deben mantener el token de reanudación para reanudar la transmisión.

  • Flujos de cambios ordenados

    MongoDB utiliza un reloj lógico global para ordenar todos los eventos de flujo de cambios en todas las réplicas y fragmentos de cualquier clúster, por lo que el receptor siempre recibirá las notificaciones en el mismo orden en que se aplicaron los comandos en la base de datos.

  • Eventos con documentos completos

    MongoDB devuelve la parte de los documentos coincidentes de forma predeterminada. Pero puede modificar la configuración del flujo de cambios para recibir un documento completo. Para hacerlo, pase { fullDocument:“updateLookup”} al método de visualización.
    Ejemplo:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Durabilidad

    Los flujos de cambios solo notificarán los datos que están comprometidos con la mayoría de las réplicas. Esto asegurará que los eventos sean generados por datos de persistencia mayoritaria asegurando la durabilidad del mensaje.

  • Seguridad/Control de Acceso

    Los flujos de cambio son muy seguros. Los usuarios pueden crear flujos de cambios solo en las colecciones en las que tienen permisos de lectura. Puede crear secuencias de cambios en función de las funciones de los usuarios.

Varios nueves Conviértase en un administrador de bases de datos de MongoDB - Llevando MongoDB a la producción Obtenga información sobre lo que necesita saber para implementar, monitorear, administrar y escalar MongoDBDescargar gratis

Ejemplo de flujos de cambio

En este ejemplo, crearemos flujos de cambios en la colección Stock para recibir una notificación cuando el precio de una acción supere cualquier umbral.

  • Configurar el clúster

    Para usar flujos de cambios, primero tenemos que crear un conjunto de réplicas. Ejecute el siguiente comando para crear un conjunto de réplicas de un solo nodo.

    mongod --dbpath ./data --replSet “rs”
  • Insertar algunos registros en la colección Acciones

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Configurar el entorno del nodo e instalar las dependencias

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Suscríbase para los cambios

    Cree un archivo index.js y coloque el siguiente código en él.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Ahora ejecute este archivo:

    node index.js
  • Inserte un nuevo registro en la base de datos para recibir una actualización

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Ahora revise su consola, recibirá una actualización de MongoDB.
    Ejemplo de respuesta:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Aquí puede cambiar el valor del parámetro OperationType con las siguientes operaciones para detectar diferentes tipos de cambios en una colección:

  • Insertar
  • Reemplazar (excepto ID único)
  • Actualizar
  • Eliminar
  • Invalidar (siempre que Mongo devuelva un cursor no válido)

Otros modos de flujos de cambios

Puede iniciar flujos de cambios en una base de datos y la implementación de la misma manera que en la recopilación. Esta función se ha lanzado a partir de la versión 4.0 de MongoDB. Estos son los comandos para abrir un flujo de cambios contra la base de datos y las implementaciones.

Against DB: db.watch()
Against deployment: Mongo.watch()

Conclusión

MongoDB Change Streams simplifica la integración entre el frontend y el backend en tiempo real y sin problemas. Esta característica puede ayudarlo a usar MongoDB para el modelo pubsub, por lo que ya no necesita administrar las implementaciones de Kafka o RabbitMQ. Si su aplicación requiere información en tiempo real, debe consultar esta característica de MongoDB. Espero que esta publicación lo ayude a comenzar con las secuencias de cambios de MongoDB.