Splitting a PySpark RDD into separate columns and converting it to a DataFrame

Jer*_*rge 2 python dataframe apache-spark rdd pyspark

I have an RDD:

a,1,2,3,4
b,4,6
c,8,9,10,11

I want to convert it to a Spark DataFrame with an index column:

df:

Index  Name  Number
 0      a     1,2,3,4
 1      b     4,6
 2      c     8,9,10,11

I tried splitting the RDD:

parts = rdd.flatMap(lambda x: x.split(","))

But the result is:

a,
1,
2,
3,...

How can I split the RDD and convert it to a DataFrame in PySpark, so that the first element becomes the first column and the remaining elements are combined into a single second column?

Following the solution below, I tried:

rd = rd1.map(lambda x: x.split("," , 1) ).zipWithIndex()
rd.take(3)

Output:

[(['a', '1,2,3,4'], 0),
(['b', '4,6'], 1),
(['c', '8,9,10,11'], 2)]

Next step:

rd2 = rd.map(lambda x, y: (y, x[0], x[1])).toDF(["index", "name", "number"])
rd2.collect()
rd2.collect()

I get the following error:

 An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 55.0 failed 1 times, most recent failure: Lost task 0.0 
in stage 55.0 (TID 85, localhost, executor driver): 
org.apache.spark.api.python.PythonException: Traceback (most recent 
call last):

Is it a version problem?

phi*_*ert 6

The following RDD transformation will give you what you want. Note that `map` passes each `(record, index)` pair as a single tuple, so the lambda has to take one argument and index into it; `lambda x, y: ...` (and Python 2's `lambda (x, y): ...`) both fail on Python 3, which is what caused your `PythonException`.

df = (rdd.map(lambda x: x.split(",", 1))           # Split only at first occurrence of ,
         .zipWithIndex()                           # Pair each element with an incrementing index
         .map(lambda x: (x[1], x[0][0], x[0][1]))  # Flatten ((name, number), index) into (index, name, number)
         .toDF(["index", "name", "number"]))       # Convert to DataFrame

df.show()

#+-----+----+---------+
#|index|name|   number|
#+-----+----+---------+
#|    0|   a|  1,2,3,4|
#|    1|   b|      4,6|
#|    2|   c|8,9,10,11|
#+-----+----+---------+
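The per-record logic can be checked without a Spark cluster, since the split and the flatten step are plain Python. This is just a sketch: `enumerate` stands in for `zipWithIndex`, and the lambda indexes the `(record, index)` tuple instead of unpacking it in its signature, which is the Python-3-compatible form.

```python
# Simulate the RDD pipeline on a plain Python list.
records = ["a,1,2,3,4", "b,4,6", "c,8,9,10,11"]

# split(",", 1) splits only at the first comma: "a,1,2,3,4" -> ["a", "1,2,3,4"]
parts = [r.split(",", 1) for r in records]

# enumerate plays the role of zipWithIndex; the lambda flattens
# ((name, number), index) by indexing, since Python 3 removed
# tuple unpacking in lambda signatures.
rows = list(map(lambda x: (x[1], x[0][0], x[0][1]),
                [(p, i) for i, p in enumerate(parts)]))

print(rows)
# [(0, 'a', '1,2,3,4'), (1, 'b', '4,6'), (2, 'c', '8,9,10,11')]
```

The same list of tuples could also be handed straight to `spark.createDataFrame(rows, ["index", "name", "number"])` if you prefer to build the rows driver-side.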