小编Pra*_*ada的帖子

在PySpark中登录UDF

我正在UDF中调用API,并尝试将输出记录到Logger中,并出现序列化错误。

以下是我的Logger初始化代码:

log4jLogger = spark._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)

在我的UDF中,我正在使用

LOGGER.info("Message")
Run Code Online (Sandbox Code Playgroud)

但是我得到了错误

pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o31.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
Run Code Online (Sandbox Code Playgroud)

注册UDF时

distance_udf = udf(distfunc, DoubleType())
Run Code Online (Sandbox Code Playgroud)

您能否更正我在日志记录中应该修改的内容,以及是否要登录到单独的日志文件中,该如何纠正我。

谢谢

python logging apache-spark pyspark

5
推荐指数
0
解决办法
696
查看次数

比较两列以在 Spark DataFrame 中创建新列

我有一个包含 2 列的 Spark DataFrame,我正在尝试使用其他两列和 when else 操作创建一个新列。

df_newcol = df.withColumn("Flag", when(col("a") <= lit(ratio1) | col("b") <= lit(ratio1), 1).otherwise(2))
Run Code Online (Sandbox Code Playgroud)

但这会引发错误

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
Run Code Online (Sandbox Code Playgroud)

我之前在一个列中使用过 when 和 else ,而在多列中使用它时,我们是否必须以不同的方式编写逻辑。

谢谢。

python apache-spark-sql pyspark

3
推荐指数
1
解决办法
1万
查看次数