sql >> Base de Datos >  >> NoSQL >> HBase

Sincronización de datos de clústeres de HBase con la herramienta HashTable/SyncTable

La replicación (tratada en este artículo de blog anterior) ha sido lanzada por un tiempo y es una de las funciones más utilizadas de Apache HBase. Tener clústeres que replican datos con diferentes pares es una implementación muy común, ya sea como una estrategia de recuperación ante desastres o simplemente como una forma perfecta de replicar datos entre entornos de producción/escenario/desarrollo. Si bien es una forma eficiente de mantener sincronizadas diferentes bases de datos HBase dentro de una latencia de menos de un segundo, la replicación solo opera sobre los datos ingeridos después de que se haya habilitado la función. Eso significa que cualquier dato preexistente en todos los clústeres involucrados en la implementación de la replicación aún deberá copiarse entre los pares de alguna otra manera. Existen bastantes herramientas que se pueden usar para sincronizar datos preexistentes en diferentes clústeres de pares. Snapshots, BulkLoad, CopyTable son ejemplos bien conocidos de tales herramientas cubiertas en publicaciones anteriores del blog de Cloudera. En este artículo vamos a cubrir HashTable/SyncTable, detallando parte de su lógica de implementación interna, las ventajas y desventajas de usarla y cómo se compara con algunas de las otras técnicas de copia de datos mencionadas anteriormente.

HashTable/SyncTable en pocas palabras

Tabla hash/tabla sincronizada es una herramienta implementada como dos trabajos de reducción de mapa que se ejecutan como pasos individuales. Se parece a la CopyTable herramienta, que puede realizar copias de datos de tablas parciales o completas. A diferencia de CopyTable solo copia los datos divergentes entre los clústeres de destino, lo que ahorra recursos informáticos y de red durante el procedimiento de copia.

El primer paso a ejecutar en el proceso es la HashTable trabajo de reducción de mapa. Esto debe ejecutarse en el clúster cuyos datos deben copiarse al igual remoto, normalmente el clúster de origen. A continuación, se muestra un ejemplo rápido de cómo ejecutarlo. Más adelante en este artículo se proporciona una explicación detallada de cada uno de los parámetros necesarios: 

hbase org.apache.hadoop.hbase.mapreduce.HashTable --families=cf my-table /hashes/test-tbl
…
20/04/28 05:05:48 INFO mapreduce.Job:  map 100% reduce 100%
20/04/28 05:05:49 INFO mapreduce.Job: Job job_1587986840019_0001 completed successfully
20/04/28 05:05:49 INFO mapreduce.Job: Counters: 68
…
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=6811788

Una vez que la tabla hash se completó la ejecución del trabajo con el comando anterior, se generaron algunos archivos de salida en la fuente hdfs /hashes/my-table directorio:

hdfs dfs -ls -R /hashes/test-tbl
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes
-rw-r--r--   2 root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/_SUCCESS
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000
-rw-r--r--   2 root supergroup    6790909 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/data
-rw-r--r--   2 root supergroup      20879 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/index
-rw-r--r--   2 root supergroup         99 2020-04-28 05:04 /hashes/test-tbl/manifest
-rw-r--r--   2 root supergroup        153 2020-04-28 05:04 /hashes/test-tbl/partitions

Estos son necesarios como entrada para la SyncTable correr. Tabla sincronizada debe iniciarse en el par de destino. El siguiente comando ejecuta SyncTable para la salida de HashTable del ejemplo anterior. Utiliza el dryrun opción explicada más adelante en este artículo:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://source-cluster-active-nn/hashes/test-tbl test-tbl test-tbl
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97146
HASHES_NOT_MATCHED=2
MATCHINGCELLS=17
MATCHINGROWS=2
RANGESNOTMATCHED=2
ROWSWITHDIFFS=2
SOURCEMISSINGCELLS=1
TARGETMISSINGCELLS=1

