sql >> Base de Datos >  >> NoSQL >> HBase

Cómo hacerlo:usar la carga masiva de HBase y por qué

Apache HBase se trata de brindarle acceso aleatorio, en tiempo real, de lectura/escritura a su Big Data, pero, en primer lugar, ¿cómo puede obtener esos datos de manera eficiente en HBase? Intuitivamente, un nuevo usuario intentará hacerlo a través de las API del cliente o mediante un trabajo de MapReduce con TableOutputFormat, pero esos enfoques son problemáticos, como aprenderá a continuación. En cambio, la función de carga masiva de HBase es mucho más fácil de usar y puede insertar la misma cantidad de datos más rápidamente.

Esta publicación de blog presentará los conceptos básicos de la función de carga masiva, presentará dos casos de uso y propondrá dos ejemplos.

Descripción general de la carga masiva

Si tiene alguno de estos síntomas, la carga masiva probablemente sea la mejor opción para usted:

  • Necesitabas ajustar tus MemStores para usar la mayor parte de la memoria.
  • Necesitabas usar WAL más grandes o evitarlos por completo.
  • Tus colas de compactación y vaciado son cientos.
  • Su GC está fuera de control porque sus inserciones varían en MB.
  • Su latencia sale de su SLA cuando importa datos.

La mayoría de esos síntomas se conocen comúnmente como "dolores de crecimiento". El uso de la carga masiva puede ayudarlo a evitarlos.

En HBase-speak, la carga masiva es el proceso de preparar y cargar HFiles (el formato de archivo propio de HBase) directamente en RegionServers, evitando así la ruta de escritura y eliminando esos problemas por completo. Este proceso es similar a ETL y se ve así:

1. Extraiga los datos de una fuente, normalmente archivos de texto u otra base de datos. HBase no gestiona esta parte del proceso. En otras palabras, no puede decirle a HBase que prepare HFiles leyéndolos directamente desde MySQL, sino que debe hacerlo por sus propios medios. Por ejemplo, podría ejecutar mysqldump en una tabla y cargar los archivos resultantes en HDFS o simplemente obtener sus archivos de registro HTTP de Apache. En cualquier caso, sus datos deben estar en HDFS antes del siguiente paso.

2. Transforme los datos en HFiles. Este paso requiere un trabajo de MapReduce y para la mayoría de los tipos de entrada tendrá que escribir Mapper usted mismo. El trabajo deberá emitir la clave de fila como clave y un valor de clave, una opción de colocación o una eliminación como valor. El Reductor es manejado por HBase; lo configura usando HFileOutputFormat.configureIncrementalLoad() y hace lo siguiente:

  • Inspecciona la tabla para configurar un particionador de orden total
  • Carga el archivo de particiones al clúster y lo agrega a DistributedCache
  • Establece el número de tareas de reducción para que coincida con el número actual de regiones
  • Establece la clase de clave/valor de salida para que coincida con los requisitos de HFileOutputFormat
  • Configura el reductor para realizar la clasificación adecuada (ya sea KeyValueSortReducer o PutSortReducer)

En esta etapa, se creará un HFile por región en la carpeta de salida. Tenga en cuenta que los datos de entrada se reescriben casi por completo, por lo que necesitará al menos el doble de la cantidad de espacio disponible en disco que el tamaño del conjunto de datos original. Por ejemplo, para un mysqldump de 100 GB, debe tener al menos 200 GB de espacio disponible en disco en HDFS. Puede eliminar el archivo de volcado al final del proceso.

3. Cargue los archivos en HBase diciéndoles a los RegionServers dónde encontrarlos. Este es el paso más fácil. Requiere el uso de LoadIncrementalHFiles (más comúnmente conocido como la herramienta completebulkload), y al pasarle una URL que ubica los archivos en HDFS, cargará cada archivo en la región relevante a través del RegionServer que lo sirve. En el caso de que una región se haya dividido después de que se crearon los archivos, la herramienta dividirá automáticamente el archivo HFile de acuerdo con los nuevos límites. Este proceso no es muy eficiente, por lo que si otros procesos están escribiendo en su tabla, es mejor cargar los archivos tan pronto como se complete el paso de transformación.

