sql >> Base de Datos >  >> NoSQL >> MongoDB

Sink Kafka Stream a MongoDB usando PySpark Structured Streaming

Encontré una solución. Como no pude encontrar el controlador Mongo adecuado para la transmisión estructurada, trabajé en otra solución. Ahora, uso la conexión directa a mongoDb y uso "foreach(...)" en lugar de foreachbatch(. ..). Mi código se ve así en el archivo testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()