sql >> Base de Datos >  >> NoSQL >> HBase

Procedimiento:escanear tablas HBase de Apache saladas con intervalos de claves específicos de la región en MapReduce

Gracias a Pengyu Wang, desarrollador de software de FINRA, por permitirnos volver a publicar esta publicación.

Las tablas Salted Apache HBase con división previa son una solución HBase comprobada y eficaz para proporcionar una distribución uniforme de la carga de trabajo en RegionServers y evitar puntos calientes durante las escrituras masivas. En este diseño, se crea una clave de fila con una clave lógica más sal al principio. Una forma de generar sal es calculando el módulo n (número de regiones) en el código hash de la clave de fila lógica (fecha, etc.).

Claves de fila de salazón

Por ejemplo, una tabla que acepte la carga de datos diariamente podría usar claves de fila lógicas que comiencen con una fecha, y queremos dividir previamente esta tabla en 1000 regiones. En este caso, esperamos generar 1.000 sales diferentes. La sal se puede generar, por ejemplo, como:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

La salida de hashCode() con módulo proporciona aleatoriedad para el valor de sal de "000" a "999". Con esta transformación clave, la tabla se divide previamente en los límites de sal a medida que se crea. Esto hará que los volúmenes de fila se distribuyan uniformemente mientras se cargan los HFiles con la carga masiva de MapReduce. Garantiza que las claves de fila con la misma sal caigan en la misma región.

En muchos casos de uso, como el archivo de datos, debe escanear o copiar los datos en un rango de clave lógica particular (rango de fechas) usando el trabajo de MapReduce. Los trabajos de MapReduce de la tabla estándar se configuran proporcionando el Scan instancia con atributos de rango clave.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Sin embargo, la configuración de dicho trabajo se convierte en un desafío para las mesas predivididas saladas. Las teclas de fila de inicio y detención serán diferentes para cada región porque cada una tiene una sal única. Y no podemos especificar múltiples rangos para un Scan instancia.

Para resolver este problema, debemos analizar cómo funciona la tabla MapReduce. Generalmente, el marco MapReduce crea una tarea de mapa para leer y procesar cada división de entrada. Cada división se genera en InputFormat base de clase, por método getSplits() .

En el trabajo MapReduce de la tabla HBase, TableInputFormat se usa como InputFormat . Dentro de la implementación, getSplits() El método se anula para recuperar las claves de fila de inicio y parada del Scan instancia. Dado que las teclas de fila de inicio y detención abarcan varias regiones, el rango se divide por los límites de la región y devuelve la lista de TableSplit objetos que cubre el rango de la tecla de escaneo. En lugar de basarse en el bloque HDFS, TableSplit s se basan en la región. Sobrescribiendo getSplits() método, podemos controlar el TableSplit .

Creación de un formato de entrada de tabla personalizado

Para cambiar el comportamiento de getSplits() método, una clase personalizada que se extiende TableInputFormat es requerido. El propósito de getSplits() aquí es para cubrir el rango de clave lógica en cada región, construir su rango de clave de fila con su sal única. La clase HTable proporciona el método getStartEndKeys() que devuelve claves de fila de inicio y final para cada región. Desde cada tecla de inicio, analice la sal correspondiente para la región.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

La configuración del trabajo supera el intervalo de claves lógicas

TableInputFormat recupera la clave de inicio y parada de Scan instancia. Como no podemos usar Scan en nuestro trabajo de MapReduce, podríamos usar Configuration en su lugar, pasar estas dos variables y solo la clave lógica de inicio y parada es lo suficientemente buena (una variable podría ser una fecha u otra información comercial). El getSplits() el método tiene JobContext argumento, la instancia de configuración se puede leer como context.getConfiguration() .

En el controlador MapReduce:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

En Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Reconstruir el rango de claves saladas por región

Ahora que tenemos la clave salt y lógica de inicio/parada para cada región, podemos reconstruir el rango de clave de fila real.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Crear un TableSplit para cada región

Con el rango de clave de fila, ahora podemos inicializar TableSplit instancia para la región.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Una cosa más a tener en cuenta es la localidad de los datos. El marco utiliza información de ubicación en cada división de entrada para asignar una tarea de mapa en su host local. Para nuestro TableInputFormat , usamos el método getTableRegionLocation() para recuperar la ubicación de la región que sirve la clave de fila.

Esta ubicación luego se pasa a TableSplit constructor. Esto asegurará que el asignador que procesa la división de la tabla esté en el mismo servidor de región. Un método, llamado DNS.reverseDns() , requiere la dirección del servidor de nombres HBase. Este atributo se almacena en la configuración “hbase.nameserver.address “.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Un código completo de getSplits se verá así:

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Utilice el TableInoutFormat personalizado en el controlador MapReduce

Ahora necesitamos reemplazar el TableInputFormat class con la compilación personalizada que usamos para la configuración del trabajo MapReduce de la tabla.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

El enfoque de TableInputFormat personalizado proporciona una capacidad de escaneo eficiente y escalable para tablas HBase que están diseñadas para usar sal para una carga de datos equilibrada. Dado que el escaneo puede omitir cualquier clave de fila no relacionada, independientemente del tamaño de la tabla, la complejidad del escaneo se limita solo al tamaño de los datos de destino. En la mayoría de los casos de uso, esto puede garantizar un tiempo de procesamiento relativamente constante a medida que crece la tabla.