jar*_*fly 1 replace function dataframe apache-spark pyspark
我定义了一些字典和一个函数:
dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
...
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}
def function_definition(valor, atributo):
dict_atributo = hierarchy_dict[atributo]
valor_generalizado = None
if isinstance(valor, (int, long, float, complex)):
for key, value in dict_atributo.items():
if(isinstance(key, tuple)):
lista = list(key)
if (valor > key[0] and valor < key[1]):
valor_generalizado = value
else: # if it is not numeric
valor_generalizado = dict_atributo.get(valor)
return valor_generalizado
Run Code Online (Sandbox Code Playgroud)
该函数的主要作用是:检查作为参数传递给“function_definition”函数的值,并根据其字典的引用替换其值。
因此,如果我调用“function_definition(60, 'TEMP')”,它将返回“LOW”。
另一方面,我有一个具有下一个结构的数据框(这是一个示例):
+----+-----+-----+---+----+
|TEMP|SH_SP|PRESS|POI|TRIG|
+----+-----+-----+---+----+
| 0| 1| 2| 0| 0|
| 0| 2| 3| 1| 1|
| 0| 3| 4| 2| 1|
| 0| 4| 5| 3| 1|
| 0| 5| 6| 4| 1|
| 0| 1| 2| 5| 1|
+----+-----+-----+---+----+
Run Code Online (Sandbox Code Playgroud)
我想要做的是根据上面定义的函数替换数据框的一列的值,所以我有下一个代码行:
dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))
Run Code Online (Sandbox Code Playgroud)
但是我在执行它时收到下一条错误消息:
AssertionError: col should be Column
Run Code Online (Sandbox Code Playgroud)
我的代码有什么问题?怎么能这样?
你function_definition(勇武,atributo)返回一个String(valor_generalizado单个)的勇气。
AssertionError: col should be Column意味着您将一个参数传递给WithColumn(colName,col),它不是一个列。所以你必须转换你的数据,以便有Column,例如你可以在下面看到的。
例如数据框(与您的结构相同):
a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example
dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns
dataframe.show()
+----+---+
| tp| S|
+----+---+
|10.0|1.2|
|73.0|4.0|
+----+---+
Run Code Online (Sandbox Code Playgroud)
正如你在这里看到的
udf创建表示用户定义函数 (UDF) 的列表达式。
解决方案:
from pyspark.sql.functions import udf
attr = 'TEMP'
udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType())
dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp))
dataframe_new.show()
+----+---+----------+
| tp| S| newCol|
+----+---+----------+
|10.0|1.2| Low|
|73.0|4.0|Normal-Low|
+----+---+----------+
Run Code Online (Sandbox Code Playgroud)