如何解决pyspark中的pickle错误?

Sot*_*her 1 python unicode dictionary apache-spark pyspark

我正在迭代文件以收集有关字典中的列和行中的值的信息.我有以下代码在本地工作:

def search_nulls(file_name):
    separator = ','
    nulls_dict = {}
    fp = open(file_name,'r')
    null_cols = {}
    lines = fp.readlines()

    for n,line in enumerate(lines):
        line = line.split(separator)
        for m,data in enumerate(line):
            data = data.strip('\n').strip('\r')
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(lambda: 0)
            if len(data) <= 4:
                null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1

    return null_cols


files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)
Run Code Online (Sandbox Code Playgroud)

上面的代码工作正常没有火花.我评论上面的最后两行,我尝试使用spark,因为这是需要运行分布式的原型:

os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')

sc = SparkContext(conf=conf)

objects = sc.parallelize(files_to_process)
resulting_object = \
    objects.map(lambda file_object: find_nulls(file_object))

result = resulting_object.collect()
Run Code Online (Sandbox Code Playgroud)

但是,当使用spark时,会导致以下错误:

File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found?
Run Code Online (Sandbox Code Playgroud)

我一直无法找到任何明显的原因,为什么这会失败,因为它在本地完美运行,而且我不在工作节点之间共享任何文件.事实上,我只是在本地机器上运行它.

有谁知道为什么这可能失败的一个很好的理由?

zer*_*323 6

您的问题的来源是以下行:

null_cols[str(m)] = defaultdict(lambda: 0)
Run Code Online (Sandbox Code Playgroud)

正如你可以阅读什么可以腌制和未腌制?的部分pickle模块文档:

可以腌制以下类型:

  • ...
  • 在模块的顶层定义的函数(使用def,而不是lambda)
  • 在模块顶层定义的内置函数
  • ...

应该清楚的是lambda: 0,不符合上述标准.为了使它工作,您可以例如用以下代码替换lambda表达式int:

null_cols[str(m)] = defaultdict(int)
Run Code Online (Sandbox Code Playgroud)

我们怎么可能将lambda表达式传递给PySpark中的高阶函数?魔鬼在细节.PySpark根据上下文使用不同的序列化程序.要序列化闭包,包括lambda表达式,它使用cloudpickle支持lambda表达式和嵌套函数的自定义.要处理数据,它使用默认的Python工具.


一些附注:

  • 我不会使用Python file对象来读取数据.它不可移植,不会超出本地文件系统.你可以SparkContex.wholeTextFiles改用.
  • 如果确实要确保关闭连接.使用with语句通常是最好的方法
  • 在分割线之前,您可以安全地剥离换行符