Ath*_*tif 4 apache-spark apache-spark-sql spark-structured-streaming
我正在尝试将两个流合并为一个并将结果写入主题
代码:1-阅读两个主题
val PERSONINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xx:9092")
.option("subscribe", "PERSONINFORMATION")
.option("group.id", "info")
.option("maxOffsetsPerTrigger", 1000)
.option("startingOffsets", "earliest")
.load()
val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xxx:9092")
.option("subscribe", "CANDIDATEINFORMATION")
.option("group.id", "candent")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", "false")
.load()
Run Code Online (Sandbox Code Playgroud)
2-解析数据以加入它们:
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")
val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
Run Code Online (Sandbox Code Playgroud)
3-连接两个框架
val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")
val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))
Run Code Online (Sandbox Code Playgroud)
4-将它们写成一个主题
string2json.writeStream.format("kafka")
.option("kafka.bootstrap.servers", xxxx:9092")
.option("topic", "toDelete")
.option("checkpointLocation", "checkpoints")
.option("failOnDataLoss", "false")
.start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
错误信息:
21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
Run Code Online (Sandbox Code Playgroud)
您的代码对我来说看起来很好,而是导致问题的检查点。
根据您收到的错误消息,您可能仅使用一个流源运行此作业。然后,您添加了流连接的代码,并尝试重新启动应用程序而不删除现有的检查点文件。现在,应用程序尝试从检查点文件中恢复,但意识到您最初只有一个源,现在有两个源。
流式查询更改后的恢复语义部分解释了使用检查点时允许和不允许哪些更改。不允许更改输入源的数量:
“改变输入源的数量或类型(即不同的来源):这是不允许的。”
要解决您的问题: 删除当前检查点文件并重新启动作业。
归档时间: |
|
查看次数: |
2745 次 |
最近记录: |