sql >> Base de Datos >  >> NoSQL >> HBase

Serialización robusta de mensajes en Apache Kafka con Apache Avro, parte 1

En Apache Kafka, las aplicaciones de Java llamadas productores escriben mensajes estructurados en un clúster de Kafka (compuesto por intermediarios). De manera similar, las aplicaciones Java denominadas consumidores leen estos mensajes desde el mismo clúster. En algunas organizaciones existen diferentes grupos encargados de redactar y gestionar a los productores y consumidores. En tales casos, un punto crítico importante puede estar en la coordinación del formato de mensaje acordado entre productores y consumidores.

Este ejemplo demuestra cómo usar Apache Avro para serializar registros que se producen en Apache Kafka mientras permite la evolución de esquemas y la actualización asincrónica de aplicaciones de productor y consumidor.

Serialización y deserialización

Un registro de Kafka (anteriormente denominado mensaje) consta de una clave, un valor y encabezados. Kafka no conoce la estructura de los datos en la clave y el valor de los registros. Los maneja como matrices de bytes. Pero los sistemas que leen registros de Kafka sí se preocupan por los datos de esos registros. Por lo tanto, debe producir datos en un formato legible. El formato de datos que utilice debe

  • Sea compacto
  • Sé rápido para codificar y decodificar
  • Permitir evolución
  • Permita que los sistemas ascendentes (aquellos que escriben en un clúster de Kafka) y los sistemas descendentes (aquellos que leen del mismo clúster de Kafka) se actualicen a esquemas más nuevos en diferentes momentos

JSON, por ejemplo, se explica por sí mismo, pero no es un formato de datos compacto y es lento de analizar. Avro es un marco de serialización rápido que crea una salida relativamente compacta. Pero para leer los registros de Avro, necesita el esquema con el que se serializaron los datos.

Una opción es almacenar y transferir el esquema con el propio registro. Esto está bien en un archivo donde almacena el esquema una vez y lo usa para una gran cantidad de registros. Sin embargo, almacenar el esquema en todos y cada uno de los registros de Kafka agrega una sobrecarga significativa en términos de espacio de almacenamiento y utilización de la red. Otra opción es tener un conjunto acordado de asignaciones de esquemas de identificadores y hacer referencia a los esquemas por sus identificadores en el registro.

Del objeto al registro de Kafka y viceversa

Las aplicaciones de productor no necesitan convertir datos directamente en matrices de bytes. KafkaProducer es una clase genérica que necesita que su usuario especifique tipos de clave y valor. Luego, los productores aceptan instancias de ProducerRecord que tienen los mismos parámetros de tipo. La conversión del objeto a la matriz de bytes se realiza mediante un serializador. Kafka proporciona algunos serializadores primitivos:por ejemplo, IntegerSerializer , ByteArraySerializer , StringSerializer . Del lado del consumidor, deserializadores similares convierten matrices de bytes en un objeto que la aplicación puede manejar.

Por lo tanto, tiene sentido conectarse a nivel de serializador y deserializador y permitir que los desarrolladores de aplicaciones de producción y consumo utilicen la conveniente interfaz proporcionada por Kafka. Aunque las últimas versiones de Kafka permiten ExtendedSerializers y ExtendedDeserializers para acceder a los encabezados, decidimos incluir el identificador de esquema en la clave y el valor de los registros de Kafka en lugar de agregar encabezados de registro.

Básicos de Avro

Avro es un marco de serialización de datos (y llamada a procedimiento remoto). Utiliza un documento JSON llamado esquema para describir estructuras de datos. La mayor parte del uso de Avro es a través de GenericRecord o subclases de SpecificRecord. Las clases de Java generadas a partir de los esquemas de Avro son subclases de estos últimos, mientras que las primeras se pueden utilizar sin conocimiento previo de la estructura de datos con la que se trabaja.

Cuando dos esquemas satisfacen un conjunto de reglas de compatibilidad, los datos escritos con un esquema (llamado esquema del escritor) se pueden leer como si estuvieran escritos con el otro (llamado esquema del lector). Los esquemas tienen una forma canónica que tiene todos los detalles que son irrelevantes para la serialización, como los comentarios, eliminados para ayudar a verificar la equivalencia.

