sql >> Base de Datos >  >> NoSQL >> HBase

Spark-on-HBase:conector HBase basado en DataFrame

Esta entrada de blog se publicó en Hortonworks.com antes de la fusión con Cloudera. Es posible que algunos enlaces, recursos o referencias ya no sean precisos.

Nos enorgullece anunciar la vista previa técnica de Spark-HBase Connector, desarrollado por Hortonworks en colaboración con Bloomberg.

El conector Spark-HBase aprovecha la API de fuente de datos (SPARK-3247) introducida en Spark-1.2.0. Cierra la brecha entre el almacén de valor clave HBase simple y las consultas SQL relacionales complejas y permite a los usuarios realizar análisis de datos complejos además de HBase usando Spark. Un HBase DataFrame es un Spark DataFrame estándar y puede interactuar con cualquier otra fuente de datos como Hive, ORC, Parquet, JSON, etc.

Antecedentes

Hay varios conectores Spark HBase de código abierto disponibles como paquetes Spark, como proyectos independientes o en troncal HBase.

Spark se ha movido a las API Dataset/DataFrame, que proporciona una optimización del plan de consulta integrado. Ahora, los usuarios finales prefieren usar la interfaz basada en DataFrames/Datasets.

El conector HBase en el enlace troncal HBase tiene un gran soporte a nivel de RDD, p. BulkPut, etc., pero su compatibilidad con DataFrame no es tan rica. El conector troncal de HBase se basa en HadoopRDD estándar con TableInputFormat integrado de HBase y tiene algunas limitaciones de rendimiento. Además, BulkGet realizado en el controlador puede ser un único punto de error.

Hay algunas otras implementaciones alternativas. Tome Spark-SQL-on-HBase como ejemplo. Aplica técnicas de optimización personalizadas muy avanzadas mediante la incorporación de su propio plan de optimización de consultas dentro del motor Spark Catalyst estándar, envía el RDD a HBase y realiza tareas complicadas, como la agregación parcial, dentro del coprocesador HBase. Este enfoque es capaz de lograr un alto rendimiento, pero es difícil de mantener debido a su complejidad y la rápida evolución de Spark. Además, permitir que se ejecute código arbitrario dentro de un coprocesador puede plantear riesgos de seguridad.

El conector Spark-on-HBase (SHC) se ha desarrollado para superar estos cuellos de botella y debilidades potenciales. Implementa la API Spark Datasource estándar y aprovecha el motor Spark Catalyst para la optimización de consultas. Paralelamente, el RDD se construye desde cero en lugar de usar TableInputFormat para lograr un alto rendimiento. Con este RDD personalizado, todas las técnicas críticas se pueden aplicar e implementar por completo, como la eliminación de particiones, la eliminación de columnas, la inserción de predicados y la localidad de datos. El diseño hace que el mantenimiento sea muy fácil, al tiempo que logra un buen compromiso entre rendimiento y simplicidad.

Arquitectura

Asumimos que Spark y HBase se implementan en el mismo clúster y que los ejecutores de Spark se ubican junto con los servidores regionales, como se ilustra en la figura a continuación.

Figura 1. Arquitectura del conector Spark-on-HBase

En un nivel alto, el conector trata a Scan y Get de manera similar, y ambas acciones se realizan en los ejecutores. El controlador procesa la consulta, agrega exploraciones/obtenciones en función de los metadatos de la región y genera tareas por región. Las tareas se envían a los ejecutores preferidos ubicados junto con el servidor de la región y se realizan en paralelo en los ejecutores para lograr una mejor localidad y concurrencia de datos. Si una región no contiene los datos necesarios, a ese servidor de región no se le asigna ninguna tarea. Una tarea puede constar de varios escaneos y BulkGets, y las solicitudes de datos de una tarea se recuperan de un solo servidor de región, y este servidor de región también será la preferencia de localidad para la tarea. Tenga en cuenta que el controlador no está involucrado en la ejecución del trabajo real, excepto en las tareas de programación. Esto evita que el conductor sea el cuello de botella.

