小编Col*_*ham的帖子

如何在 PySpark 中创建 merge_asof 功能?

A有许多列带有日期列,表B有一个日期时间和一个值。两个表中的数据都是零星生成的,没有固定的时间间隔。桌子A很小,桌子B很大。

我需要加入BA的条件下,给定元素aA.datetime对应

B[B['datetime'] <= a]]['datetime'].max()
Run Code Online (Sandbox Code Playgroud)

有几种方法可以做到这一点,但我想要最有效的方法。

选项1

将小数据集广播为 Pandas DataFrame。设置一个 Spark UDF,使用merge_asof.

选项 2

使用 Spark SQL 的广播连接功能:在以下条件下设置 theta 连接

B['datetime'] <= A['datetime']
Run Code Online (Sandbox Code Playgroud)

然后消除所有多余的行。

选项 B 看起来很糟糕……但是请告诉我第一种方法是否有效或者是否有另一种方法。

编辑:这是示例输入和预期输出:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2 …
Run Code Online (Sandbox Code Playgroud)

python pandas pyspark

11
推荐指数
1
解决办法
1839
查看次数

如何使用具有不受支持类型的 Spark 读取镶木地板?

我想从一个包含文件拼花使用PySpark提取数据UINT64目前映射到列typeNotSupported()星火。我不需要这些列,所以我希望我可以使用以下命令使用谓词下推拉出其他列:

spark.read.parquet('path/to/dir/').select('legalcol1', 'legalcol2')
Run Code Online (Sandbox Code Playgroud)

但是,我仍然遇到以下错误。

An error was encountered:
An error occurred while calling o86.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, 
most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ..., executor 1):
org.apache.spark.sql.AnalysisException: Parquet type not supported: INT64 (UINT_64);
Run Code Online (Sandbox Code Playgroud)

有没有办法在不抛出上述错误的情况下摄取这些数据?

apache-spark pyspark

5
推荐指数
1
解决办法
813
查看次数

标签 统计

pyspark ×2

apache-spark ×1

pandas ×1

python ×1