sql >> Base de Datos >  >> NoSQL >> MongoDB

MongoDBObject no se agrega dentro de un bucle foreach rrd casbah scala apache spark

Los cálculos en los RDD se distribuyen por el clúster. No puede actualizar una variable que se creó fuera del cierre de la operación de RDD desde dentro del RDD. Están básicamente en dos lugares diferentes:la variable se crea en el controlador Spark y se accede a ella en los trabajadores y debe tratarse como de solo lectura.

Spark admite acumuladores distribuidos que podrían usarse en este caso:Spark Cummulators

Otra opción (la que prefiero) es transformar el flujo de RDD en el formato de datos deseado y usar foreachRDD método para persistir en el almacenamiento secundario. Esta sería una forma más funcional de abordar el problema. Más o menos se vería así:

  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)