sql >> Base de Datos >  >> RDS >> PostgreSQL

Problemas de conexión con SQLAlchemy y múltiples procesos

Citando "¿Cómo uso motores/conexiones/sesiones con multiprocesamiento Python u os.fork()?" con énfasis añadido:

El objeto SQLAlchemy Engine hace referencia a un grupo de conexiones de conexiones de bases de datos existentes. Entonces, cuando este objeto se replica en un proceso secundario, el objetivo es garantizar que no se transfieran conexiones a la base de datos .

y

Sin embargo, para el caso de que se comparta una sesión o conexión con transacciones activas, no existe una solución automática para esto; una aplicación debe garantizar que un nuevo proceso secundario solo inicie nuevos objetos y transacciones de conexión, así como objetos de sesión ORM.

El problema se deriva del proceso secundario bifurcado que hereda la session global en vivo , que se aferra a una Connection . Cuando target llama a init , sobrescribe las referencias globales a engine y session , disminuyendo así sus refcounts a 0 en el niño, obligándolos a finalizar. Si, por ejemplo, de una forma u otra crea otra referencia a la sesión heredada en el elemento secundario, evita que se limpie, pero no lo haga. Después de main se ha unido y vuelve al negocio como de costumbre, está tratando de usar la conexión ahora potencialmente finalizada, o de otro modo no sincronizada. En cuanto a por qué esto causa un error solo después de una cierta cantidad de iteraciones, no estoy seguro.

La única forma de manejar esta situación usando globales de la forma en que lo haces es

  1. Cerrar todas las sesiones
  2. Llamar a engine.dispose()

antes de bifurcar. Esto evitará que las conexiones se filtren al niño. Por ejemplo:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

Su segundo ejemplo no desencadena la finalización en el elemento secundario, por lo que solo parece funcionar, aunque podría estar tan dañado como el primero, ya que aún hereda una copia de la sesión y su conexión definida localmente en main .