Aquí hay una ilustración de este proceso. El flujo de datos va desde la fuente original a HDFS, donde los RegionServers simplemente moverán los archivos a los directorios de sus regiones.

Casos de uso

Carga del conjunto de datos original: Todos los usuarios que migran desde otro almacén de datos deben considerar este caso de uso. Primero, debe realizar el ejercicio de diseñar el esquema de la tabla y luego crear la tabla en sí, predividida. Los puntos de división deben tener en cuenta la distribución de claves de fila y el número de RegionServers. Recomiendo leer la presentación de mi colega Lars George sobre el diseño de esquemas avanzados para cualquier caso de uso serio.

La ventaja aquí es que es mucho más rápido escribir los archivos directamente que pasar por la ruta de escritura de RegionServer (escribir tanto en MemStore como en WAL) y luego, finalmente, vaciar, compactar, etc. También significa que no tiene que ajustar su clúster para una carga de trabajo de escritura intensa y luego volver a ajustarlo para su carga de trabajo normal.

Carga incremental: Supongamos que tiene un conjunto de datos actualmente servido por HBase, pero ahora necesita importar más datos por lotes de un tercero o tiene un trabajo nocturno que genera algunos gigabytes que necesita insertar. Probablemente no sea tan grande como el conjunto de datos que ya está sirviendo HBase, pero podría afectar el percentil 95 de su latencia. Pasar por la ruta de escritura normal tendrá el efecto adverso de desencadenar más vaciados y compactaciones durante la importación de lo normal. Este estrés de IO adicional competirá con sus consultas sensibles a la latencia.

Ejemplos

Puede usar los siguientes ejemplos en su propio clúster de Hadoop, pero las instrucciones se proporcionan para Cloudera QuickStart VM, que es un clúster de un solo nodo, un sistema operativo invitado y datos de muestra y ejemplos integrados en un dispositivo de máquina virtual para su escritorio.

Una vez que inicie la VM, dígale, a través de la interfaz web que se abrirá automáticamente, que implemente CDH y luego asegúrese de que el servicio HBase también esté iniciado.

Cargador a granel TSV incorporado

HBase se envía con un trabajo de MR que puede leer un archivo de valores separados por delimitadores y enviarlo directamente a una tabla de HBase o crear HFiles para carga masiva. Aquí vamos a:

  1. Obtenga los datos de muestra y cárguelos en HDFS.
  2. Ejecute el trabajo ImportTsv para transformar el archivo en varios HFiles según una tabla preconfigurada.
  3. Preparar y cargar los archivos en HBase.

El primer paso es abrir una consola y usar el siguiente comando para obtener datos de muestra:

curl -O
https://people.apache.org/~jdcryans/word_count.csv

Creé este archivo ejecutando un conteo de palabras en el manuscrito original de esta misma publicación de blog y luego emitiendo el resultado en formato csv, sin ningún título de columna. Ahora, cargue el archivo a HDFS:

hdfs dfs -put word_count.csv

Ahora que la parte de extracción de la carga masiva está completa, debe transformar el archivo. Primero necesitas diseñar la mesa. Para mantener las cosas simples, llámelo "recuento de palabras":las teclas de fila serán las palabras mismas y la única columna contendrá el recuento, en una familia que llamaremos "f". La mejor práctica al crear una tabla es dividirla de acuerdo con la distribución de clave de fila, pero para este ejemplo solo crearemos cinco regiones con puntos de división distribuidos uniformemente en el espacio clave. Abra el shell de hbase:

hbase shell

Y ejecuta el siguiente comando para crear la tabla:

create 'wordcount', {NAME => 'f'},   {SPLITS => ['g', 'm', 'r', 'w']}

