相关疑难解决方法(0)

如何在 DataFrame 中跨组使用 QuantileDiscretizer?

我有一个包含以下列的数据框。

scala> show_times.printSchema
root
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- show_name: string (nullable = true)
 |-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。

该数据集共有 1.33 亿行,其中 192 个不同的show_names.

对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。

我使用 Spark MLlib 的QuantileDiscretizer

目前,我循环播放每个节目并按QuantileDiscretizer顺序运行,如下面的代码所示。

在此输入图像描述

我最终想要的是以下示例输入以获得示例输出。

输入示例:

account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)

示例输出:

account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)

是否有一种更有效和分布式的方法来使用groupBy类似的操作来完成此操作,而不是循环遍历每个操作show_name …

scala apache-spark apache-spark-sql apache-spark-mllib

8
推荐指数
1
解决办法
1984
查看次数

在Spark / Scala中写入HDFS以读取zip文件

我正在编写一个spark / scala程序,以读取ZIP文件,将其解压缩并将内容写入一组新文件。我可以将其用于写入本地文件系统,但想知道是否存在一种将输出文件写入分布式文件系统(例如HDFS)的方法。代码如下所示。

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {   
   val zipStream = new ZipInputStream(file._2.open)            
   val entry = zipStream.getNextEntry                            
   val iter = scala.io.Source.fromInputStream(zipStream).getLines          
   val fname = f"/d/tmp/myfile$i.txt" 

   i = i + 1

   val xx = iter.mkString
   val writer = new PrintWriter(new File(fname))
   writer.write(xx)
   writer.close()

   iter                                                       
}).collect()
Run Code Online (Sandbox Code Playgroud)

`

scala hdfs apache-spark

1
推荐指数
1
解决办法
2万
查看次数