Pra*_*pad 3 scala apache-spark
下面是在数据帧中添加序列号列的逻辑.当我从分隔文件中读取数据时,它按预期工作.今天我有一个新任务从oracle表读取数据并添加序列号和进一步处理.当我从oracle表中读取它时,我面临着以下逻辑的问题,在数据帧中添加序列号.
oracleTableDF是我的数据帧
//creating Sequence no. logic for SeqNum
val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq))
//creating StructType to add Seqnum in schema
val newstructure = StructType(Array(StructField("SeqNum",LongType)).++(oracleTableDF.schema.fields))
//creating new Data Frame with seqnum
oracleTableDF = spark.createDataFrame(rowRDD, newstructure)
Run Code Online (Sandbox Code Playgroud)
我无法找到实际问题.因为当我从文件中读取逻辑时,逻辑在集群中按预期工作.但是当我从oracle表中读到它时面临一些问题.它在本地模式下也按预期工作.
以下是错误:
"ERROR scheduler.TaskSetManager:阶段1.0中的任务0失败4次;中止作业org.apache.spark.SparkException:作业因阶段失败而中止:阶段1.0中的任务0失败4次,最近失败:阶段丢失任务0.3 1.0(TID 4,xxxx,executor 1):java.lang.NoClassDefFoundError:无法初始化类oracleDataProcess $"
如果您只需要使用自动增量整数值向数据框添加列,则可以使用以下monotonicallyIncreasingId内容LongType:
val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonicallyIncreasingId)
Run Code Online (Sandbox Code Playgroud)
[UPDATE]
请注意,monotonicallyIncreasingId不推荐使用.monotonically_increasing_id()应该用来代替.
| 归档时间: |
|
| 查看次数: |
5854 次 |
| 最近记录: |