Resolví mi problema. El motivo de los recuentos inconsistentes fue el MongoDefaultPartitioner que envuelve MongoSamplePartitioner que utiliza muestreo aleatorio. Para ser honesto, este es un valor predeterminado bastante extraño para mí. Personalmente, preferiría tener un particionador lento pero consistente. Los detalles de las opciones de partición se pueden encontrar en las opciones de configuración oficiales. documentación.
código:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()