从Kafka读取并在镶木地板中写入hdfs

Hen*_*sis 6 hadoop hdfs apache-kafka apache-spark parquet

我是BigData生态系统的新手,有点入门。

我已经阅读了几篇有关使用Spark Streaming阅读Kafka主题的文章,但想知道是否可以使用Spark Job而不是Streaming从Kafka阅读?如果是,你们可以帮助我指出一些可以帮助我入门的文章或代码片段。

我第二部分的问题是以木地板格式写入hdfs。一旦我从Kafka看了书,我就假设我将有一个rdd。将此rdd转换为数据帧,然后将该数据帧作为木地板文件写入。这是正确的方法吗?

任何帮助表示赞赏。

谢谢

him*_*ian 7

对于从 Kafka 读取数据并将其写入 HDFS,在 Parquet 格式中,使用 Spark Ba​​tch 作业而不是流,您可以使用Spark Structured Streaming

Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以像在静态数据上表达批处理计算一样表达流式计算。Spark SQL 引擎将负责以增量方式连续运行它,并随着流数据的不断到达更新最终结果。您可以使用 Scala、Java、Python 或 R 中的 Dataset/DataFrame API 来表达流聚合、事件时间窗口、流到批处理连接等。计算在同一个优化的 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志来确保端到端的一次性容错保证。简而言之,Structured Streaming 提供了快速、可扩展、容错、

它带有 Kafka 作为内置 Source,即我们可以从 Kafka 轮询数据。它与 Kafka 代理版本 0.10.0 或更高版本兼容。

为了以批处理模式从 Kafka 中提取数据,您可以为定义的偏移范围创建一个 Dataset/DataFrame。

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Run Code Online (Sandbox Code Playgroud)

源中的每一行都具有以下架构:

| Column           | Type          |
|:-----------------|--------------:|
| key              |        binary |
| value            |        binary |
| topic            |        string |
| partition        |           int |
| offset           |          long |
| timestamp        |          long |
| timestampType    |           int |
Run Code Online (Sandbox Code Playgroud)

现在,要将数据以 parquet 格式写入 HDFS,可以编写以下代码:

df.write.parquet("hdfs://data.parquet")
Run Code Online (Sandbox Code Playgroud)

有关 Spark Structured Streaming + Kafka 的更多信息,请参阅以下指南 - Kafka 集成指南

我希望它有帮助!

  • 谢谢Himanshu,这很有帮助。似乎这需要 Spark 2.2,在 2.0.0 等较低版本的 Spark 中还有其他方法可以做到这一点吗? (2认同)

Tag*_*gar 5

你已经有几个关于这个话题的好答案。

只是想强调一下 - 小心直接流入镶木地板。当 parquet 行组大小足够大时,Parquet 的性能会大放异彩(为简单起见,您可以说文件大小应该在 64-256Mb 之间),以利用字典压缩、布隆过滤器等(一个 Parquet 文件可以有多个行块,通常每个文件中有多个行块;虽然行块不能跨越多个镶木地板文件)

如果您直接流式传输到镶木地板,那么您最终很可能会得到一堆小型镶木地板文件(取决于 Spark Streaming 的小批量大小和数据量)。查询此类文件可能非常缓慢。例如,Parquet 可能需要读取所有文件的标头以协调架构,这是一个很大的开销。如果是这种情况,您将需要有一个单独的进程,例如,作为一种解决方法,读取旧文件并将它们“合并”写入(这不是简单的文件级合并,一个进程会实际上需要读入所有镶木地板数据并溢出更大的镶木地板文件)。

此解决方法可能会破坏数据“流式传输”的原始目的。您也可以在这里查看其他技术——比如 Apache Kudu、Apache Kafka、Apache Druid、Kinesis 等,它们可以在这里更好地工作。

更新:自从我发布了这个答案,现在这里有一个新的强大的玩家 - Delta Lakehttps://delta.io/如果你习惯了镶木地板,你会发现 Delta 非常有吸引力(实际上,Delta 是建立在镶木地板 + 元数据之上)。三角洲湖提供:

Spark 上的 ACID 事务:

  • 可序列化的隔离级别确保读者永远不会看到不一致的数据。
  • 可扩展的元数据处理:利用 Spark 的分布式处理能力轻松处理 PB 级表的所有元数据,其中包含数十亿个文件。
  • 流式和批处理统一:Delta Lake 中的表是批处理表,也是流式源和接收器。流数据摄取、批量历史回填、交互式查询都是开箱即用的。
  • 架构实施:自动处理架构变化以防止在摄取期间插入不良记录。
  • 时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可重复的机器学习实验。
  • Upserts 和 deletes:支持合并、更新和删除操作,以支持复杂的用例,如更改数据捕获、缓慢变化维度 (SCD) 操作、流式更新插入等。