sql >> Base de Datos >  >> RDS >> Mysql

Automatice la carga masiva de datos desde s3 a la instancia RDS de Aurora MySQL

El enfoque es como se indicó anteriormente, tener un disparador de eventos S3 y un trabajo lambda escuchando en la ubicación del objeto/cubo s3. Tan pronto como se cargue un archivo en la ubicación s3, se ejecutará el trabajo de lambda y, en la lambda, puede configurar para llamar a un trabajo de AWS Glue. Esto es exactamente lo que hemos hecho y se ha puesto en marcha con éxito. Lambda tiene una vida útil de 15 minutos, y debería tomar menos de un minuto activar/iniciar un trabajo de Glue.

Encuentre aquí una fuente de muestra como referencia.

from __future__ import print_function
import json
import boto3
import time
import urllib

print('Loading function')

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="your-glue-job-name here"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_bucket, source_bucket))
    raise e

Para crear una función de Lambda, vaya a AWS Lambdra->Crear una nueva función desde cero->Seleccione S3 para el evento y luego configure las ubicaciones de depósito de S3, los prefijos según sea necesario. Luego, copie y pegue el ejemplo de código anterior, el área de código en línea y configure el nombre del trabajo de pegado según sea necesario. Asegúrese de tener todas las funciones de IAM/configuración de acceso requeridas.

El trabajo de pegado debe tener una disposición para conectarse a su Aurora, y luego puede usar el comando "CARGAR DESDE S3..." proporcionado por Aurora. Asegúrese de que todos los ajustes/configuraciones del grupo de parámetros se realicen según sea necesario.

Avísame si hay algún problema.

ACTUALIZACIÓN:fragmento de código de MUESTRA para CARGAR DESDE S3:

conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, [email protected];"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()