sql >> Base de Datos >  >> NoSQL >> HBase

Creación de una aplicación de aprendizaje automático con Cloudera Data Science Workbench y Operational Database, parte 1:configuración y conceptos básicos

Introducción

Python se usa ampliamente entre los ingenieros de datos y los científicos de datos para resolver todo tipo de problemas, desde canalizaciones ETL/ELT hasta la creación de modelos de aprendizaje automático. Apache HBase es un sistema de almacenamiento de datos efectivo para muchos flujos de trabajo, pero acceder a estos datos específicamente a través de Python puede ser complicado. Para los profesionales de datos que deseen utilizar los datos almacenados en HBase, el reciente proyecto upstream "hbase-connectors" se puede utilizar con PySpark para operaciones básicas.

En esta serie de blogs, explicaremos cómo configurar PySpark y HBase juntos para el uso básico de Spark, así como para trabajos mantenidos en CDSW. Para aquellos que no están familiarizados con CDSW, es una plataforma de ciencia de datos empresarial segura y de autoservicio para que los científicos de datos administren sus propias canalizaciones de análisis, acelerando así los proyectos de aprendizaje automático desde la exploración hasta la producción. Para obtener más información sobre CDSW, visite la página del producto Cloudera Data Science Workbench.

En esta publicación, se explicarán y demostrarán varias operaciones junto con un resultado de ejemplo. Por contexto, todas las operaciones de ejemplo en esta publicación de blog específica se ejecutan con una implementación de CDSW.

Requisitos previos:

  1. Tener un clúster CDP con HBase y Spark
  2. Si va a seguir los ejemplos a través de CDSW, lo necesitará instalado:instalación de Cloudera Data Science Workbench
  3. Python 3 está instalado en cada nodo en la misma ruta

Configuración:

Primero, HBase y Spark deben configurarse juntos para que las consultas de Spark SQL funcionen correctamente. Para hacerlo hay dos partes:primero, configurar los servidores de la región HBase a través de Cloudera Manager; y segundo, asegúrese de que el tiempo de ejecución de Spark tenga enlaces HBase. Sin embargo, una nota a tener en cuenta es que Cloudera Manager ya configura algunas variables de configuración y de entorno para apuntar automáticamente a Spark a HBase por usted. No obstante, el primer paso de configurar consultas Spark SQL es común en todos los tipos de implementación en clústeres de CDP, pero el segundo es ligeramente diferente según el tipo de implementación.

Configuración de servidores de región HBase

  1. Vaya a Cloudera Manager y seleccione el servicio HBase.
  2. Busque "entorno de servidor regional"

  1. Agregue una nueva variable de entorno mediante el fragmento de código de configuración avanzada del entorno de RegionServer (válvula de seguridad):
    • Clave:HBASE_CLASSPATH
    • Valor:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
      Asegúrese de utilizar los números de versión adecuados.
  2. Reiniciar servidores de región.

Una vez que siga los pasos anteriores, siga los pasos a continuación dependiendo de si desea una implementación CDSW o no CDSW.

Agregar enlaces HBase a Spark Runtime en implementaciones no CDSW

Para implementar el shell o usar spark-submit correctamente, use los siguientes comandos para asegurarse de que spark tenga los enlaces HBase correctos.

pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. tarro

spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- sombreado.jar

Agregar enlaces de HBase a Spark Runtime en implementaciones de CDSW

Para configurar CDSW con HBase y PySpark, debe seguir algunos pasos.

1) Asegúrese de que Python 3 esté instalado en cada nodo del clúster y anote la ruta

2) Cree un nuevo proyecto en CDSW y use una plantilla de PySpark

3) Abra el Proyecto, vaya a Configuración -> Motor -> Variables de entorno.

4) Establecer PYSPARK3_DRIVER_PYTHON y PYSPARK3_PYTHON a la ruta donde está instalado Python en los nodos de su clúster (Ruta indicada en el Paso 1).

A continuación se muestra una muestra de cómo debería verse.

5) En su proyecto, vaya a Archivos -> spark-defaults.conf y ábralo en Workbench

6) Copie y pegue la siguiente línea en ese archivo y asegúrese de guardarlo antes de iniciar una nueva sesión.

spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

En este punto, CDSW ahora está configurado para ejecutar trabajos de PySpark en HBase. El resto de esta publicación de blog se refiere a algunas operaciones de muestra en una implementación de CDSW.

Operaciones de ejemplo 

Operaciones de venta

Hay dos formas de insertar y actualizar filas en HBase. El primer método y el más recomendado es crear un catálogo, que es un esquema que asignará las columnas de una tabla HBase a un marco de datos PySpark mientras especifica el nombre de la tabla y el espacio de nombres. La creación de este formato JSON definido por el usuario es el método preferido, ya que también se puede utilizar con otras operaciones. Para obtener más información acerca de los catálogos, consulte esta documentación http://hbase.apache.org/book.html#_define_catalog. El segundo método utiliza un parámetro de mapeo específico llamado "hbase.columns.mapping" que solo toma una cadena de pares clave-valor.

  • Uso de catálogos
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

tableCatalog = ''.join("""{
               "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
               "rowkey":"key",
               "columns":{
                 "key":{"cf":"rowkey", "col":"key", "type":"int"},
                 "empId":{"cf":"personal","col":"empId","type":"string"},
                 "empName":{"cf":"personal", "col":"empName", "type":"string"},
                 "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
               }
             }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
  .options(catalog=tableCatalog, newTable=5) \
  .option("hbase.spark.use.hbasecontext", False) \
  .save()
# newTable refers to the NumberOfRegions which has to be > 3

Verifique que se cree una nueva tabla llamada "tblEmployee" en HBase simplemente abriendo el shell de HBase y ejecutando el siguiente comando:

escanear 'tblEmployee', {'LIMIT' => 2}

Usar catálogos también puede permitirle cargar tablas HBase fácilmente. Esto se discutirá en una entrega futura.

  • Uso de hbase.columns.mapping

Al escribir el marco de datos de PySpark, se puede agregar una opción llamada "hbase.columns.mapping" para incluir una cadena que mapee las columnas correctamente. Esta opción solo le permite insertar filas en tablas existentes.

En el shell de HBase, primero creemos una tabla create 'tblEmployee2', 'personal'

Ahora, en PySpark, insertemos 2 filas usando "hbase.columns.mapping"

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)


employeeDF.write.format("org.apache.hadoop.hbase.spark") \
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
       .option("hbase.table", "tblEmployee2") \
       .option("hbase.spark.use.hbasecontext", False) \
       .save()

Nuevamente, simplemente verifique que una nueva tabla llamada "tblEmployee2" tenga estas filas nuevas.

escanear 'tblEmployee2', {'LIMIT' => 2}

Eso completa nuestros ejemplos de cómo insertar filas a través de PySpark en tablas HBase. En la próxima entrega, hablaré sobre las operaciones Get y Scan, PySpark SQL y algunas soluciones de problemas. Hasta entonces, debe obtener un clúster de CDP y seguir estos ejemplos.