Chr*_*ong 3 apache-spark pyspark aws-glue
我正在使用mapDynamicFrame的方法(或等效的Map.apply方法)。我注意到传递给这些函数的函数中的任何错误都会被默默忽略并导致返回的 DynamicFrame 为空。
假设我有一个这样的工作脚本:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")
def my_mapper(rec):
import logging
logging.error("[RADIX] An error-log from in the mapper!")
print "[RADIX] from in the mapper!"
raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')
print "Count: ", dyF.count()
dyF.printSchema()
dyF.toDF().show()
Run Code Online (Sandbox Code Playgroud)
如果我在我的 Glue Dev Endpoint 中运行这个脚本gluepython,我会得到如下输出:
[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count: 0
root
++
||
++
++
Run Code Online (Sandbox Code Playgroud)
关于此输出的注意事项:
print声明或logging.error声明的结果。my_mapper引发了异常。printSchema调用显示生成的 DynamicFrame 上没有架构元数据show方法也不产生任何输出,表明所有行都消失了。同样,当我将此脚本保存为 AWS Glue 控制台中的作业并运行它时,该作业并未指示发生任何错误——作业状态为“成功”。值得注意的是,我确实得到了作业日志的print语句和logging.error调用输出,但仅在常规“日志”中,而不是在“错误日志”中。
我想要的是能够指示我的工作失败,并且能够轻松找到这些错误日志。最重要的是只是表明它失败了。
有没有办法在映射函数中记录错误,以便 Glue 将其作为“错误日志”提取(并将其放在单独的 AWS CloudWatch Logs 路径中)?如果发生这种情况,它是否会自动将整个 Job 标记为 Failing?或者是否有其他方法可以从映射函数中显式地使作业失败?
(我的计划,如果有一种方法可以记录错误和/或将作业标记为失败,则是创建一个装饰器或其他实用程序函数,该函数将自动捕获我映射函数中的异常并确保它们被记录并标记为失败)。
我发现使 Glue 作业显示为“失败”的唯一方法是从主脚本中引发异常(不在映射器或过滤器函数内,因为它们似乎被转出到数据处理单元)。
幸运的是,有是一种方法来检测是否发生内部地图或滤波器函数的一个例外:使用DynamicFrame.stageErrorsCount()方法。它将返回一个数字,指示在运行最近的转换时引发了多少异常。
所以解决所有问题的正确方法:
try/except在您编写的每个函数中放置语句。stageErrorsCount()方法并检查它是否大于 0。如果您想中止作业,只需引发异常。例如:
import logging
def log_errors(inner):
def wrapper(*args, **kwargs):
try:
inner(*args, **kwargs)
except Exception as e:
logging.exception('Error in function: {}'.format(inner))
raise
return wrapper
@log_errors
def foo(record):
1 / 0
Run Code Online (Sandbox Code Playgroud)
然后,在您的工作中,您会执行以下操作:
df = df.map(foo, "foo")
if df.stageErrorsCount() > 0:
raise Exception("Error in job! See the log!")
Run Code Online (Sandbox Code Playgroud)
请注意,由于某种原因,即使logging.exception从映射器函数内部调用仍然不会将日志写入AWS CloudWatch Logs 中的错误日志。它被写入常规成功日志。但是,使用这种技术,您至少会看到作业失败并能够在日志中找到信息。另一个警告:Dev Endpoints 似乎没有显示来自映射器或过滤器功能的任何日志。