我可以将流“分支”为多个并在 pyspark 中并行写入它们吗?

Dim*_*ims 3 apache-kafka pyspark spark-structured-streaming

我正在 pyspark 中接收 Kafka 流。目前,我将其按一组字段分组并向数据库写入更新:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic)

...

df = df \
        .groupBy("myfield1") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

query = df \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
        .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我可以在中间采用相同的链并创建另一个分组,例如

df2 = df \
        .groupBy("myfield2") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")
Run Code Online (Sandbox Code Playgroud)

并将其输出并行写入不同的地方?

在哪里writeStream打电话awaitTermination

mik*_*ike 5

是的,您可以将 Kafka 输入流分支为任意数量的流查询。

您需要考虑以下事项:

  1. query.awaitTermination是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会执行,直到该方法query终止为止。
  2. 每个“分支”流查询都将并行运行,因此在每个 writeStream 调用中定义检查点位置非常重要。

总的来说,您的代码需要具有以下结构:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic) \
        .[...]

# note that I changed the variable name to "df1"
df1 = df \
    .groupBy("myfield1") \
    .[...]

df2 = df \
    .groupBy("myfield2") \
    .[...]


query1 = df1 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc1") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
        .start()

query2 = df2 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc2") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
        .start()

spark.streams.awaitAnyTermination
Run Code Online (Sandbox Code Playgroud)

只是一个附加说明:在您显示的代码中,您正在覆盖df,因此 的 推导df2可能无法获得您预期的结果。