Omr*_*374 3 apache-spark pyspark apache-arrow
我正在尝试从 pandas_udf 返回特定结构。它在一个集群上工作,但在另一个集群上失败。我尝试在组上运行 udf,这要求返回类型为数据框。
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
StructField("Distance", FloatType()),
StructField("CarId", IntegerType())
])
def haversine(lon1, lat1, lon2, lat2):
#Calculate distance, return scalar
return 3.5 # Removed logic to facilitate reading
@pandas_udf(schema)
def totalDistance(oneCar):
dist = haversine(oneCar.Longtitude.shift(1),
oneCar.Latitude.shift(1),
oneCar.loc[1:, 'Longitude'],
oneCar.loc[1:, 'Latitude'])
return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)
Run Code Online (Sandbox Code Playgroud)
这是我得到的例外:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
114 try:
--> 115 to_arrow_type(self._returnType_placeholder)
116 except TypeError:
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
1641 else:
-> 1642 raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
1643 return arrow_type
TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
18 km = 6367 * c
19 return km
---> 20 @pandas_udf("CarId: int, Distance: float")
21 def totalDistance(oneUser):
22 dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
62 udf_obj = UserDefinedFunction(
63 f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64 return udf_obj._wrapped()
65
66
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
184
185 wrapper.func = self.func
--> 186 wrapper.returnType = self.returnType
187 wrapper.evalType = self.evalType
188 wrapper.deterministic = self.deterministic
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
117 raise NotImplementedError(
118 "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119 "not supported" % str(self._returnType_placeholder))
120 elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
121 if isinstance(self._returnType_placeholder, StructType):
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
Run Code Online (Sandbox Code Playgroud)
我也尝试将架构更改为
@pandas_udf("<CarId:int,Distance:float>")
Run Code Online (Sandbox Code Playgroud)
和
@pandas_udf("CarId:int,Distance:float")
Run Code Online (Sandbox Code Playgroud)
但得到同样的例外。我怀疑这与我的 pyarrow 版本有关,它与我的 pyspark 版本不兼容。
任何帮助,将不胜感激。谢谢!
正如错误消息中所报告的(“Invalid returnType with scalar Pandas UDFs”),您正在尝试创建一个 SCALAR 向量化的 Pandas UDF,但使用 StructType 架构并返回一个 Pandas DataFrame。
您应该将您的函数声明为 GROUPED MAP pandas UDF,即:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
Run Code Online (Sandbox Code Playgroud)
pyspark 文档中解释了标量和分组矢量化 UDF 之间的区别:http ://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf 。
标量 UDF 定义了一个转换:一个或多个 pandas.Series -> A pandas.Series。returnType 应该是原始数据类型,例如 DoubleType()。返回的pandas.Series 的长度必须与输入pandas.Series 的长度相同。
总而言之,标量 Pandas UDF 一次处理一列(pandas 系列),比一次处理一个行元素的传统 UDF 具有更好的性能。请注意,性能改进是由于使用 PyArrow 进行高效的 Python 序列化。
分组映射 UDF 定义转换: A pandas.DataFrame -> A pandas.DataFrame returnType 应该是 StructType 描述返回的 pandas.DataFrame 的模式。返回的 pandas.DataFrame 的长度可以是任意的,并且必须对列进行索引,以便它们的位置与模式中的相应字段匹配。
分组的 Pandas UDF 一次处理多行和多列(使用 Pandas DataFrame,不要与 Spark DataFrame 混淆),并且对于多变量操作非常有用和高效(尤其是在使用本地 python 数值分析和机器学习库时,如numpy、scipy、scikit-learn 等)。在这种情况下,输出是具有多列的单行 DataFrame。
请注意,我没有检查代码的内部逻辑,只检查方法论。
| 归档时间: |
|
| 查看次数: |
6138 次 |
| 最近记录: |