Jer*_*rge 2 python dataframe apache-spark rdd pyspark
I have an RDD:
a,1,2,3,4
b,4,6
c,8,9,10,11
I want to convert it into a Spark DataFrame with an index:
df:
Index Name Number
0 a 1,2,3,4
1 b 4,6
2 c 8,9,10,11
I tried splitting the RDD:
parts = rdd.flatMap(lambda x: x.split(","))
But the result is:
a,
1,
2,
3,...
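The flattening happens because flatMap maps each line and then merges the resulting fields into one stream. The behavior can be reproduced in plain Python without Spark (a sketch, using the three input lines above):

```python
lines = ["a,1,2,3,4", "b,4,6", "c,8,9,10,11"]

# flatMap-like: split each line, then flatten -> every field becomes its own element
flat = [field for line in lines for field in line.split(",")]

# map-like with maxsplit=1: one [name, numbers] pair per line, numbers kept together
pairs = [line.split(",", 1) for line in lines]

print(flat[:4])   # ['a', '1', '2', '3']
print(pairs[0])   # ['a', '1,2,3,4']
```

Passing `1` as the second argument to `str.split` limits the split to the first comma, which is what keeps the numbers in a single column.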
How can I split the RDD and convert it to a DataFrame in pyspark, so that the first element becomes the first column and the remaining elements are combined into a single column?
As described in the solution:
rd = rd1.map(lambda x: x.split(",", 1)).zipWithIndex()
rd.take(3)
Output:
[(['a', '1,2,3,4'], 0),
(['b', '4,6'], 1),
(['c', '8,9,10,11'], 2)]
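zipWithIndex pairs each element with its position, much like Python's built-in enumerate with the pair order swapped (a plain-Python sketch of the structure above):

```python
parts = [["a", "1,2,3,4"], ["b", "4,6"], ["c", "8,9,10,11"]]

# zipWithIndex-like: each element becomes (element, index)
zipped = [(x, i) for i, x in enumerate(parts)]

print(zipped)
# [(['a', '1,2,3,4'], 0), (['b', '4,6'], 1), (['c', '8,9,10,11'], 2)]
```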
Next:
rd2 = rd.map(lambda x, y: (y, x[0], x[1])).toDF(["index", "name", "number"])
rd2.collect()
I receive the following error:
An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 55.0 failed 1 times, most recent failure: Lost task 0.0
in stage 55.0 (TID 85, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
Is it a version problem?
The following RDD transformations will give you what you want. The error comes from the lambda: map passes each (element, index) tuple as a single argument, so lambda x, y: ... fails, and Python 3 no longer allows tuple-parameter unpacking like lambda (x, y): ... either. Index into the tuple instead:
df = (rdd.map(lambda x: x.split(",", 1))               # split only at the first occurrence of ","
         .zipWithIndex()                               # add an incrementing index to each element
         .map(lambda xy: (xy[1], xy[0][0], xy[0][1]))  # flatten to (index, name, number)
         .toDF(["index", "name", "number"]))           # convert to a DataFrame
df.show()
#+-----+----+---------+
#|index|name| number|
#+-----+----+---------+
#| 0| a| 1,2,3,4|
#| 1| b| 4,6|
#| 2| c|8,9,10,11|
#+-----+----+---------+
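The whole pipeline can also be checked without a cluster. Here is a plain-Python sketch of the same three steps (split at the first comma, enumerate, flatten to rows), assuming the input lines from the question:

```python
lines = ["a,1,2,3,4", "b,4,6", "c,8,9,10,11"]

# split at the first "," -> enumerate -> (index, name, number) rows
rows = [(i, name, number)
        for i, (name, number) in enumerate(line.split(",", 1) for line in lines)]

print(rows)
# [(0, 'a', '1,2,3,4'), (1, 'b', '4,6'), (2, 'c', '8,9,10,11')]
```

These tuples are exactly the rows that `toDF(["index", "name", "number"])` turns into the DataFrame shown above.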