sql >> Base de Datos >  >> RDS >> PostgreSQL

¿Cómo configurar un túnel SSH en Google Cloud Dataflow a un servidor de base de datos externo?

Problema resuelto ! No puedo creer que haya pasado dos días completos en esto... Estaba mirando en la dirección equivocada.

El problema no estaba relacionado con alguna configuración de red de Dataflow o GCP y, por lo que sé...

es verdad.

Por supuesto, el problema estaba en mi código:solo el problema se reveló solo en un entorno distribuido. Cometí el error de abrir el túnel desde el procesador principal de la tubería, en lugar de los trabajadores. Entonces, el túnel SSH estaba activo, pero no entre los trabajadores y el servidor de destino, ¡solo entre la tubería principal y el destino!

Para solucionar esto, tuve que cambiar mi solicitud DoFn para ajustar la ejecución de la consulta con el túnel:

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

como puede ver, tuve que anular algunos fragmentos de la biblioteca pysql_beam.

Finalmente, cada trabajador abre su propio túnel para cada solicitud. Probablemente sea posible optimizar este comportamiento, pero es suficiente para mis necesidades.