I have this code:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext()
sqlContext = SQLContext(sc)
documents = sqlContext.createDataFrame([
    Row(id=1, title=[Row(value=u'cars', max_dist=1000)]),
    Row(id=2, title=[Row(value=u'horse bus', max_dist=50), Row(value=u'normal bus', max_dist=100)]),
    Row(id=3, title=[Row(value=u'Airplane', max_dist=5000)]),
    Row(id=4, title=[Row(value=u'Bicycles', max_dist=20), Row(value=u'Motorbikes', max_dist=80)]),
    Row(id=5, title=[Row(value=u'Trams', max_dist=15)])])
documents.show(truncate=False)
#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[[1000,cars]]                     |
#|2  |[[50,horse bus], [100,normal bus]]|
#|3  |[[5000,Airplane]]                 |
#|4  |[[20,Bicycles], [80,Motorbikes]]  |
#|5  |[[15,Trams]]                      |
#+---+----------------------------------+
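To make the structure explicit: title is an array of structs, and Row sorts its fields alphabetically, which is why max_dist prints before value above. A quick check (output as I understand Spark prints it):

documents.printSchema()
#root
# |-- id: long (nullable = true)
# |-- title: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- max_dist: long (nullable = true)
# |    |    |-- value: string (nullable = true)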
I need to split all compound rows (such as 2 and 4) into multiple rows while keeping the 'id', to get a result like this:
#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[1000,cars]                       |
#|2  |[50,horse bus]                    |
#|2  |[100,normal bus]                  |
#|3  |[5000,Airplane]                   |
#|4  |[20,Bicycles]                     |
#|4  |[80,Motorbikes]                   |
#|5  |[15,Trams]                        |
#+---+----------------------------------+
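A minimal sketch of the direction I am considering, using pyspark.sql.functions.explode to turn each array element into its own row while the id is repeated for every element (untested against the exact setup above):

from pyspark.sql.functions import explode

# explode() emits one output row per element of the 'title' array,
# copying the other selected columns (here 'id') into every row
exploded = documents.select("id", explode("title").alias("title"))
exploded.show(truncate=False)
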
I am using Spark MLlib to compute the sum of the TF-IDF values of all terms for each document (each document is described by one row of the DataFrame), and I wrote the following code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import SparseVector
sc = SparkContext()
sqlContext = SQLContext(sc)
#SECTION 1
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast", "data3"),
    (3, "hello world", "data4")], ["doc_id", "doc_text", "another"])
#SECTION 2
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
# turn each document's text into a list of words,
# e.g. [['hello', 'spark'], ['this', 'is', 'example'], ...]
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))
#SECTION 3
hashingTF = HashingTF()
tf = hashingTF.transform(doc_words).cache()  # term-frequency SparseVectors
idf = IDF().fit(tf)  # fit inverse-document-frequency weights on the corpus
tfidf = idf.transform(tf)
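What I still need is the per-document sum. A sketch of how I would finish this with the RDD API above (each element of tfidf is a SparseVector, so summing its stored values should give that document's total TF-IDF; untested):

# v.values is a NumPy array of the non-zero TF-IDF weights for one
# document, so .sum() adds them all up
tfidf_sums = tfidf.map(lambda v: float(v.values.sum()))
print(tfidf_sums.collect())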