I have a streaming DataFrame that I am trying to write to a database. There is documentation for writing an RDD or a DataFrame to Postgres, but I could not find any examples or documentation on how to do this in Structured Streaming.
I have read the documentation at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch, but I cannot work out where to create the JDBC connection or how to write each batch to the database.
def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass
view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()
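From the docs, foreachBatch hands the function each micro-batch as a regular (non-streaming) DataFrame plus an epoch id, so presumably the body can just use the normal batch JDBC writer. A minimal sketch of what I think goes in there (the URL, table name, and credentials below are placeholders):

def write_batch_to_postgres(batch_df, epoch_id):
    # Each micro-batch arrives as a plain DataFrame, so the ordinary
    # batch writer should work inside foreachBatch.
    batch_df.write.jdbc(
        url="jdbc:postgresql://localhost:5432/mydb",  # placeholder URL
        table="metrics",                              # placeholder table
        mode="append",
        properties={
            "driver": "org.postgresql.Driver",
            "user": "user",                           # placeholder credentials
            "password": "password"
        }
    )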
Here is the function I already have; it takes a regular DataFrame and writes it to a Postgres table:
def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')
    url = "jdbc:postgresql://" + dbhost + ":" + dbport + "/" + dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }
    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)
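If my understanding above is right, then presumably this existing sink can be reused by wrapping it in a function with the (df, epoch_id) signature that foreachBatch expects. A sketch of the wiring, assuming config is a configparser.ConfigParser instance:

import configparser

config = configparser.ConfigParser()

def foreach_batch_function(df, epoch_id):
    # Delegate each micro-batch to the existing batch sink.
    postgres_sink(config, df)

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .trigger(processingTime="5 seconds") \
    .start()

view_counts_query.awaitTermination()

Is this the right way to hook it up, or does the JDBC connection need to be managed differently inside foreachBatch?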