VersionedSchema y SchemaProvider

Como se mencionó anteriormente, necesitamos un mapeo uno a uno entre los esquemas y sus identificadores. A veces es más fácil referirse a los esquemas por sus nombres. Cuando se crea un esquema compatible, se puede considerar una próxima versión del esquema. Por lo tanto, podemos referirnos a esquemas con un nombre, par de versiones. Llamemos al esquema, su identificador, nombre y versión juntos un VersionedSchema . Este objeto puede contener metadatos adicionales que requiere la aplicación.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider los objetos pueden buscar las instancias de VersionedSchema .

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

La forma en que se implementa esta interfaz se explica en "Implementación de un almacén de esquemas" en una publicación de blog futura.

Serializar datos genéricos

Al serializar un registro, primero debemos averiguar qué esquema usar. Cada registro tiene un getSchema método. Pero encontrar el identificador del esquema puede llevar mucho tiempo. Por lo general, es más eficiente establecer el esquema en el momento de la inicialización. Esto puede hacerse directamente por identificador o por nombre y versión. Además, al producir para múltiples temas, es posible que deseemos establecer diferentes esquemas para diferentes temas y encontrar el esquema del nombre del tema proporcionado como parámetro para el método serialize(T, String) . Esta lógica se omite en nuestros ejemplos en aras de la brevedad y la simplicidad.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Con el esquema en la mano, necesitamos almacenarlo en nuestro mensaje. Serializar el ID como parte del mensaje nos brinda una solución compacta, ya que toda la magia ocurre en el Serializador/Deserializador. También permite una integración muy fácil con otros marcos y bibliotecas que ya son compatibles con Kafka y le permite al usuario usar su propio serializador (como Spark).

Usando este enfoque, primero escribimos el identificador de esquema en los primeros cuatro bytes.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Entonces podemos crear un DatumWriter y serializar el objeto.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Poniendo todo esto junto, hemos implementado un serializador de datos genérico.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Deserializar datos genéricos

La deserialización puede funcionar con un solo esquema (los datos del esquema se escribieron), pero puede especificar un esquema de lector diferente. El esquema del lector debe ser compatible con el esquema con el que se serializaron los datos, pero no es necesario que sea equivalente. Por esta razón, introdujimos nombres de esquema. Ahora podemos especificar que queremos leer datos con una versión específica de un esquema. En el momento de la inicialización, leemos las versiones de esquema deseadas por nombre de esquema y almacenamos metadatos en readerSchemasByName para un acceso rápido. Ahora podemos leer cada registro escrito con una versión compatible del esquema como si estuviera escrito con la versión especificada.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Cuando es necesario deserializar un registro, primero leemos el identificador del esquema del escritor. Esto permite buscar el esquema del lector por nombre. Con ambos esquemas disponibles podemos crear un GeneralDatumReader y leer el registro.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Manejo de registros específicos

La mayoría de las veces hay una clase que queremos usar para nuestros registros. Esta clase generalmente se genera a partir de un esquema de Avro. Apache Avro proporciona herramientas para generar código Java a partir de esquemas. Una de esas herramientas es el complemento Avro Maven. Las clases generadas tienen el esquema a partir del cual se generaron disponibles en tiempo de ejecución. Esto hace que la serialización y la deserialización sean más simples y efectivas. Para la serialización podemos usar la clase para conocer el identificador de esquema a usar.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Por lo tanto, no necesitamos la lógica para determinar el esquema a partir del tema y los datos. Usamos el esquema disponible en la clase de registro para escribir registros.

De manera similar, para la deserialización, el esquema del lector se puede encontrar en la propia clase. La lógica de deserialización se vuelve más simple, porque el esquema del lector se corrige en el momento de la configuración y no es necesario buscarlo por nombre de esquema.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Lectura adicional

Para obtener más información sobre la compatibilidad de esquemas, consulte la especificación de Avro para la resolución de esquemas.

Para obtener más información sobre formularios canónicos, consulte la especificación de Avro para el análisis de formularios canónicos para esquemas.

La próxima vez...

La Parte 2 mostrará una implementación de un sistema para almacenar las definiciones del esquema de Avro.