如何有效地更新文件经常被修改的 Impala 表

Vic*_*tor 9 hadoop impala cloudera-cdh spark-structured-streaming

我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)

现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:

  • ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。

现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。

所以,我的问题是:有没有办法更有效地进行这种数据整合?

我们考虑过:

  • 使用ALTER TABLE .. RECOVER PARTITONS但根据文档,它只会刷新新分区。

  • 尝试REFRESH .. PARTITON ...一次与多个分区一起使用,但语句语法不允许这样做。

  • 尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。

  • 鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?

  • 你知道的其他方式吗?

谢谢!

胜利者

注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。

注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。

Vic*_*tor 5

由于似乎没有人能解决我的问题,我想分享我们为提高处理效率而采取的方法,非常欢迎提出意见。

我们发现(文档对此不是很清楚)HDFS 中的 Spark“检查点”中存储的一些信息是许多元数据文件,描述每个 Parquet 文件的写入时间及其大小:

$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
Run Code Online (Sandbox Code Playgroud)

所以,我们所做的是:

  • 构建轮询该文件夹的 Spark Streaming 作业_spark_metadata
    • 我们使用 afileStream因为它允许我们定义要使用的文件过滤器。
    • 该流中的每个条目都是这些 JSON 行之一,解析该行以提取文件路径和大小。
  • 按文件所属的父文件夹(映射到每个 Impala 分区)对文件进行分组。
  • 对于每个文件夹:
    • 读取仅加载目标 Parquet 文件的数据帧(以避免与写入文件的其他作业发生竞争情况)
    • 计算要写入的块数(使用 JSON 中的大小字段和目标块大小)
    • 将数据帧合并到所需数量的分区并将其写回 HDFS
    • 执行DDLREFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
  • 最后删除源文件

我们取得的成果是:

  • 通过对每个分区和批次执行一次刷新来限制 DDL。

  • 通过可配置批处理时间和块大小,我们能够使我们的产品适应具有更大或更小数据集的不同部署场景。

  • 该解决方案非常灵活,因为我们可以为 Spark Streaming 作业分配更多或更少的资源(执行程序、核心、内存等),并且我们还可以启动/停止它(使用其自己的检查点系统)。

  • 我们还在研究在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。