Esta parte del tutorial detalla cómo implementar una cola de tareas de Redis para manejar el procesamiento de texto.
Actualizaciones:
- 12/02/2020:se actualizó a la versión 3.8.1 de Python, así como a las versiones más recientes de Redis, Python Redis y RQ. Consulte a continuación para obtener más información. Mencione un error en la última versión de RQ y proporcione una solución. Se resolvió el error http antes que https.
- 22/03/2016:actualizado a la versión 3.5.1 de Python, así como a las versiones más recientes de Redis, Python Redis y RQ. Vea a continuación para obtener más detalles.
- 22/02/2015:Se agregó compatibilidad con Python 3.
Bonificación gratuita: Haga clic aquí para obtener acceso a un videotutorial gratuito de Flask + Python que le muestra cómo crear la aplicación web de Flask, paso a paso.
Recuerda:esto es lo que estamos creando:una aplicación Flask que calcula pares de palabras y frecuencias en función del texto de una URL determinada.
- Primera parte:configure un entorno de desarrollo local y luego implemente un entorno de ensayo y uno de producción en Heroku.
- Segunda parte:configure una base de datos PostgreSQL junto con SQLAlchemy y Alembic para manejar las migraciones.
- Tercera parte:agregue la lógica de back-end para raspar y luego procesar los recuentos de palabras de una página web utilizando las bibliotecas de solicitudes, BeautifulSoup y Natural Language Toolkit (NLTK).
- Cuarta parte:implementar una cola de tareas de Redis para manejar el procesamiento de texto. (actual )
- Quinta parte:Configure Angular en el front-end para sondear continuamente el back-end para ver si la solicitud ha terminado de procesarse.
- Seis partes:enviar al servidor de prueba en Heroku:configurar Redis y detallar cómo ejecutar dos procesos (web y trabajador) en un solo Dyno.
- Séptima parte:actualice la interfaz para que sea más fácil de usar.
- Octava parte:Cree una directiva angular personalizada para mostrar un gráfico de distribución de frecuencia usando JavaScript y D3.
¿Necesitas el código? Cógelo del repositorio.
Requisitos de instalación
Herramientas utilizadas:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2):una biblioteca simple para crear una cola de tareas
Comience descargando e instalando Redis desde el sitio oficial o a través de Homebrew (brew install redis
). Una vez instalado, inicie el servidor Redis:
$ redis-server
A continuación, instale Python Redis y RQ en una nueva ventana de terminal:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Configurar el Trabajador
Comencemos por crear un proceso de trabajo para escuchar las tareas en cola. Cree un nuevo archivo worker.py y agrega este código:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Aquí, escuchamos una cola llamada default
y estableció una conexión con el servidor Redis en localhost:6379
.
Inicie esto en otra ventana de terminal:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Ahora necesitamos actualizar nuestro app.py para enviar trabajos a la cola...
Actualizar app.py
Agregue las siguientes importaciones a app.py :
from rq import Queue
from rq.job import Job
from worker import conn
Luego actualice la sección de configuración:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
configuró una conexión Redis e inicializó una cola basada en esa conexión.
Mueva la funcionalidad de procesamiento de texto fuera de nuestra ruta de índice y a una nueva función llamada count_and_save_words()
. Esta función acepta un argumento, una URL, que le pasaremos cuando la llamemos desde nuestra ruta de índice.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Toma nota del siguiente código:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
count_and_save_words
función en nuestra función index
ya que el paquete RQ actualmente tiene un error, donde no encontrará funciones en el mismo módulo.
Aquí usamos la cola que inicializamos anteriormente y llamamos enqueue_call()
función. Esto agregó un nuevo trabajo a la cola y ese trabajo ejecutó count_and_save_words()
función con la URL como argumento. El result_ttl=5000
El argumento de línea le dice a RQ cuánto tiempo debe retener el resultado del trabajo:5000 segundos, en este caso. Luego enviamos la identificación del trabajo a la terminal. Esta identificación es necesaria para ver si el trabajo ha terminado de procesarse.
Configuremos una nueva ruta para eso...
Obtener resultados
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Probemos esto.
Inicie el servidor, navegue a http://localhost:5000/, use la URL https://realpython.com y tome la identificación del trabajo de la terminal. Luego use esa identificación en el punto final '/results/', es decir, http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Siempre que hayan transcurrido menos de 5000 segundos antes de verificar el estado, debería ver un número de identificación, que se genera cuando agregamos los resultados a la base de datos:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Ahora, refactoricemos ligeramente la ruta para devolver los resultados reales de la base de datos en JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Asegúrese de agregar la importación:
from flask import jsonify
Prueba esto de nuevo. Si todo salió bien, debería ver algo similar a en su navegador:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
¿Qué sigue?
Bonificación gratuita: Haga clic aquí para obtener acceso a un videotutorial gratuito de Flask + Python que le muestra cómo crear la aplicación web de Flask, paso a paso.
En la Parte 5, uniremos el cliente y el servidor agregando Angular a la mezcla para crear un sondeador, que enviará una solicitud cada cinco segundos a /results/<job_key>
punto final que solicita actualizaciones. Una vez que los datos estén disponibles, los agregaremos al DOM.
¡Salud!
Esta es una pieza de colaboración entre Cam Linke, cofundador de Startup Edmonton, y la gente de Real Python