Catálogo de mesas

Para traer la tabla HBase como una tabla relacional a Spark, definimos un mapeo entre las tablas HBase y Spark, llamado Table Catalog. Hay dos partes críticas de este catálogo. Una es la definición de clave de fila y la otra es la asignación entre la columna de la tabla en Spark y la familia de columnas y el calificador de columna en HBase. Consulte la sección Uso para obtener más detalles.

Soporte nativo de Avro

El conector admite el formato Avro de forma nativa, ya que es una práctica muy común conservar los datos estructurados en HBase como una matriz de bytes. El usuario puede conservar el registro de Avro en HBase directamente. Internamente, el esquema de Avro se convierte automáticamente a un tipo de datos nativo de Spark Catalyst. Tenga en cuenta que ambas partes clave-valor en una tabla HBase se pueden definir en formato Avro. Consulte los ejemplos/casos de prueba en el repositorio para conocer el uso exacto.

Inserción de predicado

El conector solo recupera las columnas requeridas del servidor de la región para reducir la sobrecarga de la red y evitar el procesamiento redundante en el motor Spark Catalyst. Los filtros HBase estándar existentes se utilizan para realizar la inserción de predicados sin aprovechar la capacidad del coprocesador. Debido a que HBase no conoce el tipo de datos, excepto la matriz de bytes, y la inconsistencia de orden entre los tipos primitivos de Java y la matriz de bytes, tenemos que procesar previamente la condición del filtro antes de configurar el filtro en la operación de escaneo para evitar cualquier pérdida de datos. Dentro del servidor de región, se filtran los registros que no coinciden con la condición de consulta.

Recorte de particiones

Al extraer la clave de fila de los predicados, dividimos Scan/BulkGet en múltiples rangos que no se superponen, solo los servidores de región que tienen los datos solicitados realizarán Scan/BulkGet. Actualmente, la poda de partición se realiza en la primera dimensión de las claves de fila. Por ejemplo, si una clave de fila es "clave1:clave2:clave3", la eliminación de la partición se basará únicamente en "clave1". Tenga en cuenta que las condiciones DONDE deben definirse cuidadosamente. De lo contrario, es posible que la eliminación de la partición no surta efecto. Por ejemplo, WHERE filaclave1> “abc” O columna =“xyz” (donde filaclave1 es la primera dimensión de la clave de fila y la columna es una columna normal de hbase) dará como resultado un análisis completo, ya que tenemos que cubrir todos los rangos porque del O lógica.

Localidad de datos

Cuando un ejecutor de Spark se ubica junto con los servidores de la región HBase, la ubicación de los datos se logra mediante la identificación de la ubicación del servidor de la región y se hace el mejor esfuerzo para ubicar la tarea junto con el servidor de la región. Cada ejecutor realiza Scan/BulkGet en la parte de los datos coubicados en el mismo host.

Escanear y obtener masivamente

Estos dos operadores están expuestos a los usuarios especificando la CLÁUSULA WHERE, por ejemplo, WHERE columna> x y columna para escanear y DONDE columna =x olvidar. Las operaciones se realizan en los ejecutores, y el controlador solo construye estas operaciones. Internamente, se convierten para escanear y/u obtener, e Iterator[Row] se devuelve al motor de Catalyst para el procesamiento de la capa superior.

Uso

A continuación se ilustra el procedimiento básico sobre cómo utilizar el conector. Para obtener más detalles y un caso de uso avanzado, como Avro y soporte de clave compuesta, consulte los ejemplos en el repositorio.

1) Defina el catálogo para el esquema de mapeo:

[code language="scala"]def catalog = s"""{
         |"table":{"namespace":"default", "name":"table1"},
         |"rowkey":"key",
         |"columns":{
           |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
           |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
           |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
           |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
           |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
           |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
           |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
           |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
         |}
       |}""".stripMargin
[/code]

2) Prepare los datos y complete la tabla HBase:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

objeto HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 a 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
.save()
 