Como referencia rápida, puede simplemente reemplazar los parámetros dados en ambos ejemplos por los valores reales de su entorno. El resto de este artículo cubrirá los detalles de implementación con más profundidad.

¿Por qué dos pasos diferentes?

El objetivo principal de esta herramienta es identificar y copiar solo los datos que faltan entre los dos clústeres. tabla hash funciona como un trabajo de fragmentación/indexación, analizando lotes de datos de la tabla y generando índices hash para cada uno de estos lotes. Estos son los resultados escritos en los archivos bajo /hashes/my-table directorio hdfs pasado como uno de los parámetros del trabajo. Como se mencionó anteriormente, esta salida es requerida por SyncTable trabajo. Tabla sincronizada escanea la tabla de destino localmente en los mismos tamaños de lote que usa HashTable, y también calcula valores hash para estos lotes utilizando la misma función utilizada por HashTable. Luego compara el hash de lote local valor con el de la HashTable producción. Si los valores hash son iguales, significa que todo el lote es idéntico en los dos clústeres y no es necesario copiar nada en ese segmento. De lo contrario, abre un escaneo del lote en el clúster de origen, verificando si cada una de las celdas ya existe en el clúster de destino, copiando solo aquellas que divergen. En conjuntos de datos escasos y ligeramente diferentes, esto daría como resultado que se copien muchos menos datos entre los dos clústeres. También requeriría escanear solo una pequeña cantidad de celdas en la fuente para verificar si hay discrepancias.

Parámetros requeridos

tabla hash requiere solo dos parámetros:el nombre de la tabla y la ruta de salida donde se escribirán los hashes relacionados y otros archivos de metainformación. Tabla sincronizada utiliza tabla hash directorio de salida como entrada, junto con los nombres de las tablas en el clúster de origen y de destino, respectivamente. Ya que estamos usando HashTable /Tabla sincronizada para mover datos entre clústeres remotos, sourcezkcluster la opción debe estar definida para SyncTable . Esta debe ser la dirección de quórum de zookeeper del clúster de origen. En este artículo de ejemplo, también hicimos referencia directamente a la dirección del nodo de nombre activo del clúster de origen, de modo que SyncTable leerá el archivo de salida hash directamente desde el clúster de origen. Alternativamente, HashTable la salida se puede copiar manualmente del clúster de origen al clúster remoto (con distcp, por ejemplo).

NOTA:Trabajar con clústeres remotos en diferentes reinos de kerberos solo se admite a partir de CDH 6.2.1 en adelante.

Opciones avanzadas

Ambos tabla hash y SyncTable ofrecen opciones opcionales adicionales que se pueden ajustar para obtener resultados óptimos.

tabla hash permite filtrar los datos por clave de fila y hora de modificación, con startrow/starttime, stoprow/stoptime propiedades, respectivamente. El alcance del conjunto de datos también puede estar limitado por versiones y familias propiedades. El tamaño del lote La propiedad define el tamaño de cada porción que será procesada. Esto impacta directamente en el rendimiento de la sincronización. En casos de muy pocas discrepancias, establecer valores de tamaño de lote más grandes puede conducir a un mejor rendimiento, ya que se ignorarían porciones más grandes del conjunto de datos sin necesidad de que SyncTable lo escanee.

Tabla sincronizada proporciona una prueba opción que permite obtener una vista previa de los cambios que se aplicarán en el destino.

Tabla sincronizada el comportamiento predeterminado es reflejar los datos de origen en el lado de destino, por lo que cualquier celda adicional presente en el destino pero ausente en el origen termina siendo eliminada en el lado de destino. Eso puede ser indeseable cuando se sincronizan clústeres en una configuración de replicación Activo-Activo y, para tales casos, doDeletes las opciones se pueden convertir en falsas, omitiendo la replicación de eliminaciones en el destino. También hay un doPuts similar marca para los casos en los que no se deben insertar celdas adicionales en el clúster de destino.

