在Spark中按时间戳对Parquet文件进行分区的最佳实践是什么?

Adr*_*ett 5 apache-spark pyspark

我对Spark非常陌生(2天),并且正在考虑对镶木地板文件进行分区的最佳方法。

我的粗略计划ATM是:

  • 使用com.databricks.spark.csv读取源TSV文件(这些文件具有TimeStampType列)
  • 写出实木复合地板文件,按年/月/日/小时划分
  • 将这些实木复合地板文件用于以后将要发生的所有查询

获得一个简单的版本一直很容易(对Spark开发人员来说是个荣誉)-除了按照我想要的方式进行分区之外。这是在python BTW中:

input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")
Run Code Online (Sandbox Code Playgroud)

映射RDD的最佳方法是添加年,月,日,小时的新列,然后使用PartitionBy吗?那么对于任何查询,我们必须手动添加年/月等?考虑到到目前为止我已经发现火花如此优雅,这似乎有点奇怪。

谢谢

Adr*_*ett 4

我现在找到了几种方法来做到这一点,尚未对它们进行性能测试,买者自负:

首先我们需要创建一个派生的DataFrame(如下所示的三种方式)然后将其写出来。

1)sql查询(内联函数)

sqlContext.registerFunction("day",lambda f: f.day, IntegerType())
input.registerTempTable("input")
input_ts = sqlContext.sql(
  "select day(inserted_at) AS inserted_at_day, * from input")
Run Code Online (Sandbox Code Playgroud)

2) sql 查询(非内联)- 非常相似

def day(ts):
  return f.day
sqlContext.registerFunction("day",day, IntegerType())
... rest as before
Run Code Online (Sandbox Code Playgroud)

3)带列

from pyspark.sql.functions import udf
day = udf(lambda f: f.day, IntegerType())
input_ts = input.withColumn('inserted_at_day',day(input.inserted_at))
Run Code Online (Sandbox Code Playgroud)

只需写出:

input_ts.write.partitionBy(['inserted_at_day']).format("parquet").save(dest, mode="append")
Run Code Online (Sandbox Code Playgroud)