3) Cargue el marco de datos:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
.load()
}

val df =conCatálogo(catálogo)

4) Consulta integrada de idioma:
val s =df.filter((($”col0″ <=“fila050″ &&$”col0”> “fila040”) ||
 $”col0″ ===“fila005” ||
 $”col0″ ===“fila020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“fila005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .mostrar

5) Consulta SQL:
df.registerTempTable(“tabla”)
sqlContext.sql(“seleccione recuento(col1) de la tabla”).show

Configuración del paquete Spark

Los usuarios pueden usar el conector Spark-on-HBase como un paquete Spark estándar. Para incluir el paquete en su aplicación Spark, use:

spark-shell, pyspark o spark-submit

> $SPARK_HOME/bin/spark-shell –paquetes zhzhan:shc:0.0.11-1.6.1-s_2.10

Los usuarios también pueden incluir el paquete como dependencia en su archivo SBT. El formato es Spark-package-name:versión

spDependencias +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Ejecución en clúster seguro

Para ejecutar en un clúster habilitado para Kerberos, el usuario debe incluir archivos jar relacionados con HBase en el classpath, ya que Spark realiza la recuperación y renovación del token de HBase y es independiente del conector. En otras palabras, el usuario debe iniciar el entorno de forma normal, ya sea mediante kinit o proporcionando principal/keytab. Los siguientes ejemplos muestran cómo ejecutar en un clúster seguro tanto con el modo yarn-client como con el modo yarn-cluster. Tenga en cuenta que SPARK_CLASSPATH debe configurarse para ambos modos, y el jar de ejemplo es solo un marcador de posición para Spark.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- cliente/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Supongamos que hrt_qa es una cuenta sin cabeza, el usuario puede usar el siguiente comando para kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- cliente/lib/chispa-ejemplos-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Poniéndolo todo junto

Acabamos de brindar una descripción general rápida de cómo HBase admite Spark en el nivel de DataFrame. Con la API de DataFrame, las aplicaciones Spark pueden trabajar con datos almacenados en la tabla HBase tan fácilmente como cualquier dato almacenado en otras fuentes de datos. Con esta nueva función, las aplicaciones Spark y otras herramientas interactivas pueden consumir fácilmente los datos de las tablas de HBase, p. los usuarios pueden ejecutar una consulta SQL compleja sobre una tabla HBase dentro de Spark, realizar una unión de tabla contra Dataframe o integrarse con Spark Streaming para implementar un sistema más complicado.

¿Qué sigue?

Actualmente, el conector está alojado en el repositorio de Hortonworks y se publica como un paquete de Spark. Está en proceso de migración al tronco de Apache HBase. Durante la migración, identificamos algunos errores críticos en el enlace troncal de HBase y se corregirán junto con la fusión. El trabajo de la comunidad es rastreado por el paraguas HBase JIRA HBASE-14789, que incluye HBASE-14795 y HBASE-14796 para optimizar la arquitectura informática subyacente para Scan y BulkGet, HBASE-14801 para proporcionar una interfaz de usuario JSON para facilitar el uso, HBASE-15336 para la ruta de escritura de DataFrame, HBASE-15334 para compatibilidad con Avro, HBASE-15333 para admitir tipos primitivos de Java, como short, int, long, float y double, etc., HBASE-15335 para admitir clave de fila compuesta y HBASE-15572 para agregar semántica de marca de tiempo opcional. Esperamos producir una versión futura del conector que haga que sea aún más fácil trabajar con él.

Reconocimiento

Queremos agradecer a Hamel Kothari, Sudarshan Kadambi y al equipo de Bloomberg por guiarnos en este trabajo y también por ayudarnos a validarlo. También queremos agradecer a la comunidad de HBase por brindar sus comentarios y mejorar esto. Finalmente, este trabajo ha aprovechado las lecciones de integraciones anteriores de Spark HBase y queremos agradecer a sus desarrolladores por allanar el camino.

Referencia:

SHC:https://github.com/hortonworks/shc-release

Paquete Spark:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/