sql >> Base de Datos >  >> NoSQL >> Redis

¿Cómo implementar un flujo de futuros para una llamada de bloqueo usando futures.rs y Redis PubSub?

Advertencia importante Nunca he usado esta biblioteca antes, y mi conocimiento de bajo nivel de algunos de los conceptos es un poco... escaso. Principalmente estoy leyendo el tutorial. Estoy bastante seguro de que cualquier persona que haya realizado trabajos asincrónicos leerá esto y se reirá, pero puede ser un punto de partida útil para otras personas. ¡Advertencia emptor!

Comencemos con algo un poco más simple, demostrando cómo un Stream obras. Podemos convertir un iterador de Result s en una secuencia:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Esto nos muestra una forma de consumir la transmisión. Usamos and_then para hacer algo con cada carga útil (aquí solo imprimiéndola) y luego for_each para convertir el Stream volver a un Future . Luego podemos ejecutar el futuro llamando al extraño nombre forget método.

Lo siguiente es vincular la biblioteca de Redis a la mezcla, manejando solo un mensaje. Desde el get_message() El método está bloqueando, necesitamos introducir algunos subprocesos en la mezcla. No es una buena idea realizar una gran cantidad de trabajo en este tipo de sistema asincrónico ya que todo lo demás se bloqueará. Por ejemplo:

A menos que se disponga lo contrario, se debe garantizar que las implementaciones de esta función finalicen muy rápidamente .

En un mundo ideal, la caja redis se construiría sobre una biblioteca como futuros y expondría todo esto de forma nativa.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Mi comprensión se vuelve más borrosa aquí. En un hilo separado, bloqueamos el mensaje y lo insertamos en el canal cuando lo recibimos. Lo que no entiendo es por qué necesitamos aferrarnos al mango del hilo. Esperaría que foo.forget se estaría bloqueando a sí mismo, esperando hasta que la transmisión esté vacía.

En una conexión telnet al servidor Redis, envíe esto:

publish rust awesome

Y verás que funciona. Agregar declaraciones de impresión muestra que (para mí) el foo.forget se ejecuta antes de que se genere el hilo.

Múltiples mensajes es más complicado. El Sender se consume a sí mismo para evitar que el lado generador se adelante demasiado al lado consumidor. Esto se logra devolviendo otro futuro de send ! Necesitamos sacarlo de allí para reutilizarlo en la próxima iteración del bucle:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Estoy seguro de que habrá más ecosistema para este tipo de interoperación a medida que pase el tiempo. Por ejemplo, la caja de futuros-cpupool podría probablemente ampliarse para admitir un caso de uso similar a este.