I have three slaves in a standalone Spark cluster. Each slave has 48 GB of RAM. When I assign more than 31 GB of RAM (e.g. 32 GB or more) to the executors:
.config("spark.executor.memory", "44g")
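For reference, a minimal sketch of how this setting is typically applied when building the session; the app name and master URL below are placeholders, not taken from the actual setup:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("large-join")                    // hypothetical app name
  .master("spark://master-host:7077")       // hypothetical standalone master URL
  .config("spark.executor.memory", "44g")   // the allocation that triggers the failures
  .config("spark.executor.cores", "6")      // matches the 6 cores granted in the log below
  .getOrCreate()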
During the join of two large DataFrames, the executors are then killed without much information. The output messages from the slaves and the driver show "missing output locations for shuffle":
17/09/21 12:34:18 INFO StandaloneSchedulerBackend: Granted executor ID app-20170921123240-0000/3 on hostPort XXX.XXX.XXX.92:33705 with 6 cores, 44.0 GB RAM
17/09/21 12:34:18 WARN TaskSetManager: Lost task 14.0 in stage 7.0 (TID 124, XXX.XXX.XXX.92, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
17/09/21 12:34:18 WARN TaskSetManager: Lost task 5.0 in stage 7.0 (TID 115, …

I have a question similar to this one, but the columns to be aggregated with collect_list are given by a list of names. For example:
scala> w.show
+---+-----+----+-----+
|iid|event|date|place|
+---+-----+----+-----+
| A| D1| T0| P1|
| A| D0| T1| P2|
| B| Y1| T0| P3|
| B| Y2| T2| P3|
| C| H1| T0| P5|
| C| H0| T9| P5|
| B| Y0| T1| P2|
| B| H1| T3| P6|
| D| H1| T2| P4|
+---+-----+----+-----+
scala> val combList = List("event", "date", "place")
combList: List[String] = List(event, date, place)
scala> val v = w.groupBy("iid").agg(collect_list(combList(0)), collect_list(combList(1)), collect_list(combList(2)))
v: org.apache.spark.sql.DataFrame = [iid: string, collect_list(event): …
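One way this is often generalized so that the number of columns is not hard-coded (a sketch, assuming every name in combList is a column of w) is to build the aggregation expressions programmatically and pass them to agg as varargs:

import org.apache.spark.sql.functions.{col, collect_list}

// Build one collect_list expression per column name in the list.
val aggExprs = combList.map(c => collect_list(col(c)).alias(s"collect_list($c)"))

// agg takes a first Column plus varargs, so split the list into head and tail.
val v = w.groupBy("iid").agg(aggExprs.head, aggExprs.tail: _*)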
If I have a DataFrame containing a column of type Array[String]:

scala> y.show
+---+----------+
|uid|event_comb|
+---+----------+
| c| [xx, zz]|
| b| [xx, xx]|
| b| [xx, yy]|
| b| [xx, zz]|
| b| [xx, yy]|
| b| [xx, zz]|
| b| [yy, zz]|
| a| [xx, yy]|
+---+----------+
How can I split the column "event_comb" into two columns (e.g. "event1" and "event2")?
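A sketch of one possible approach, assuming every event_comb array holds exactly two elements, is to index into the array with getItem:

import org.apache.spark.sql.functions.col

// Pull the first and second array elements into their own columns,
// then drop the original array column.
val split = y
  .withColumn("event1", col("event_comb").getItem(0))
  .withColumn("event2", col("event_comb").getItem(1))
  .drop("event_comb")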