Und*_*ood 16 python flatmap apache-spark pyspark
我有一组文件.文件的路径保存在文件中,例如"all_files.txt".使用apache spark,我需要对所有文件进行操作并对结果进行处理.
我想要做的步骤是:
这是我为此写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
Run Code Online (Sandbox Code Playgroud)
这是抛出错误:
第323行,在get_return_value中py4j.protocol.Py4JError:调用o25时发生错误.getnewargs.跟踪:py4j.Py4JException:方法getnewargs([])在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)的py4j.Gateway上不存在.在py4j.commands.CallCommand.exe执行(CallCommand.java:79)py4j.GatewayConnection.run(GatewayConnection.java:214)的py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上调用(Gateway.java:272) )在java.lang.Thread.run(Thread.java:745)
有人可以告诉我我做错了什么以及我应该如何进一步.提前致谢.
Mar*_*usz 19
不允许在执行程序中使用spark内部flatMap或任何转换(spark会话仅在驱动程序上可用).也无法创建RDD的RDD(请参阅:是否可以在Apache Spark中创建嵌套的RDD?)
但是你可以实现以另一种方式这一转变-读的所有内容all_files.txt到数据帧,使用当地 map使他们dataframes和当地 reduce工会所有,见例如:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
Run Code Online (Sandbox Code Playgroud)
我今天遇到这个问题,终于弄清楚我引用了一个spark.DataFrame对象pandas_udf,导致这个错误。
结论:
你不能在and 中使用sparkSessionobject 、spark.DataFrameobject 或其他 Spark 分布式对象,因为它们是 unpickled 。 udfpandas_udf
如果您遇到此错误并且您正在使用udf,请仔细检查,一定是相关问题。
| 归档时间: |
|
| 查看次数: |
10989 次 |
| 最近记录: |