Kev*_* Xu 7 python apache-spark pyspark
I'm hitting a pandas_udf error with the code below. The code creates a column containing the data type of another column. The same code works fine with a normal, slower udf (commented out).
Basically, anything more complex than "string" + data returns an error.
# from pyspark.sql.functions import udf
import pyspark.sql.types
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(returnType=pyspark.sql.types.StringType(), functionType=PandasUDFType.SCALAR)
def my_transform(data) -> bytes:
    return_val = str(type(data))
    return return_val

rawdata_df = process_fails.toDF()
# decode_df = rawdata_df.withColumn('new_col', udf_decode(udf_unzip(udf_b64decode(udf_bytes(rawdata_df.rawData)))))
decode_df = rawdata_df.withColumn('new_col', my_transform(rawdata_df.rawData))
decode_df.show()
I get the following error:
An error occurred while calling o887.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 70, ip-10-213-56-185.ap-southeast-2.compute.internal, executor 10): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 98, in verify_result_length
    "expected %d, got %d" % (len(a[0]), len(result)))
RuntimeError: Result vector from pandas_udf was not the required length: expected 12, got 35
This also gives an error:
import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf

df = pd.DataFrame({'x': ["1","2","3"], 'y': [1.0,2.0,3.0]})
sp_df = spark.createDataFrame(df)

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return len(v)

sp_df.withColumn('v2', pandas_plus_one(sp_df.x)).show()
The error message is:
TypeError: Return type of the user-defined function should be Pandas.Series, but is <class 'int'>
ayp*_*lam 13
pandas_udfs of type PandasUDFType.SCALAR take a pd.Series as input and must return a pd.Series. That is why the TypeError is raised: the function pandas_plus_one returns an int instead of a pd.Series. Using your second example, given the dataframe's column x, the actual input to the UDF is:
v = pd.Series(["1", "2", "3"])
print(v)
# 0    1
# 1    2
# 2    3
# dtype: object
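This also accounts for the "expected 12, got 35" in your first traceback: inside the UDF, data is the whole pd.Series, so str(type(data)) is a single 35-character string, and Spark's verify_result_length compares that string's length against the batch size. A hedged, pandas-only illustration (no Spark session needed):

```python
import pandas as pd

# Inside a scalar pandas_udf, the argument is the whole batch as a Series.
v = pd.Series(["1", "2", "3"])

# str(type(...)) of a Series is one fixed string, regardless of batch size.
s = str(type(v))
print(s)       # <class 'pandas.core.series.Series'>
print(len(s))  # 35 -- matches the "got 35" in the error message
```

Spark expected a result with one entry per input row (12 in your batch), but it received a 35-character string instead.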
If you want the length of each item in the series, the easiest way is to map over it. The function definition (with type hints for clarity) should look closer to:
@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v.map(lambda x: len(x))
You can apply the same concept (using a map so the pandas_udf returns a pd.Series of the same length as its input) to your original question, and it should resolve your problem.
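Concretely, the per-element logic of your my_transform would look roughly like this (a sketch of the mapping only, written pandas-only so it runs without a Spark session; in Spark you would decorate it with @pandas_udf('string', PandasUDFType.SCALAR)):

```python
import pandas as pd

def my_transform_logic(data: pd.Series) -> pd.Series:
    # Map over each element so the output Series has the same
    # length as the input Series -- this is what Spark verifies.
    return data.map(lambda x: str(type(x)))

# Hypothetical batch standing in for the rawData column:
batch = pd.Series([b"abc", b"defgh"])
print(my_transform_logic(batch).tolist())
# ["<class 'bytes'>", "<class 'bytes'>"]
```

The key difference from the original is that type() is applied per element via map, not once to the whole Series.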
Archived:
Views: 5334