我有一个包含以下列的数据框。
scala> show_times.printSchema
root
|-- account: string (nullable = true)
|-- channel: string (nullable = true)
|-- show_name: string (nullable = true)
|-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。
该数据集共有 1.33 亿行,其中 192 个不同的show_names.
对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。
我使用 Spark MLlib 的QuantileDiscretizer
目前,我循环播放每个节目并按QuantileDiscretizer顺序运行,如下面的代码所示。
我最终想要的是以下示例输入以获得示例输出。
输入示例:
account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)
示例输出:
account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)
是否有一种更有效和分布式的方法来使用groupBy类似的操作来完成此操作,而不是循环遍历每个操作show_name …
我正在编写一个spark / scala程序,以读取ZIP文件,将其解压缩并将内容写入一组新文件。我可以将其用于写入本地文件系统,但想知道是否存在一种将输出文件写入分布式文件系统(例如HDFS)的方法。代码如下所示。
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
Run Code Online (Sandbox Code Playgroud)
`