I am running a PySpark script in a Jupyter notebook to write a dataframe to CSV, as follows:
df.coalesce(1).write.csv('Data1.csv',header = 'true')
After it had been running for an hour, I received the following error:
Error: Invalid status code from http://....., session not active.
My configuration is as follows:
spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("shuffle.service.enabled","true")
spark.conf.set("spark.dynamicAllocation.minExecutors",6)
spark.conf.set("spark.executor.heartbeatInterval","3600s")
spark.conf.set("spark.cores.max", "4")
spark.conf.set("spark.sql.tungsten.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.app.id", "Logs")
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.rdd.compress", "true")
spark.conf.set("spark.executor.instances", "6")
spark.conf.set("spark.executor.memory", '20g')
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.driver.allowMultipleContexts", "true")
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.driver.memory", "20G")
spark.conf.set("spark.executor.instances", "32")
spark.conf.set("spark.executor.memory", "32G")
spark.conf.set("spark.driver.maxResultSize", "40G")
spark.conf.set("spark.executor.cores", "5")
I checked the container node, and the error there was:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed:container_e836_1556653519610_3661867_01_000005 on host: ylpd1205.kmdc.att.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
I cannot figure out what the problem is.
Judging by the output, if your application is not ending in a FAILED state, this sounds like a Livy timeout error: your application is likely taking longer than the timeout defined for the Livy session (which defaults to 1 hour). So even though the Spark application itself succeeds, your notebook will receive this error if the application runs longer than the Livy session's timeout.
If that is the case, here is how to fix it:
1. Edit the /etc/livy/conf/livy.conf file (on the cluster's master node).
2. Set livy.server.session.timeout to a higher value, e.g. 8h (or more, depending on your application).
3. Restart the Livy server with sudo restart livy-server (again on the cluster's master node).
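For reference, the change amounts to a single entry in livy.conf; a sketch (the 8h value is just the example above and should be tuned to how long your jobs actually run):

# /etc/livy/conf/livy.conf
livy.server.session.timeout = 8h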
I am not too familiar with PySpark, but in Scala the solution would involve something like the following.
First we need a method that creates the header file:
import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def createHeaderFile(headerFilePath: String, colNames: Array[String]): Unit = {
  // format the header file path
  val fileName = "dfheader.csv"
  val headerFileFullName = "%s/%s".format(headerFilePath, fileName)
  // write the header line to HDFS
  val hadoopConfig = new Configuration()
  val fileSystem = FileSystem.get(hadoopConfig)
  val output = fileSystem.create(new Path(headerFileFullName))
  val writer = new PrintWriter(output)
  writer.write(colNames.mkString(",") + "\n")
  writer.close()
}
You also need a method that calls the Hadoop API to merge the part files written out by df.write:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  // in case of an Array[String], iterate over the multiple source paths with a for loop:
  // for (sourcePath <- sourcePaths) { ... }
  // the partitioned files are stored temporarily under the source path;
  // the merged file goes to destLocation under the same name
  val pathText = sourcePaths.split("/")
  val destPath = "%s/%s".format(destLocation, pathText.last)
  // merge the part files into a single file
  FileUtil.copyMerge(hdfs, new Path(sourcePaths), hdfs, new Path(destPath), true, hadoopConfig, null)
  // clean up the temporary output folder once the merge is complete
  // (copyMerge with deleteSource = true already removes the source directory itself)
  hdfs.delete(new Path(sourcePaths).getParent, true)
}
This is the method that generates the output files, i.e. writes the huge DataFrame to HDFS with df.write:
import org.apache.spark.sql.{DataFrame, SparkSession}

def generateOutputFiles(processedDf: DataFrame, opPath: String, tempOutputFolder: String,
                        spark: SparkSession): String = {
  val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)
  // write the DataFrame as CSV to the temp output directory and create the header file
  processedDf.write.mode("overwrite").csv(fileName)
  createHeaderFile(fileName, processedDf.columns)
  // return the path of the partitioned output so it can be passed on for merging;
  // if the output needs to be split into multiple files based on some parameter,
  // change the return type to Array[String] and collect each fileName in a loop
  // (e.g. outputFilePathList(counter) = fileName; counter += 1)
  val outputFilePathList = fileName
  outputFilePathList
}
With all of the methods above defined, here is how you can put them to use. Assume a method that contains your own transformation logic:
def processyourlogic(/* your parameters, if any */): DataFrame = {
  // your logic to do whatever needs to be done to your data
}
Assuming the method above returns a DataFrame, here is how to tie everything together:
val yourbigDf = processyourlogic(/* your parameters */) // returns a DataFrame
yourbigDf.cache() // caching, just in case you need it again
val outputPathFinal = "location where you want your file to be saved"
val tempOutputFolderLocation = "temp/"
val partFiles = generateOutputFiles(yourbigDf, outputPathFinal, tempOutputFolderLocation, spark)
mergeOutputFiles(partFiles, outputPathFinal)
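Since the original question uses PySpark, the same merge idea can be sketched there by reaching the Hadoop classes through Spark's JVM gateway. This is only a rough, untested sketch: it assumes a Hadoop 2.x cluster (FileUtil.copyMerge was removed in Hadoop 3), the paths are placeholders, and the separate-header-file step from createHeaderFile is omitted.

# Rough PySpark sketch of the same approach (assumes Hadoop 2.x, where
# FileUtil.copyMerge still exists); temp_dir and final_file are placeholders.
temp_dir = "temp/NameofyourCsvFile.csv"      # partitioned output directory
final_file = "output/NameofyourCsvFile.csv"  # single merged file

df.write.mode("overwrite").csv(temp_dir)

# reach the Hadoop filesystem API through Spark's JVM gateway
jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
jvm.org.apache.hadoop.fs.FileUtil.copyMerge(
    fs, jvm.org.apache.hadoop.fs.Path(temp_dir),
    fs, jvm.org.apache.hadoop.fs.Path(final_file),
    True, hadoop_conf, None)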
Let me know if you have any other questions relating to this. If the answer you are looking for is different, then the original question should be asked.