PySpark user-defined functions in a class

Chi*_*asu 5 python-3.x pyspark jupyter-notebook azure-databricks

I am trying to create a Spark UDF inside a Python class, meaning that one of the class's methods is the UDF. I get the error "PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects".

Environment: Azure Databricks (DBR version 6.1 Beta). Code execution: in the built-in notebook. Python version: 3.5. Spark version: 2.4.4.

I have tried defining the UDF outside the class in a separate cell, and it works. However, I do not want to write code like that; I need to follow OOP principles and keep the code structured. I have searched extensively and found no fix, nor even any information about the error I am getting: "PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects".

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import LongType

class phases():
  def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
    print("Inside the constructor of Class phases ")

    # I need the two variables below in my UDF, so I am trying to put
    # them in a class
    self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
    self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg

  #This is the UDF. 
  def phases_commence(self,each_row):
    print(each_row)
    return 1

  #This is the function that registers the UDF, 
  def initiate_the_phases_on_the_major_track_segment(self):
    print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")

    #registering the UDF
    self.phases_udf = udf(self.phases_commence,LongType())
    new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf((struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns]))))
    display(new_df)
# This function lives in a different notebook; it creates an object of the class shown above and calls the method that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):

  phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
  phase_obj.initiate_the_phases_on_the_major_track_segment()


The error message is: PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects

Lea*_*ple 6

A slightly simpler syntax, using the @udf decorator as syntactic sugar:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

class Phases:
  def __init__(self, df1):
    self.df1 = df1

  #This is the UDF, double-decorated.. 
  @staticmethod
  @udf(returnType=IntegerType())
  def phases_udf(age):
    age += 3
    return age

  #This is the function that registers the UDF 
  def doSomething(self):
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(col('Age')))
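The order of the two decorators above matters, and the reason can be sketched without Spark. The example below uses `fake_udf`, a hypothetical stand-in that returns a plain wrapper function; it assumes the object returned by `udf` behaves like an ordinary function when stored as a class attribute. The class names `WithStatic` and `WithoutStatic` are also invented for illustration.

```python
import functools


def fake_udf(func):
    """Hypothetical stand-in for pyspark's udf(): wraps func in a plain function."""
    @functools.wraps(func)
    def wrapper(*args):
        return func(*args)
    return wrapper


class WithStatic:
    @staticmethod
    @fake_udf
    def add_three(age):
        return age + 3


class WithoutStatic:
    @fake_udf
    def add_three(age):
        return age + 3


# Through an instance, the static version receives only the explicit argument.
with_static_result = WithStatic().add_three(10)

# Without @staticmethod, Python binds the instance as the first argument,
# so the wrapped function is called with two arguments and fails.
try:
    WithoutStatic().add_three(10)
    without_static_error = None
except TypeError as exc:
    without_static_error = exc

print(with_static_result)
print(type(without_static_error).__name__)
```

With `@staticmethod` outermost, `self.phases_udf(col('Age'))` passes only the column, which is presumably why the answer stacks the decorators in that order.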


cro*_*oik 5

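Your function needs to be static in order to define it as a UDF. I looked for documentation that gives a good explanation, but could not really find any.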

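Basically (this may not be 100% accurate; corrections are appreciated), when you define a UDF it gets pickled and copied to each executor automatically, but you cannot pickle a single method of a class that is not defined at the top level (the class is part of the top level, but its methods are not). See this post for workarounds other than static methods.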
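One way to see why a static method helps is to compare what gets serialized in each case. The sketch below uses the standard library `pickle` rather than the serializer Spark ships with, and a hypothetical `Holder` class whose `threading.Lock` attribute stands in for unpicklable state such as the DataFrames stored on the instance in the question. It is an illustration under those assumptions, not a reproduction of the exact error.

```python
import pickle
import threading


class Holder:
    """Hypothetical class whose instances carry an unpicklable attribute."""

    def __init__(self):
        # A lock stands in for state that cannot be pickled
        # (an assumption made for illustration only).
        self.resource = threading.Lock()

    def bound_add_three(self, age):
        return age + 3

    @staticmethod
    def static_add_three(age):
        return age + 3


def try_pickle(obj):
    """Return None if obj pickles, otherwise the exception that was raised."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return exc


holder = Holder()

# A bound method keeps a reference to its instance in __self__,
# so serializing it means serializing the instance and its lock.
bound_error = try_pickle(holder.bound_add_three)

# A static method is a plain function with no instance attached.
has_instance = hasattr(Holder.static_add_three, "__self__")

print("bound method pickling failed:", bound_error is not None)
print("static method carries an instance:", has_instance)
```

Under this reading, passing `self.phases_commence` to `udf` drags the whole object along, while a static method leaves the instance and everything it holds out of the serialized payload.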

import pyspark.sql.functions as F
import pyspark.sql.types as T


class Phases():
  def __init__(self, df1):
    print("Inside the constructor of Class phases ")

    self.df1 = df1
    self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())

  #This is the UDF. 
  @staticmethod
  def phases_commence(age):
    age = age +3
    return age

  #This is the function that registers the UDF, 
  def doSomething(self):
    print("Inside the doSomething")
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))

l = [(1, 10, 'F'),
     (2, 2, 'M'),
     (2, 10, 'F'),
     (2, 3, 'F'),
     (3, 10, 'M')]

columns = ['id',  'Age',  'Gender']

df=spark.createDataFrame(l, columns)

bla = Phases(df)
bla.doSomething()
bla.df1.show()

Output:

Inside the constructor of Class phases 
Inside the doSomething
+---+---+------+-----+ 
| id|Age|Gender|AgeP2| 
+---+---+------+-----+ 
|  1| 10|     F|   13| 
|  2|  2|     M|    5| 
|  2| 10|     F|   13| 
|  2|  3|     F|    6| 
|  3| 10|     M|   13| 
+---+---+------+-----+