Une*_*ver 7 python user-defined-functions apache-spark pyspark databricks
我正在尝试制作一个 pandas UDF,它接受两列具有整数值的数据,并根据这些值之间的差异返回一个小数数组,其长度等于上述差异。
到目前为止,这是我的尝试,我一直在尝试很多不同的方法,试图让它发挥作用,但这是总体思路
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd
@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
buffer = []
for i in range(0, (x - y)):
buffer.append(0.0)
return buffer
Run Code Online (Sandbox Code Playgroud)
我的使用方法如下:
df = df.withColumn("zero_list", zero_pad(df.x, df.y))
Run Code Online (Sandbox Code Playgroud)
最终结果是df一个名为“zero_list”的新列,该ArrayType(DecimalType())列看起来[0.0, 0.0, 0.0, ...]长度为(df.x - df.y)。
错误消息非常笼统,几乎不值得发布,只是“作业因阶段失败而中止”,它只能追溯到我执行以下操作的代码部分df.show():
Py4JJavaError Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)
/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
350 """
351 if isinstance(truncate, bool) and truncate:
--> 352 print(self._jdf.showString(n, 20, vertical))
353 else:
354 print(self._jdf.showString(n, int(truncate), vertical))
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
Run Code Online (Sandbox Code Playgroud)
如何创建一个pandas_udf返回可变长度数组的数组?
我使用 Databricks 和 Spark 2.3.1 来完成所有这些工作。
Ali*_*lli -1
我不明白为什么你从函数返回 pandas Series 值。它为每个输入返回多行。
>>> import pandas as pd
>>> def zero_pad(x, y):
... buffer = []
... for i in range(0, (x - y)):
... buffer.append(0.0)
... return pd.Series(buffer)
...
>>> zero_pad(5,1)
0 0.0
1 0.0
2 0.0
3 0.0
dtype: float64
Run Code Online (Sandbox Code Playgroud)
因此,您无法添加具有多行结果的列。
另一方面,您不能直接在 withColumn 语句中使用 udf。请参阅下面的我的脚本,我认为结果正是您正在寻找的
>>> from pyspark.sql.functions import udf
>>>
>>> data = sc.parallelize([
... (2,1),
... (8,1),
... (5,2),
... (6,4)])
>>> columns = ['x','y']
>>> df = spark.createDataFrame(data, columns)
>>> df.show()
+---+---+
| x| y|
+---+---+
| 2| 1|
| 8| 1|
| 5| 2|
| 6| 4|
+---+---+
>>> def zero_pad(x, y):
... buffer = []
... for i in range(0, (x - y)):
... buffer.append(0.0)
... return buffer
...
>>> my_udf = udf(zero_pad)
>>> df = df.withColumn("zero_list", my_udf(df.x, df.y))
>>> df.show()
+---+---+--------------------+
| x| y| zero_list|
+---+---+--------------------+
| 2| 1| [0.0]|
| 8| 1|[0.0, 0.0, 0.0, 0...|
| 5| 2| [0.0, 0.0, 0.0]|
| 6| 4| [0.0, 0.0]|
+---+---+--------------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8936 次 |
| 最近记录: |