Los cuatro puntos de división generarán cinco regiones, donde la primera región comienza con una clave de fila vacía. Para obtener mejores puntos de división, también puede hacer un análisis rápido para ver cómo se distribuyen realmente las palabras, pero eso se lo dejo a usted.

Si apunta el navegador de su VM a http://localhost:60010/ verá nuestra tabla recién creada y sus cinco regiones, todas asignadas al RegionServer.

Ahora es el momento de hacer el trabajo pesado. Al invocar el jar de HBase en la línea de comando con el script "hadoop", se mostrará una lista de herramientas disponibles. El que queremos se llama importtsv y tiene el siguiente uso:

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv
 ERROR: Wrong number of arguments: 0
 Usage: importtsv -Dimporttsv.columns=a,b,c  

La línea de comando que vamos a utilizar es la siguiente:

hadoop jar   /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.bulk.output=output
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

Aquí hay un resumen de los diferentes elementos de configuración:

  • -Dimporttsv.separator=, especifica que el separador es una coma.
  • -Dimporttsv.bulk.output=salida es una ruta relativa a donde se escribirán los HFiles. Dado que su usuario en la máquina virtual es "cloudera" de forma predeterminada, significa que los archivos estarán en /user/cloudera/output. Omitir esta opción hará que el trabajo se escriba directamente en HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,f:contar es una lista de todas las columnas contenidas en este archivo. La clave de fila debe identificarse con la cadena HBASE_ROW_KEY en mayúsculas; de lo contrario, no comenzará el trabajo. (Decidí usar el calificador "contar", pero podría ser cualquier otra cosa).

El trabajo debe completarse en un minuto, dado el pequeño tamaño de entrada. Tenga en cuenta que se están ejecutando cinco reductores, uno por región. Aquí está el resultado en HDFS:

-rw-r--r--   3 cloudera cloudera         4265   2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b
-rw-r--r--   3 cloudera cloudera         3163   2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36
-rw-r--r--   3 cloudera cloudera         2487   2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2
-rw-r--r--   3 cloudera cloudera         2961   2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914
-rw-r--r--   3 cloudera cloudera         1336   2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f

Como puede ver, los archivos actualmente pertenecen al usuario “cloudera”. Para cargarlos, debemos cambiar el propietario a "hbase" o HBase no tendrá permiso para mover los archivos. Ejecute el siguiente comando:

sudo -u hdfs hdfs dfs -chown -R   hbase:hbase/user/cloudera/output

Para el paso final, necesitamos usar la herramienta completebulkload para señalar dónde están los archivos y en qué tablas estamos cargando:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount

Volviendo al shell de HBase, puede ejecutar el comando de conteo que le mostrará cuántas filas se cargaron. Si olvidó chown, el comando se colgará.

Trabajo de MR personalizado

El cargador masivo TSV es bueno para la creación de prototipos, pero debido a que interpreta todo como cadenas y no admite la manipulación de los campos en el momento de la transformación, tendrá que escribir su propio trabajo de MR. Mi colega James Kinley, que trabaja como arquitecto de soluciones en Europa, escribió un trabajo que usaremos para nuestro próximo ejemplo. Los datos del trabajo contienen mensajes públicos de Facebook y Twitter relacionados con las Finales de la NBA de 2010 (juego 1) entre los Lakers y los Celtics. Puedes encontrar el código aquí. (La máquina virtual de inicio rápido viene con git y maven instalados para que pueda clonar el repositorio en ella).

Mirando la clase Driver, los bits más importantes son los siguientes:

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
…
	// Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

Primero, su Mapper necesita generar un ImmutableBytesWritable que contenga la clave de fila, y el valor de salida puede ser KeyValue, Put o Delete. El segundo fragmento muestra cómo configurar el Reductor; de hecho, está completamente manejado por HFileOutputFormat. configureIncrementalLoad() como se describe en la sección "Transformar" anteriormente.

La clase HBaseKVMapper solo contiene el asignador que respeta la clave de salida configurada y los valores:

public class HBaseKVMapper extends
   Mapper<LongWritable,   Text, ImmutableBytesWritable,
