Pyspark无效输入异常尝试除错误

ano*_*428 9 python exception-handling amazon-s3 apache-spark pyspark

我试图使用pyspark从s3读取最近4个月的数据并处理数据,但是我收到以下异常.

org.apache.hadoop.mapred.InvalidInputException:输入模式s3:// path_to_clickstream/date = 201508*

在每个月的第一天,由于s3路径中没有条目(单独的作业处理并将数据上传到s3路径并且我的作业在该路径之前运行),作业失败.我想知道是否有办法让我抓住这个异常并允许作业继续处理存在的所有路径?

zer*_*323 11

您可以在加载和捕获后立即尝试触发一个廉价的操作Py4JJavaError:

from py4j.protocol import Py4JJavaError

def try_load(path):
    rdd = sc.textFile(path)
    try:
        rdd.first()
        return rdd
    except Py4JJavaError as e:
        return sc.emptyRDD()

rdd = try_load(s3_path)
if not rdd.isEmpty():
    run_the_rest_of_your_code(rdd)
Run Code Online (Sandbox Code Playgroud)

编辑:

如果要处理多个路径,可以单独处理每个路径并合并结果:

paths = [
    "s3://path_to_inputdir/month1*/",
    "s3://path_to_inputdir/month2*/",
    "s3://path_to_inpu??tdir/month3*/"]

rdds = sc.union([try_load(path) for path in paths])
Run Code Online (Sandbox Code Playgroud)

如果您想要更好的控件,可以列出内容并加载已知文件.

如果这些路径中至少有一个是非空的,那么你应该能够使事情变得更简单,并使用像这样的glob:

sc.textFile("s3://path_to_inputdir/month[1-3]*/")
Run Code Online (Sandbox Code Playgroud)