标签: apache-spark-2.3

什么时候在 Spark 数据帧上使用 persist() 性能不实用?

在致力于提高代码性能时,因为我有许多作业失败(中止),persist()所以每当我需要在许多其他操作中使用相同的数据帧时,我都会考虑使用 Spark Dataframe 上的函数。在执行此操作并跟踪 Spark 应用程序 UI 中的作业、阶段时,我觉得这样做并不总是最佳,这取决于分区数量和数据大小。我不确定直到我因为坚持阶段失败而中止工作。

我想知道在数据帧上执行许多操作时使用的最佳实践是否始终有效?persist() 如果不是,什么时候不是?怎么判断?

更具体地说,我将展示我的代码和中止作业的详细信息:

#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])
Run Code Online (Sandbox Code Playgroud)

你可能会问我为什么坚持spark_dfproducts_df这是因为我将像 with和 in 一样多次使用它joins(例如:spark_df = spark_df.join(df_products_indexed,"product_id")

工作阶段

第三阶段失败原因详情:

由于阶段失败而中止作业:阶段 3.0 中的任务 40458 失败了 4 次,最近一次失败:阶段 3.0 中丢失任务 40458.3(TID 60778,xx.xx.yyyy.com,执行器 91):ExecutorLostFailure(执行器 91 因一而退出)正在运行的任务)原因:从站丢失驱动程序堆栈跟踪:

输入数据的大小(4 TB …

python-2.7 apache-spark pyspark apache-spark-2.3

9
推荐指数
2
解决办法
6815
查看次数

使用 Spark 和 java 编写 CSV 文件 - 处理空值和引号

初始数据在 Dataset<Row> 中,我正在尝试写入管道分隔文件,我希望每个非空单元格和非空值都放在引号中。空值或空值不应包含引号

result.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("nullValue", "")
            .option("quoteAll", "false")
            .csv(Location);
Run Code Online (Sandbox Code Playgroud)

预期输出:

"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Run Code Online (Sandbox Code Playgroud)

电流输出:

London||UK
Delhi|India
Moscow|Russia
Run Code Online (Sandbox Code Playgroud)

如果我将“quoteAll”更改为“true”,我得到的输出是:

"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Run Code Online (Sandbox Code Playgroud)

Spark 版本是 2.3,java 版本是 java 8

java csv java-8 apache-spark apache-spark-2.3

7
推荐指数
1
解决办法
1208
查看次数

将pyspark数据帧转换为pandas数据帧

我有 pyspark 数据框,其维度为 (28002528,21) 并尝试使用以下代码行将其转换为 Pandas 数据框:

pd_df=spark_df.toPandas()
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

第一部分

Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at …
Run Code Online (Sandbox Code Playgroud)

pandas pyspark apache-spark-2.3

5
推荐指数
1
解决办法
5791
查看次数

Pyspark 自加入错误“缺少已解析的属性”

在进行 pyspark 数据帧自联接时,我收到一条错误消息:

Py4JJavaError: An error occurred while calling o1595.join.
: org.apache.spark.sql.AnalysisException: Resolved attribute(s) un_val#5997 missing from day#290,item_listed#281,filename#286 in operator !Project [...]. Attribute(s) with the same name appear in the operation: un_val. Please check if the right attribute(s) are used.;;
Run Code Online (Sandbox Code Playgroud)

这是一个简单的数据帧自连接,如下所示,工作正常,但在对数据帧进行了几次操作(例如添加列或与其他数据帧连接)后,会引发上述错误。

df.join(df,on='item_listed')
Run Code Online (Sandbox Code Playgroud)

使用像波纹管这样的数据帧别名也不起作用,并且会引发相同的错误消息:

df.alias('A').join(df.alias('B'), col('A.my_id') == col('B.my_id'))
Run Code Online (Sandbox Code Playgroud)

python python-3.x pyspark apache-spark-2.3

2
推荐指数
1
解决办法
3486
查看次数