Why does this PySpark join fail?

Mar*_*kus 2 apache-spark apache-spark-sql pyspark pyspark-sql

I am misunderstanding PySpark's behaviour in the example below.

I have several DataFrames and I join them as follows:

print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()

data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()

data4.write.parquet(output + '/test.parquet', mode="overwrite")

I expect the leftouter join to return the left DataFrame, together with matches from the right DataFrame where they exist.
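For reference, a tiny self-contained sketch of that expected behaviour (hypothetical data, not the question's actual tables):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame(
    [(321, 460, 1), (999, 111, 5)],
    ["category_pk", "item_pk", "unique_users"])
right = spark.createDataFrame(
    [(321, 460, 258)],
    ["category_pk", "item_pk", "unique_users_per_cat"])

# Every row of `left` is kept; rows without a match get NULLs
# in the right-hand columns.
left.join(right, ["category_pk", "item_pk"], "leftouter").show()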

Some sample output:

users_data
+--------------+----------+-------------------------+
|   category_pk|   item_pk|             unique_users|
+--------------+----------+-------------------------+
|           321|       460|                        1|
|           730|       740|                        2|
|           140|       720|                       10|


users_cat_data
+--------------+-----------------------+
|   category_pk|   unique_users_per_cat|
+--------------+-----------------------+
|           111|                    258|
|           100|                    260|
|           750|                      9|

However, I observe different behaviour. I used show() to print the first 5 rows of every DataFrame that takes part in the joins, and all of them contain data. Yet I get the following error:

None
DATA1
Traceback (most recent call last):
  File "mytest.py", line 884, in <module>
    args.field1, args.field2, args.field3)
  File "mytest.py", line 802, in calc
    print data1.show()
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 336, in show
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o802.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 

Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)

I do not understand why I get the task serialization error on the line print data1.show(). The DataFrames used to create data1 are not empty. Moreover, show() is used successfully two lines above this line of code.

Sometimes it fails on the last line, data4.write.parquet(output + '/test.parquet', mode="overwrite"), and when I remove that line it runs fine. But now it fails even earlier, at data1.show().

How can I solve this problem? Any help would be much appreciated.

Jac*_*ski 5

I think the reason for the topmost org.apache.spark.SparkException: Exception thrown in awaitResult is that the BroadcastExchangeExec physical operator simply timed out (after the default 5-minute wait) while it was asked to broadcast a relation (i.e. a table).

That is the low-level background on what the exception means.

Now, you may be asking yourself why this happens in the first place.

Set spark.sql.broadcastTimeout to -1 to disable the timeout completely (which will make the thread wait indefinitely for the broadcast to finish), or increase it to, say, 10 minutes.

You can also disable broadcasting tables altogether by setting spark.sql.autoBroadcastJoinThreshold to -1.
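As a minimal sketch of both workarounds (assuming your SparkSession is available as spark; the same keys can equally be passed with --conf on spark-submit):

# Give the broadcast up to 10 minutes instead of the default 300 seconds,
# or pass -1 to wait indefinitely.
spark.conf.set("spark.sql.broadcastTimeout", 600)

# Or stop the optimizer from planning broadcast joins at all:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)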

That, however, only works around a more serious issue in your environment.

My guess is that your YARN cluster (judging from /mnt/yarn/usercache/hdfs/appcache/application_1512391881474_5650/container_1512391881474_5650_01_000001) is tight on resources, and the network may be sluggish as well.

All in all, my guess is that some of the tables in your query are below the default 10MB threshold, which makes the Spark SQL optimizer choose to broadcast them (rather than distribute the dataset across the executors by other means).
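To check whether the optimizer really planned a broadcast join, you can inspect the physical plan of the failing DataFrame (a quick sketch using data1 from the question):

# Prints the physical plan; a BroadcastExchange / BroadcastHashJoin node
# means one side was estimated below spark.sql.autoBroadcastJoinThreshold
# (10MB by default) and Spark decided to broadcast it.
data1.explain()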

I think something more serious is going on in the cluster and you are facing some temporary problems until... the admins fix the YARN cluster. Could the cluster be under heavier load when you submit your PySpark application?

I do not understand why I get the task serialization error

I think you can simply disregard it as a side effect of the earlier issue, given how PySpark communicates between the two processes (i.e. Python and the JVM) over sockets.

  • Indeed, this error occurs when the cluster is under heavier load and I have several parallel Spark jobs running in different queues. Thank you very much for your help.