Análisis de los resultados

tabla hash genera algunos archivos con metainformación para SyncTable, esos, sin embargo, no son legibles por humanos. No realiza ningún cambio en los datos existentes, por lo que la información relacionada tiene poco interés para el contexto de un usuario.

Tabla sincronizada es el paso que realmente aplica las modificaciones en el destino y podría ser importante revisar su resumen antes de modificar los datos del clúster de destino (ver dryrun opción mencionada anteriormente). Publica algunos contadores relevantes al final del mapa para reducir la ejecución. Mirando los valores del ejemplo anterior, podemos ver que había 97148 particiones con hash (reportado por BATCHES contador), que SyncTable detectó divergencias en solo dos de ellos (según el HASHES_MATCHED y HASHES_NOT_MACTHED contadores). Además, dentro de las dos particiones que tienen hashes diferentes,  17 celdas en 2 filas coincidían (según lo informado por MATCHING_CELLS y FILAS_COINCIDENTES, respectivamente), pero también había 2 filas divergentes, en estas dos particiones (según RANGESNOTMATCHED y FILAS CON DIFERENCIAS ). Finalmente, SOURCEMISSINGCELLS y TARGETMISSINGCELLS díganos en detalle si las celdas estaban presentes solo en el grupo de origen o de destino. En este ejemplo, el clúster de origen tenía una celda que no estaba en el destino, pero el destino también tenía una celda que no estaba en el origen. Desde SyncTable se ejecutó sin especificar dryrun opción y configuración doDeletes opción a falso , el trabajo eliminó la celda adicional en el clúster de destino y agregó la celda adicional que se encuentra en el origen al clúster de destino. Asumiendo que no hay escrituras ocurrir en cualquiera de los clústeres, una ejecución posterior de la misma SyncTable el comando en el clúster de destino no mostraría diferencias:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://nn:9000/hashes/test-tbl test-tbl test-tbl
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148
…

Escenarios aplicables

Sincronización de datos

A primera vista, HashTable/SyncTable puede parecer que se superpone con la CopyTable herramienta, pero todavía hay escenarios específicos en los que cualquiera de las herramientas sería más adecuada. Como primer ejemplo de comparación, usando HashTable/SyncTable para una carga inicial de una tabla que contiene 100.004 filas y un tamaño total de datos de 5,17 GB, se necesitaron unos minutos solo para SyncTable para completar:

...
20/04/29 03:48:00 INFO mapreduce.Job: Running job: job_1587985272792_0011
20/04/29 03:48:09 INFO mapreduce.Job: Job job_1587985272792_0011 running in uber mode : false
20/04/29 03:48:09 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 03:54:08 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 03:54:09 INFO mapreduce.Job: Job job_1587985272792_0011 completed successfully
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
EMPTY_BATCHES=97148
HASHES_NOT_MATCHED=97148
RANGESNOTMATCHED=97148
ROWSWITHDIFFS=100004
TARGETMISSINGCELLS=749589
TARGETMISSINGROWS=100004

Incluso en un conjunto de datos tan pequeño, CopyTable ejecutado más rápido (aproximadamente 3 minutos, mientras que SyncTable tardó 6 minutos en copiar todo el conjunto de datos):

...
20/04/29 05:12:07 INFO mapreduce.Job: Running job: job_1587986840019_0005
20/04/29 05:12:24 INFO mapreduce.Job: Job job_1587986840019_0005 running in uber mode : false
20/04/29 05:12:24 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:13:16 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 05:13:49 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:14:37 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job: Job job_1587986840019_0005 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2787236791
BYTES_IN_RESULTS=5549784428
MILLIS_BETWEEN_NEXTS=130808
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1334
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100004
RPC_CALLS=2657
RPC_RETRIES=0

