sql >> Base de Datos >  >> RDS >> PostgreSQL

Cuelgue el script de Python usando SQLAlchemy y multiprocesamiento

Creo que el TypeError proviene de multiprocessing de get .

Eliminé todo el código DB de su script. Echa un vistazo a esto:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Usando r.wait devuelve el resultado esperado, pero usando r.get lanza TypeError . Como se describe en documentos de Python , usa r.wait después de un map_async .

Editar :Tengo que modificar mi respuesta anterior. Ahora creo el TypeError proviene de SQLAlchemy. He modificado mi secuencia de comandos para reproducir el error.

Editar 2 :Parece que el problema es que multiprocessing.pool no funciona bien si algún trabajador genera una excepción cuyo constructor requiere un parámetro (ver también aquí ).

He modificado mi guión para resaltar esto.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

En su caso, dado que su código genera una excepción de SQLAlchemy, la única solución que se me ocurre es capturar todas las excepciones en do función y volver a generar una Exception normal en cambio. Algo como esto:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Editar 3 :entonces, parece ser un error con Python , pero las excepciones adecuadas en SQLAlchemy lo solucionarían:por lo tanto, he planteé el problema con SQLAlchemy , también.

Como solución alternativa al problema, creo que la solución al final de Editar 2 haría (envolviendo las devoluciones de llamada en try-except y re-raise).