Esto aún no está publicado, pero en la rama principal de Alpakka, MongoSource.apply
toma un parámetro de tipo:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Por lo tanto, con la próxima versión 0.18 de Alpakka, podrá hacer lo siguiente:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Tenga en cuenta que source
aquí asume que todoCollection.find()
devuelve un Observable[TodoMongo]
; ajusta los tipos según sea necesario.
Mientras tanto, simplemente puede agregar el código anterior manualmente. Por ejemplo:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Tenga en cuenta que MyMongoSource
está definido para residir en akka.stream.alpakka.mongodb.scaladsl
paquete (como MongoSource
), porque ObservableToPublisher
es una clase privada de paquete. Usarías MyMongoSource
de la misma manera que usaría MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())