Filtering a Spark DataFrame for an n-day window over data partitioned by day

and*_*rew 6 python apache-spark pyspark

Say I have a DataFrame called transactions with the following integer columns: year, month, day, timestamp, transaction_id

In [1]: transactions = ctx.createDataFrame([(2017, 12, 1, 10000, 1), (2017, 12, 2, 10001, 2), (2017, 12, 3, 10003, 3), (2017, 12, 4, 10004, 4), (2017, 12, 5, 10005, 5), (2017, 12, 6, 10006, 6)],('year', 'month', 'day', 'timestamp', 'transaction_id'))

In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+

I want to define a function filter_date_range that returns a DataFrame containing only the transaction rows that fall within a given date range.

>>> filter_date_range(  
        df = transactions, 
        start_date = datetime.date(2017, 12, 2), 
        end_date = datetime.date(2017, 12, 4)).show()

+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+

Assuming the data is stored in Hive partitions, partitioned by year, month and day, what is the most efficient way to perform this kind of filter involving date arithmetic? I'm looking for a way to do this in a purely DataFrame-ic manner, without falling back to transactions.rdd, so that Spark can infer that only a subset of the partitions actually needs to be read.
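For concreteness, a naive, column-only version of the function might look like the sketch below (the predicate is built purely from the partition columns year, month and day, but it is not obvious to me that Spark turns such an OR of equality predicates into partition pruning):

import datetime
from functools import reduce
from pyspark.sql import functions as F

def filter_date_range(df, start_date, end_date):
    # Keep rows whose (year, month, day) falls inside [start_date, end_date]
    # by OR-ing one equality predicate per day in the window.
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    per_day = [
        (F.col("year") == d.year) & (F.col("month") == d.month) & (F.col("day") == d.day)
        for d in days
    ]
    return df.where(reduce(lambda a, b: a | b, per_day))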

hi-*_*zir 2

If the data is partitioned like this:

.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet
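(such a layout is what, for example, a Hive-style partitioned write produces; base_path here is a placeholder for the table root:

# Writes one year=/month=/day= directory per distinct partition value.
transactions.write.partitionBy("year", "month", "day").parquet(base_path)

)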

You can generate the list of directories to load:

import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)
n = (end_date - start_date).days + 1

base_path = ...

paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()

# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>
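Wrapped up as a function in the spirit of filter_date_range from the question, a possible sketch is shown below. Note that it takes the table's base_path rather than a DataFrame, since the selection happens at read time; spark is assumed to be the active SparkSession, and days missing on disk would need separate handling, because loading a non-existent path raises an error:

import datetime

def filter_date_range(base_path, start_date, end_date):
    # Enumerate one partition directory per day in [start_date, end_date]
    # and read only those directories.
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in days
    ]
    # The "basePath" option keeps year/month/day as columns in the result.
    return spark.read.option("basePath", base_path).load(paths)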
