Vic*_*tor 9 hadoop impala cloudera-cdh spark-structured-streaming
我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)
现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:
ALTER TABLE table1 RECOVER PARTITONS
检测添加到表中的新分区(及其 HDFS 目录和文件)。
REFRESH table1 PARTITIONS (partition1=X, partition2=Y)
,使用每个分区的所有键。
现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。
所以,我的问题是:有没有办法更有效地进行这种数据整合?
我们考虑过:
使用ALTER TABLE .. RECOVER PARTITONS
但根据文档,它只会刷新新分区。
尝试REFRESH .. PARTITON ...
一次与多个分区一起使用,但语句语法不允许这样做。
尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。
鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?
谢谢!
胜利者
注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。
注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。
由于似乎没有人能解决我的问题,我想分享我们为提高处理效率而采取的方法,非常欢迎提出意见。
我们发现(文档对此不是很清楚)HDFS 中的 Spark“检查点”中存储的一些信息是许多元数据文件,描述每个 Parquet 文件的写入时间及其大小:
$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
w-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...
$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
Run Code Online (Sandbox Code Playgroud)
所以,我们所做的是:
_spark_metadata
。
fileStream
因为它允许我们定义要使用的文件过滤器。REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
我们取得的成果是:
通过对每个分区和批次执行一次刷新来限制 DDL。
通过可配置批处理时间和块大小,我们能够使我们的产品适应具有更大或更小数据集的不同部署场景。
该解决方案非常灵活,因为我们可以为 Spark Streaming 作业分配更多或更少的资源(执行程序、核心、内存等),并且我们还可以启动/停止它(使用其自己的检查点系统)。
我们还在研究在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。