使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方法是什么?

Bra*_*don 7 apache-kafka apache-spark pyspark spark-structured-streaming

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从多个 Kafka 主题(可能是 100 个)读取并根据主题名称将结果写入 S3 上的不同位置。我开发了这段代码,它当前从多个主题中读取并将结果输出到控制台(基于循环),它按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个 readStream 和 writeStream 操作吗?如果是这样,推荐的方法是什么?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
Run Code Online (Sandbox Code Playgroud)

Dou*_*s M 6

每个驱动程序节点运行多个并发流当然是合理的。

每个.start()都会消耗spark中一定量的driver资源。您的限制因素将是驱动程序节点上的负载及其可用资源。数百个连续高速运行的主题需要分布在多个驱动程序节点上 [在 Databricks 中,每个集群有一个驱动程序]。正如您提到的,Spark 的优点是多个接收器以及用于转换的统一批处理和流处理 API。

另一个问题是处理您最终可能对 S3 进行的小写入和文件一致性。查看 delta.io 以处理对 S3 的一致且可靠的写入。


Sri*_*vas 5

以下方法的优点。

  1. 通用的
  2. 多线程,所有线程都将单独工作。
  3. 易于维护代码并支持任何问题。
  4. 如果一个主题失败,不会影响生产中的其他主题。你只需要专注于失败的人。
  5. 如果您想提取特定主题的所有数据,您只需停止该主题的作业,更新或更改配置并重新启动同一作业即可。

注意- 下面的代码不是完整的通用代码,您可能需要更改或调整下面的代码。

topic="" // Get value from input arguments
sink="" // Get value from input arguments

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", topic) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", sink) \
        .start()        

Run Code Online (Sandbox Code Playgroud)

以下方法存在问题。

  1. 如果一个主题失败,它将终止整个程序。
  2. 有限的线程。
  3. 难以维护代码、调试和支持任何问题。
  4. 如果您想从 kafka 中提取特定主题的所有数据,这是不可能的,因为任何配置更改都将应用于所有主题,因此其操作成本太高。
my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
Run Code Online (Sandbox Code Playgroud)