sql >> Base de Datos >  >> NoSQL >> Redis

Redis en Spark:tarea no serializable

En Spark, las funciones en RDD s (como map aquí) son serializados y enviados a los albaceas para su procesamiento. Esto implica que todos los elementos contenidos dentro de esas operaciones deben ser serializables.

La conexión de Redis aquí no es serializable ya que abre conexiones TCP a la base de datos de destino que están vinculadas a la máquina donde se crea.

La solución es crear esas conexiones en los ejecutores, en el contexto de ejecución local. Hay pocas maneras de hacer eso. Dos que me vienen a la mente son:

  • rdd.mapPartitions :le permite procesar una partición completa a la vez y, por lo tanto, amortizar el costo de crear conexiones)
  • Administradores de conexiones singleton:crea la conexión una vez por ejecutor

mapPartitions es más fácil ya que todo lo que requiere es un pequeño cambio en la estructura del programa:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Un administrador de conexiones singleton se puede modelar con un objeto que contiene una referencia diferida a una conexión (nota:una referencia mutable también funcionará).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Este objeto se puede usar para instanciar 1 conexión por JVM de trabajador y se usa como Serializable objeto en un cierre de operación.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

La ventaja de usar el objeto singleton es menos gastos generales, ya que JVM crea las conexiones solo una vez (en lugar de 1 por partición RDD)

También hay algunas desventajas:

  • la limpieza de las conexiones es complicada (gancho de apagado/temporizadores)
  • uno debe garantizar la seguridad de subprocesos de los recursos compartidos

(*) código proporcionado con fines ilustrativos. No compilado ni probado.