KeyValue> {

Para ejecutarlo, deberá compilar el proyecto usando maven y obtener los archivos de datos siguiendo los enlaces en el LÉAME. (También contiene el script de shell para crear la tabla). Antes de comenzar el trabajo, no olvide cargar los archivos en HDFS y configurar su classpath para que tenga en cuenta HBase porque no usará su jar esta vez. :

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*

Podrá iniciar el trabajo usando una línea de comando similar a esta:

hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar
com.cloudera.examples.hbase.bulkimport.Driver -libjars
/home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar,
/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar
RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010

Como puede ver, las dependencias del trabajo deben agregarse por separado. Finalmente, puede cargar los archivos cambiando primero su propietario y luego ejecutando la herramienta completebulkload:

sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010

Problemas potenciales

Reaparecen los datos eliminados recientemente. Este problema ocurre cuando se inserta una eliminación a través de una carga masiva y se compacta en gran medida mientras la opción Put correspondiente todavía está en un MemStore. Los datos se considerarán eliminados cuando el Eliminar esté en un HFile pero, una vez que se elimine durante la compactación, el Put volverá a ser visible. Si tiene un caso de uso de este tipo, considere configurar sus familias de columnas para mantener las celdas eliminadas con KEEP_DELETED_CELLS en el shell o HColumnDescriptor.setKeepDeletedCells().

Los datos de carga masiva no se pueden sobrescribir con otra carga masiva. Este problema ocurre cuando dos HFiles de carga masiva cargados en diferentes momentos intentan escribir un valor diferente en la misma celda, lo que significa que tienen la misma clave de fila, familia, calificador y marca de tiempo. El resultado es que se devolverá el primer valor insertado en lugar del segundo. Este error se corregirá en HBase 0.96.0 y CDH 5 (la próxima versión principal de CDH) y se está trabajando en HBASE-8521 para la rama 0.94 y CDH 4.

La carga a granel provoca compactaciones importantes. Este problema surge cuando realiza cargas masivas incrementales y hay suficientes archivos cargados masivamente para activar una compactación menor (el umbral predeterminado es 3). Los HFiles se cargan con un número de secuencia establecido en 0, por lo que se seleccionan primero cuando RegionServer selecciona archivos para una compactación y, debido a un error, también seleccionará todos los archivos restantes. Este problema afectará gravemente a aquellos que ya tienen grandes regiones (varios GB) o que realizan cargas masivas con frecuencia (cada pocas horas o menos), ya que se compactarán muchos datos. HBase 0.96.0 tiene la solución adecuada, al igual que CDH 5; HBASE-8521 soluciona el problema en 0.94, ya que ahora se asigna un número de secuencia adecuado a los archivos HFiles de carga masiva. HBASE-8283 se puede habilitar con hbase.hstore.useExploringCompation después de 0.94.9 y CDH 4.4.0 para mitigar este problema simplemente siendo un algoritmo de selección de compactación más inteligente.

Los datos de carga masiva no se replican . Como la carga masiva omite la ruta de escritura, el WAL no se escribe como parte del proceso. La replicación funciona al leer los archivos WAL, por lo que no verá los datos cargados de forma masiva, y lo mismo ocurre con las ediciones que usan Put.setWriteToWAL (verdadero). Una forma de manejar eso es enviar los archivos sin procesar o los HFiles al otro clúster y realizar el otro procesamiento allí.

Conclusión

El objetivo de esta publicación de blog era presentarle los conceptos básicos de la carga masiva de Apache HBase. Explicamos cómo el proceso es como hacer ETL, y que es mucho mejor para grandes conjuntos de datos que usar la API normal, ya que omite la ruta de escritura. Los dos ejemplos se incluyeron para mostrar cómo se pueden cargar archivos TSV de forma masiva en HBase y cómo escribir su propio Mapper para otros formatos de datos.

Ahora puede intentar hacer lo mismo usando una interfaz gráfica de usuario a través de Hue.