
Spark Structured Streaming - assertion error in the checkpoint due to an increase in the number of input sources

I am trying to join two streams into one and write the result to a topic.

Code: 1 - Read the two topics

import org.apache.spark.sql.DataFrame

// Source 1: the PERSONINFORMATION topic
val PERSONINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("subscribe", "PERSONINFORMATION")
    .option("group.id", "info")
    .option("maxOffsetsPerTrigger", 1000)
    .option("startingOffsets", "earliest")
    .load()


// Source 2: the CANDIDATEINFORMATION topic
val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "CANDIDATEINFORMATION")
    .option("group.id", "candent")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()
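
For reference, the Kafka source always produces a fixed set of columns (key, value, topic, partition, offset, timestamp, timestampType), with key and value arriving as binary; that is why step 2 casts value to a string before parsing. A quick way to confirm this with the standard API:

PERSONINFORMATION_df.printSchema()  // prints the fixed Kafka source schema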

2 - Parse the data so the two streams can be joined:

import org.apache.spark.sql.functions.{col, from_json}

// Cast the binary Kafka value to a string, parse it as JSON against each
// schema, then flatten the parsed struct into top-level columns.
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
    .select(from_json(col("value").cast("string"), schemaPERSONINFORMATION).as("s"))
    .select("s.*")

val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
    .select(from_json(col("value").cast("string"), schemaCANDIDATEINFORMATION).as("s"))
    .select("s.*")

// Alias the frames so the join columns can be qualified unambiguously.
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
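
The two schemas referenced above are not shown in the question. A minimal sketch of what they might look like, assuming flat JSON messages; every field name below is hypothetical except the join keys ID and PERSONID, which appear in step 3:

import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical schemas -- only the join keys are visible in the question's code.
val schemaPERSONINFORMATION: StructType = new StructType()
    .add("ID", StringType)        // join key (dfperson.ID)
    .add("NAME", StringType)      // assumed field

val schemaCANDIDATEINFORMATION: StructType = new StructType()
    .add("PERSONID", StringType)  // join key (dfcandidate.PERSONID)
    .add("STATUS", StringType)    // assumed field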

3 - Join the two frames

  // Inner join on the candidate's PERSONID against the person's ID.
  val joined_df: DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"), "inner")

  val …
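
The write side is cut off above. A minimal sketch of how the joined stream might be written back to Kafka, assuming a hypothetical output topic and checkpoint path:

// Sketch only -- the topic name and checkpoint path below are assumptions.
val query = joined_df
    .selectExpr("to_json(struct(*)) as value")  // Kafka sink requires a value column
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("topic", "JOINEDINFORMATION")
    .option("checkpointLocation", "/tmp/checkpoints/joined")
    .start()

query.awaitTermination()

Per the Structured Streaming documentation, changing the number or type of input sources of a query is not allowed against an existing checkpoint, which is consistent with the assertion error in the title; restarting from a fresh checkpoint location is the usual way out.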

apache-spark apache-spark-sql spark-structured-streaming

4 votes · 1 solution · 2745 views