Ahora usemos ambas herramientas nuevamente para lidiar con las escasas diferencias en el conjunto de datos. La prueba-tbl La tabla utilizada en todos estos ejemplos tiene cuatro regiones en el clúster de origen. Después de que todo el conjunto de datos original se haya copiado en el clúster de destino en el ejemplo anterior, agregamos cuatro filas adicionales solo en el lado de origen, una para cada una de las regiones existentes, luego ejecutamos HashTable/SyncTable de nuevo para sincronizar ambos grupos:

20/04/29 05:29:23 INFO mapreduce.Job: Running job: job_1587985272792_0013
20/04/29 05:29:39 INFO mapreduce.Job: Job job_1587985272792_0013 running in uber mode : false
20/04/29 05:29:39 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:29:53 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job: Job job_1587985272792_0013 completed successfully
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97144
HASHES_NOT_MATCHED=4
MATCHINGCELLS=42
MATCHINGROWS=5
RANGESNOTMATCHED=4
ROWSWITHDIFFS=4
TARGETMISSINGCELLS=4
TARGETMISSINGROWS=4

Podemos ver eso con solo cuatro las particiones no coinciden, SyncTable fue considerablemente más rápido (aproximadamente un minuto para completar). Uso de Copiar tabla para realizar esta misma sincronización mostró los siguientes resultados:

20/04/29 08:32:38 INFO mapreduce.Job: Running job: job_1587986840019_0008
20/04/29 08:32:52 INFO mapreduce.Job: Job job_1587986840019_0008 running in uber mode : false
20/04/29 08:32:52 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 08:33:38 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 08:34:15 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 08:34:48 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 08:35:31 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 08:35:32 INFO mapreduce.Job: Job job_1587986840019_0008 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2762547723
BYTES_IN_RESULTS=5549784600
MILLIS_BETWEEN_NEXTS=340672
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1323
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100008
RPC_CALLS=2657
RPC_RETRIES=0

Copiar tabla tardó la misma cantidad de tiempo en sincronizar las tablas que cuando se copió todo el conjunto de datos, aunque solo había cuatro celdas a copiar. Esto todavía estaba bien para este conjunto de datos muy pequeño, y con un clúster inactivo, pero en casos de uso de producción con conjuntos de datos más grandes y donde el clúster de destino también puede estar en uso por muchas aplicaciones cliente que escriben datos en él, CopyTable degradación del rendimiento en comparación con SyncTable sería aún mayor.

Vale la pena mencionar que también hay herramientas/características adicionales que podrían usarse en combinación para una carga inicial de un clúster de destino (el destino no tiene ningún dato), como la exportación de instantáneas, la carga masiva o incluso una copia directa del original. directorios de tabla del clúster de origen. Para cargas iniciales con grandes cantidades de datos para copiar, tomar una instantánea de la tabla y luego usar ExportSnapshot La herramienta superará a las herramientas de copia en línea como SyncTable o Copiar tabla.

Comprobación de la integridad de la replicación

Otro uso común de HashTable/SyncTable es monitorear el estado de replicación entre clústeres, al solucionar posibles problemas de replicación. En este escenario, funciona como una alternativa a la herramienta VerifyReplication. Por lo general, cuando se verifica el estado entre dos clústeres, no hay discrepancias en absoluto o un problema temporal ocasional ha provocado que una pequeña parte del conjunto de datos más grande pierda la sincronización. En el entorno de prueba que hemos estado usando para nuestro ejemplo anterior, debería haber 100 008 filas con valores coincidentes en ambos clústeres. Ejecutar SyncTable en el clúster de destino con la opción dryrun nos permitirá identificar cualquier diferencia:

20/05/04 10:47:25 INFO mapreduce.Job: Running job: job_1588611199158_0004
…
20/05/04 10:48:48 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 10:48:48 INFO mapreduce.Job: Job job_1588611199158_0004 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=3753476784
BYTES_IN_RESULTS=5549784600
ROWS_SCANNED=100008
…
org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148
...

