Posts by K.A*_*Ali

Splitting complex data rows into simple rows in PySpark

I have this code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)
# Each `title` cell is an array of (value, max_dist) structs.
documents = sqlContext.createDataFrame([
    Row(id=1, title=[Row(value=u'cars', max_dist=1000)]),
    Row(id=2, title=[Row(value=u'horse bus', max_dist=50), Row(value=u'normal bus', max_dist=100)]),
    Row(id=3, title=[Row(value=u'Airplane', max_dist=5000)]),
    Row(id=4, title=[Row(value=u'Bicycles', max_dist=20), Row(value=u'Motorbikes', max_dist=80)]),
    Row(id=5, title=[Row(value=u'Trams', max_dist=15)])])

documents.show(truncate=False)
#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[[1000,cars]]                     |
#|2  |[[50,horse bus], [100,normal bus]]|
#|3  |[[5000,Airplane]]                 |
#|4  |[[20,Bicycles], [80,Motorbikes]]  |
#|5  |[[15,Trams]]                      |
#+---+----------------------------------+

I need to split all the composite rows (such as 2 and 4) into multiple rows, keeping the 'id' for each, to get a result like this:

#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[1000,cars]                       |
#|2  |[50,horse bus]                    |
#|2  |[100,normal bus]                  |
#|3 …
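One way to get this result is with explode, which emits one output row per element of an array column, repeating the other selected columns alongside each element. A minimal sketch, assuming Spark 1.4+ where pyspark.sql.functions.explode is available:

from pyspark.sql import functions as F

# explode() produces one output row per element of the `title` array,
# duplicating the `id` column for each element.
flat = documents.select("id", F.explode("title").alias("title"))
flat.show(truncate=False)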

python dataframe apache-spark apache-spark-sql pyspark

5 votes · 1 answer · 3021 views

Adding the resulting TF-IDF calculation to the original documents' DataFrame in PySpark

I am using Spark MLlib to compute the sum of the TF-IDF weights of all terms in each document (each document is described by one row of the DataFrame), and I wrote the following code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import SparseVector

sc = SparkContext() 
sqlContext = SQLContext(sc)

# SECTION 1: build a toy DataFrame of documents
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast", "data3"),
    (3, "hello world", "data4")], ["doc_id", "doc_text", "another"])

# SECTION 2: pull out the text column and tokenize it into words
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))

# SECTION 3: hash words into term-frequency vectors, then fit IDF
hashingTF = HashingTF()
tf = hashingTF.transform(doc_words).cache()
idf = IDF().fit(tf)
tfidf …
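A sketch of one possible continuation: sum each document's TF-IDF weights and attach the sums to the original DataFrame. The zip pairing assumes tfidf preserves the row order and partitioning of documents, which holds for the narrow (map-only) transforms above but is an assumption worth checking in general:

tfidf = idf.transform(tf)

# Each SparseVector's `values` array holds the nonzero TF-IDF weights;
# summing it gives the per-document total.
sums = tfidf.map(lambda v: float(v.values.sum()))

# `tfidf` was derived row-by-row from `documents`, so the two RDDs line
# up partition-for-partition and zip() pairs each row with its sum.
paired = documents.rdd.zip(sums)
result = sqlContext.createDataFrame(
    paired.map(lambda p: Row(doc_id=p[0].doc_id,
                             doc_text=p[0].doc_text,
                             another=p[0].another,
                             tfidf_sum=p[1])))
result.show()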

python tf-idf apache-spark pyspark apache-spark-mllib

3 votes · 1 answer · 4140 views