Par*_*son 5 amazon-s3 apache-spark pyspark spark-structured-streaming delta-lake
我有两个 pyspark 流工作:
streaming_job_a从 kafka 读取,将包含一列中的原始数据和另一列中的时间戳的数据帧写入 s3 中的位置,并使用位置A创建非托管增量表table_aAstreaming_job_b从增量表中读取table_a,将原始数据提取到单独的列中,写入到Bs3 中的位置,并创建非托管增量表table_b。如果我想更改这两个作业使用的位置和表名称,如何以保留数据、不会导致检查点问题并且花费最少时间的方式进行更改?必须保留这两个表,因为其他团队会读取这两个表。理想情况下,最终结果如下所示:
streaming_job_a从 kafka 读取,写入A_news3 中的位置并创建增量表table_a_newstreaming_job_b从 delta table 读取table_a_new,写入B_news3 中的位置,并创建 delta table table_b_new。我知道我可以从旧位置读取并写入新位置,如下所示:
incoming_df = spark.readStream.format("delta").table("table_a")
writer_df = (
incoming_df
.writeStream.format("delta")
.option("checkpointLocation", "A_new/_checkpoints")
.option("path", "A_new")
.trigger(once=True)
)
writer_df.start()
Run Code Online (Sandbox Code Playgroud)
然后创建新表:
spark.sql("create table table_a_new using delta location 'A_new'")
Run Code Online (Sandbox Code Playgroud)
然后执行类似的操作,但在这种方法中,我担心在迁移发生时丢失streaming_job_b写入位置的新数据。一般来说,我对 Spark Streaming 还很陌生,所以非常感谢任何建议!Astreaming_job_b
| 归档时间: |
|
| 查看次数: |
518 次 |
| 最近记录: |