在致力于提高代码性能时,因为我有许多作业失败(中止),persist()
所以每当我需要在许多其他操作中使用相同的数据帧时,我都会考虑使用 Spark Dataframe 上的函数。在执行此操作并跟踪 Spark 应用程序 UI 中的作业、阶段时,我觉得这样做并不总是最佳,这取决于分区数量和数据大小。我不确定直到我因为坚持阶段失败而中止工作。
我想知道在数据帧上执行许多操作时使用的最佳实践是否始终有效?persist()
如果不是,什么时候不是?怎么判断?
更具体地说,我将展示我的代码和中止作业的详细信息:
#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])
Run Code Online (Sandbox Code Playgroud)
你可能会问我为什么坚持spark_df
?products_df
这是因为我将像 with和 in 一样多次使用它joins
(例如:spark_df = spark_df.join(df_products_indexed,"product_id")
第三阶段失败原因详情:
由于阶段失败而中止作业:阶段 3.0 中的任务 40458 失败了 4 次,最近一次失败:阶段 3.0 中丢失任务 40458.3(TID 60778,xx.xx.yyyy.com,执行器 91):ExecutorLostFailure(执行器 91 因一而退出)正在运行的任务)原因:从站丢失驱动程序堆栈跟踪:
输入数据的大小(4 TB …
初始数据在 Dataset<Row> 中,我正在尝试写入管道分隔文件,我希望每个非空单元格和非空值都放在引号中。空值或空值不应包含引号
result.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("nullValue", "")
.option("quoteAll", "false")
.csv(Location);
Run Code Online (Sandbox Code Playgroud)
预期输出:
"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Run Code Online (Sandbox Code Playgroud)
电流输出:
London||UK
Delhi|India
Moscow|Russia
Run Code Online (Sandbox Code Playgroud)
如果我将“quoteAll”更改为“true”,我得到的输出是:
"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Run Code Online (Sandbox Code Playgroud)
Spark 版本是 2.3,java 版本是 java 8
我有 pyspark 数据框,其维度为 (28002528,21) 并尝试使用以下代码行将其转换为 Pandas 数据框:
pd_df=spark_df.toPandas()
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
第一部分
Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at …
Run Code Online (Sandbox Code Playgroud) 在进行 pyspark 数据帧自联接时,我收到一条错误消息:
Py4JJavaError: An error occurred while calling o1595.join.
: org.apache.spark.sql.AnalysisException: Resolved attribute(s) un_val#5997 missing from day#290,item_listed#281,filename#286 in operator !Project [...]. Attribute(s) with the same name appear in the operation: un_val. Please check if the right attribute(s) are used.;;
Run Code Online (Sandbox Code Playgroud)
这是一个简单的数据帧自连接,如下所示,工作正常,但在对数据帧进行了几次操作(例如添加列或与其他数据帧连接)后,会引发上述错误。
df.join(df,on='item_listed')
Run Code Online (Sandbox Code Playgroud)
使用像波纹管这样的数据帧别名也不起作用,并且会引发相同的错误消息:
df.alias('A').join(df.alias('B'), col('A.my_id') == col('B.my_id'))
Run Code Online (Sandbox Code Playgroud) pyspark ×3
apache-spark ×2
csv ×1
java ×1
java-8 ×1
pandas ×1
python ×1
python-2.7 ×1
python-3.x ×1