Suppose I have a DataFrame called transactions with the following integer columns: year, month, day, timestamp, transaction_id.
In [1]: transactions = ctx.createDataFrame(
   ...:     [(2017, 12, 1, 10000, 1),
   ...:      (2017, 12, 2, 10001, 2),
   ...:      (2017, 12, 3, 10003, 3),
   ...:      (2017, 12, 4, 10004, 4),
   ...:      (2017, 12, 5, 10005, 5),
   ...:      (2017, 12, 6, 10006, 6)],
   ...:     ('year', 'month', 'day', 'timestamp', 'transaction_id'))
In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+
I want to define a function filter_date_range that returns a DataFrame containing the transaction rows that fall within a given date range:
>>> filter_date_range(
...     df=transactions,
...     start_date=datetime.date(2017, 12, 2),
...     end_date=datetime.date(2017, 12, 4)).show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+
Assuming the data is stored in Hive partitions, partitioned by year, month, and day, what is the most efficient way to run this kind of filter, which involves date arithmetic? I'm looking for a way to do it purely in terms of DataFrames, without resorting to transactions.rdd, so that Spark can work out that only a subset of the partitions actually needs to be read.
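For reference, a layout like the one described below can be produced with partitionBy, and a plain comparison on the partition columns is already pruned at planning time; the hard part is expressing a range that crosses month or year boundaries that way. A minimal sketch, assuming a SparkSession named spark (the output path here is hypothetical):

import pyspark.sql.functions as F

base_path = "/tmp/transactions"  # hypothetical location

# Write the example DataFrame partitioned by year/month/day.
transactions.write.partitionBy("year", "month", "day").parquet(base_path)

# A literal comparison on the partition columns shows up under
# PartitionFilters in the physical plan, so only the matching
# directories are scanned -- but this form only covers a range
# that stays within a single month.
spark.read.parquet(base_path).where(
    (F.col("year") == 2017) & (F.col("month") == 12) &
    F.col("day").between(2, 4)
).explain()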
If the data is partitioned like this:
.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet

then you can generate the list of directories to load:
import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)
n = (end_date - start_date).days + 1

base_path = ...

paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()

# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
#
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>
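To reuse this, one could wrap it up as the filter_date_range the question asks for; a hypothetical sketch (the signature and the reliance on the default Parquet source are my assumptions, not part of the answer above):

import datetime

def filter_date_range(spark, base_path, start_date, end_date):
    # Hypothetical wrapper around the approach above: enumerate one
    # day-level partition directory per day in [start_date, end_date]
    # and load only those paths.
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in days
    ]
    # basePath keeps year/month/day visible as columns in the result.
    return spark.read.option("basePath", base_path).load(paths)

Called as filter_date_range(spark, base_path, datetime.date(2017, 12, 2), datetime.date(2017, 12, 4)), this touches only the day=2 through day=4 directories, matching the PartitionCount: 3 in the plan above.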