Mit*_*ril 12 user-defined-functions pandas apache-spark pyspark
我已经测试过,无论是在集群模式还是客户端模式下,logger都print无法在 a 中打印消息pandas_udf。
测试代码:
import sys
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging
logger = logging.getLogger('test')
spark = (SparkSession
.builder
.appName('test')
.getOrCreate())
df = spark.createDataFrame(pd.DataFrame({
'y': np.random.randint(1, 10, (20,)),
'ds': np.random.randint(1000, 9999, (20,)),
'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
})
)
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
print('#'*100)
logger.info('$'*100)
logger.error('&'*100)
return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
Run Code Online (Sandbox Code Playgroud)
另请注意:
log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)
Run Code Online (Sandbox Code Playgroud)
您不能在 中使用它pandas_udf,因为此日志超出了 Spark 上下文对象,您无法在 udf 中引用 Spark 会话/上下文。
我知道的唯一方法就是用作Excetion我在下面写的答案。但这很棘手并且有缺点。我想知道是否有任何方法可以在 pandas_udf 中打印消息。
目前,我尝试了 Spark 2.4 中的所有方法。
如果没有日志,就很难调试有问题的 pandas_udf。我知道可以在 pandas_udf 中打印错误消息的唯一可行方法是raise Exception。所以这样调试确实很费时间,但我知道没有更好的方法了。
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
print('#'*100)
logger.info('$'*100)
logger.error('&'*100)
raise Exception('@'*100) # The only way I know can print message but would break execution
return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])
Run Code Online (Sandbox Code Playgroud)
缺点是打印消息后无法保持 Spark 运行。
| 归档时间: |
|
| 查看次数: |
8750 次 |
| 最近记录: |