Chi*_*asu 5 python-3.x pyspark jupyter-notebook azure-databricks
我正在尝试在 python 类中创建一个 Spark-UDF。意思是,类中的方法之一是 UDF。我收到一个名为“ PicklingError:无法序列化对象:TypeError:无法pickle _MovedItems对象”的错误
Environment : Azure Databricks . (DBR version 6.1 Beta) Code execution : In the built in Notebook. Python version : 3.5 Spark version : 2.4.4
I have tried defining the UDF outside of the class in a separate cell, and the UDF works. I do not want to write code like that, I need to follow OOP principles and would like to keep it structured. I have tried everything on Google, did not help. In fact I did not even get the information about the error I am getting. " PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects "
class phases():
def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
print("Inside the constructor of Class phases ")
#I need the below 2 variables to be used in my UDF, so i am trying to put
them in a class
self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg
#This is the UDF.
def phases_commence(self,each_row):
print(a)
return 1
#This is the function that registers the UDF,
def initiate_the_phases_on_the_major_track_segment(self):
print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")
#registering the UDF
self.phases_udf = udf(self.phases_commence,LongType())
new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf((struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))))
display(new_df)
Run Code Online (Sandbox Code Playgroud)
#This is a method in a different notebook that creates an object for the above shown class and calls the methods that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
phase_obj.initiate_the_phases_on_the_major_track_segment()
Run Code Online (Sandbox Code Playgroud)
The error message is: PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects
使用@udf 糖稍微简单的语法...
class Phases:
def __init__(self, df1):
self.df1 = df1
#This is the UDF, double-decorated..
@staticmethod
@udf(returnType=IntegerType())
def phases_udf(age):
age += 3
return age
#This is the function that registers the UDF
def doSomething(self):
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(col('Age')))
Run Code Online (Sandbox Code Playgroud)
您的函数需要是静态的才能将其定义为 udf。我正在寻找一些文档来提供一个很好的解释,但无法真正找到它。
基本上(可能不是 100% 准确;更正值得赞赏)当您定义一个 udf 时,它会被腌制并自动复制到每个执行程序,但是您不能腌制未在顶级定义的类的单个方法(类是顶层的一部分,但不是它的方法)。查看这篇文章,了解除静态方法之外的其他解决方法。
import pyspark.sql.functions as F
import pyspark.sql.types as T
class Phases():
def __init__(self, df1):
print("Inside the constructor of Class phases ")
self.df1 = df1
self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())
#This is the UDF.
@staticmethod
def phases_commence(age):
age = age +3
return age
#This is the function that registers the UDF,
def doSomething(self):
print("Inside the doSomething")
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))
l =[(1, 10 , 'F')
,(2 , 2 , 'M')
,(2 , 10 , 'F')
,(2 , 3 , 'F')
,(3 , 10, 'M')]
columns = ['id', 'Age', 'Gender']
df=spark.createDataFrame(l, columns)
bla = Phases(df)
bla.doSomething()
bla.df1.show()
Run Code Online (Sandbox Code Playgroud)
输出:
Inside the constructor of Class phases
Inside the 'initiate_the_phases_on_the_major_track_segment()'
+---+---+------+-----+
| id|Age|Gender|AgeP2|
+---+---+------+-----+
| 1| 10| F| 13|
| 2| 2| M| 5|
| 2| 10| F| 13|
| 2| 3| F| 6|
| 3| 10| M| 13|
+---+---+------+-----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3868 次 |
| 最近记录: |