sql >> Base de Datos >  >> NoSQL >> Redis

Flask by Example:implementación de una cola de tareas de Redis

Esta parte del tutorial detalla cómo implementar una cola de tareas de Redis para manejar el procesamiento de texto.

Actualizaciones:

  • 12/02/2020:se actualizó a la versión 3.8.1 de Python, así como a las versiones más recientes de Redis, Python Redis y RQ. Consulte a continuación para obtener más información. Mencione un error en la última versión de RQ y proporcione una solución. Se resolvió el error http antes que https.
  • 22/03/2016:actualizado a la versión 3.5.1 de Python, así como a las versiones más recientes de Redis, Python Redis y RQ. Vea a continuación para obtener más detalles.
  • 22/02/2015:Se agregó compatibilidad con Python 3.

Bonificación gratuita: Haga clic aquí para obtener acceso a un videotutorial gratuito de Flask + Python que le muestra cómo crear la aplicación web de Flask, paso a paso.

Recuerda:esto es lo que estamos creando:una aplicación Flask que calcula pares de palabras y frecuencias en función del texto de una URL determinada.

  1. Primera parte:configure un entorno de desarrollo local y luego implemente un entorno de ensayo y uno de producción en Heroku.
  2. Segunda parte:configure una base de datos PostgreSQL junto con SQLAlchemy y Alembic para manejar las migraciones.
  3. Tercera parte:agregue la lógica de back-end para raspar y luego procesar los recuentos de palabras de una página web utilizando las bibliotecas de solicitudes, BeautifulSoup y Natural Language Toolkit (NLTK).
  4. Cuarta parte:implementar una cola de tareas de Redis para manejar el procesamiento de texto. (actual )
  5. Quinta parte:Configure Angular en el front-end para sondear continuamente el back-end para ver si la solicitud ha terminado de procesarse.
  6. Seis partes:enviar al servidor de prueba en Heroku:configurar Redis y detallar cómo ejecutar dos procesos (web y trabajador) en un solo Dyno.
  7. Séptima parte:actualice la interfaz para que sea más fácil de usar.
  8. Octava parte:Cree una directiva angular personalizada para mostrar un gráfico de distribución de frecuencia usando JavaScript y D3.

¿Necesitas el código? Cógelo del repositorio.


Requisitos de instalación

Herramientas utilizadas:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2):una biblioteca simple para crear una cola de tareas

Comience descargando e instalando Redis desde el sitio oficial o a través de Homebrew (brew install redis ). Una vez instalado, inicie el servidor Redis:

$ redis-server

A continuación, instale Python Redis y RQ en una nueva ventana de terminal:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Configurar el Trabajador

Comencemos por crear un proceso de trabajo para escuchar las tareas en cola. Cree un nuevo archivo worker.py y agrega este código:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Aquí, escuchamos una cola llamada default y estableció una conexión con el servidor Redis en localhost:6379 .

Inicie esto en otra ventana de terminal:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Ahora necesitamos actualizar nuestro app.py para enviar trabajos a la cola...



Actualizar app.py

Agregue las siguientes importaciones a app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Luego actualice la sección de configuración:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) configuró una conexión Redis e inicializó una cola basada en esa conexión.

Mueva la funcionalidad de procesamiento de texto fuera de nuestra ruta de índice y a una nueva función llamada count_and_save_words() . Esta función acepta un argumento, una URL, que le pasaremos cuando la llamemos desde nuestra ruta de índice.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Toma nota del siguiente código:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Nota: Necesitamos importar el count_and_save_words función en nuestra función index ya que el paquete RQ actualmente tiene un error, donde no encontrará funciones en el mismo módulo.

Aquí usamos la cola que inicializamos anteriormente y llamamos enqueue_call() función. Esto agregó un nuevo trabajo a la cola y ese trabajo ejecutó count_and_save_words() función con la URL como argumento. El result_ttl=5000 El argumento de línea le dice a RQ cuánto tiempo debe retener el resultado del trabajo:5000 segundos, en este caso. Luego enviamos la identificación del trabajo a la terminal. Esta identificación es necesaria para ver si el trabajo ha terminado de procesarse.

Configuremos una nueva ruta para eso...



Obtener resultados

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Probemos esto.

Inicie el servidor, navegue a http://localhost:5000/, use la URL https://realpython.com y tome la identificación del trabajo de la terminal. Luego use esa identificación en el punto final '/results/', es decir, http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Siempre que hayan transcurrido menos de 5000 segundos antes de verificar el estado, debería ver un número de identificación, que se genera cuando agregamos los resultados a la base de datos:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Ahora, refactoricemos ligeramente la ruta para devolver los resultados reales de la base de datos en JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Asegúrese de agregar la importación:

from flask import jsonify

Prueba esto de nuevo. Si todo salió bien, debería ver algo similar a en su navegador:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


¿Qué sigue?

Bonificación gratuita: Haga clic aquí para obtener acceso a un videotutorial gratuito de Flask + Python que le muestra cómo crear la aplicación web de Flask, paso a paso.

En la Parte 5, uniremos el cliente y el servidor agregando Angular a la mezcla para crear un sondeador, que enviará una solicitud cada cinco segundos a /results/<job_key> punto final que solicita actualizaciones. Una vez que los datos estén disponibles, los agregaremos al DOM.

¡Salud!

Esta es una pieza de colaboración entre Cam Linke, cofundador de Startup Edmonton, y la gente de Real Python