小编Ram*_*Ram的帖子

如何将流式数据帧写入 PostgreSQL?

我有一个流数据帧,我正在尝试将其写入数据库。有将 rdd 或 df 写入 Postgres 的文档。但是,我无法找到有关如何在结构化流中完成此操作的示例或文档。

我已阅读文档https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#foreachbatch,但我无法理解在哪里创建 jdbc 连接以及如何编写它到数据库。

def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function)
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

该函数接收常规数据帧并写入 postgres 表

def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }

    data_frame.write.jdbc(url=url, table="metrics", mode="append", …
Run Code Online (Sandbox Code Playgroud)

postgresql apache-spark pyspark spark-structured-streaming

4
推荐指数
1
解决办法
7163
查看次数