Unlike SyncTable we must run the VerifyReplication tool on the source cluster. We pass the peer id as one of its parameters so that it can find the remote cluster to scan for comparison:
20/05/04 11:01:58 INFO mapreduce.Job: Running job: job_1588611196128_0001
…
20/05/04 11:04:39 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 11:04:39 INFO mapreduce.Job: Job job_1588611196128_0001 completed successfully
…
HBase Counters
BYTES_IN_REMOTE_RESULTS=2761955495
BYTES_IN_RESULTS=5549784600
…

org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
GOODROWS=100008
...

Sin diferencias, SyncTable encuentra todos los hashes que coinciden entre las particiones de origen y de destino y, por lo tanto, evita la necesidad de escanear el clúster de origen remoto nuevamente. VerifyReplication realiza una comparación uno por uno para cada celda en ambos clústeres, lo que ya podría conllevar un alto costo de red incluso cuando se trata de conjuntos de datos tan pequeños.

Agregar una fila adicional en el clúster de origen y realizar las comprobaciones nuevamente. Con VerificarReplicación:

20/05/05 11:14:05 INFO mapreduce.Job: Running job: job_1588611196128_0004
…
20/05/05 11:16:32 INFO mapreduce.Job:  map 100% reduce 0%
20/05/05 11:16:32 INFO mapreduce.Job: Job job_1588611196128_0004 completed successfully
…
org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
BADROWS=1
GOODROWS=100008
ONLY_IN_SOURCE_TABLE_ROWS=1
…

Antes de que podamos usar SyncTable, tenemos que volver a generar hashes en la fuente con HashTable, ya que ahora hay una nueva celda:

20/05/04 11:31:48 INFO mapreduce.Job: Running job: job_1588611196128_0003
…
20/05/04 11:33:15 INFO mapreduce.Job: Job job_1588611196128_0003 completed successfully
...

Ahora SyncTable:

20/05/07 05:47:51 INFO mapreduce.Job: Running job: job_1588611199158_0014
…

20/05/07 05:49:20 INFO mapreduce.Job: Job job_1588611199158_0014 completed successfully

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_NOT_MATCHED=97148
MATCHINGCELLS=749593
MATCHINGROWS=100008
RANGESMATCHED=97147
RANGESNOTMATCHED=1
ROWSWITHDIFFS=1
TARGETMISSINGCELLS=1
TARGETMISSINGROWS=1

Podemos ver el aumento en el tiempo de ejecución debido al escaneo adicional y la comparación de celdas entre los dos clústeres remotos. Mientras tanto, el tiempo de ejecución de VerifyReplication mostró poca variación.

Conclusión

Tabla hash/tabla sincronizada es una herramienta valiosa para mover datos, cuando se trata de discrepancias escasas entre dos conjuntos de datos de clústeres. Hace uso de la partición y el hashing de datos para detectar de manera eficiente las diferencias en los rangos de los dos conjuntos de datos, lo que reduce la cantidad de celdas que se escanearán mientras se comparan los datos de los dos grupos, al mismo tiempo que evita la colocación innecesaria de valores ya existentes en el grupo de destino. Sin embargo, no es una bala de plata, como se demuestra en algunos de los ejemplos anteriores, aunque puede parecer que se superpone en función con CopyTable herramienta, este último puede ofrecer un mejor rendimiento cuando se maneja un rango secuencial más grande de celdas que no coinciden. En ese sentido, HashTable/SyncTable sería más aplicable en los casos en que ambos clústeres ya tengan algunos datos residentes, o en los casos en que una configuración de replicación existente se haya interrumpido por la falta de disponibilidad temporal de uno de los pares.

Artículos relacionados:

https://blog.cloudera.com/apache-hbase-replication-overview/

https://blog.cloudera.com/approaches-to-backup-and-disaster-recovery-in-hbase/

https://blog.cloudera.com/online-apache-hbase-backups-with-copytable/

https://blog.cloudera.com/introduction-to-apache-hbase-snapshots/