How do I get rid of the "missing transform attribute" error?

ar_*_*m18 3 transform apache-spark-sql pyspark palantir-foundry

I am writing code in Palantir using PySpark, but I ran into this error and cannot figure it out.

The error is:

A TransformInput object does not have an attribute withColumn. 
Please check the spelling and/or the datatype of the object.

My code, for reference:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output

@transform(
    result = Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input= Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    from pyspark.sql.functions import monotonically_increasing_id
    res = first_input.withColumn("id", monotonically_increasing_id())

    # Recode type
    res = res.withColumn("old_col_type", F.when(
        (F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
        ).when(
            (F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
        ))


    res = res.withColumnRenamed("old_col_type","t_old_col_type") \
    .withColumnRenamed("old_col2_type","t_old_col2_type")


    res = res.filter(res.col_type == 'straight')


    res = res.join(second_input,  #eqNullSafe is like an equal sign but includes null in join
                (res.col1.eqNullSafe(second_input.pre_col1)) & 
                (res.col2.eqNullSafe(second_input.pre_col2)), 
                how='left')\
                    .drop(*["pre_col1", "pre_col2"]).withColumnRenamed("temp_result", "final_answer")


    result.write_dataframe(res)
    

Can anyone help me resolve this error? Thanks in advance.

Rob*_*dey 8

The error message explains it well: the object you are calling .withColumn() on is not a regular Spark DataFrame but a TransformInput object. You need to call its .dataframe() method to get the DataFrame.
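
To illustrate why the attribute is missing, here is a minimal pure-Python sketch of the wrapper pattern. StandInDataFrame and StandInTransformInput are hypothetical stand-ins written only for this illustration; they are not Foundry's or Spark's real classes and only mimic the one method each that matters here.

```python
class StandInDataFrame:
    """Hypothetical stand-in for a Spark DataFrame (illustration only)."""

    def __init__(self, rows):
        self.rows = rows

    def withColumn(self, name, value):
        # Return a new object with the extra column added to every row.
        return StandInDataFrame([{**row, name: value} for row in self.rows])


class StandInTransformInput:
    """Hypothetical stand-in for the wrapper object passed to the function."""

    def __init__(self, df):
        self._df = df

    def dataframe(self):
        # The wrapper holds the DataFrame; it does not behave like one.
        return self._df


first_input = StandInTransformInput(StandInDataFrame([{"col_type": "left"}]))

error_message = None
try:
    # Same mistake as in the question: DataFrame method called on the wrapper.
    first_input.withColumn("id", 0)
except AttributeError as exc:
    error_message = str(exc)

# Unwrapping first gives an object that does have withColumn.
res = first_input.dataframe().withColumn("id", 0)
print(error_message)
print(res.rows)
```

The wrapper only exposes the DataFrame through dataframe(), so any DataFrame method has to be called on the unwrapped object.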

Documentation for reference.

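Also, per the documentation, you should move the monotonically_increasing_id import to the top of the file, because Foundry's transform logic-level versioning only works when imports happen at module level.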
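
Putting both points together, the start and end of your transform might look roughly like the sketch below. It reuses the dataset paths and column names from your question and only runs inside a Foundry code repository, where transforms.api is available. The names first_df and second_df are mine. I am assuming second_input needs the same unwrapping, since it is also a TransformInput and is used in the join. The recode, rename and filter steps from your code are omitted for brevity and would go between the first withColumn and the join.

```python
# All imports live at module level, including the pyspark functions.
import pyspark.sql.functions as F
from transforms.api import transform, Input, Output


@transform(
    result=Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input=Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    # Unwrap the TransformInput objects into regular Spark DataFrames.
    first_df = first_input.dataframe()
    second_df = second_input.dataframe()

    # withColumn now works because first_df is a DataFrame.
    res = first_df.withColumn("id", F.monotonically_increasing_id())

    # eqNullSafe behaves like == but also matches null with null.
    res = res.join(
        second_df,
        res.col1.eqNullSafe(second_df.pre_col1)
        & res.col2.eqNullSafe(second_df.pre_col2),
        how='left',
    ).drop("pre_col1", "pre_col2")

    # The output side is also a wrapper; write the DataFrame through it.
    result.write_dataframe(res)
```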