kal*_*thy 8 scala apache-spark apache-spark-sql apache-spark-mllib
我有一个包含以下列的数据框。
scala> show_times.printSchema
root
|-- account: string (nullable = true)
|-- channel: string (nullable = true)
|-- show_name: string (nullable = true)
|-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。
该数据集共有 1.33 亿行,其中 192 个不同的show_names
.
对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。
我使用 Spark MLlib 的QuantileDiscretizer
目前,我循环播放每个节目并按QuantileDiscretizer
顺序运行,如下面的代码所示。
我最终想要的是以下示例输入以获得示例输出。
输入示例:
account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)
示例输出:
account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)
是否有一种更有效和分布式的方法来使用groupBy
类似的操作来完成此操作,而不是循环遍历每个操作show_name
并将其一个接一个地装箱?
我对此一无所知QuantileDiscretizer
,但认为您最关心的是要应用的数据集QuantileDiscretizer
。我认为你想弄清楚如何将你的输入数据集分割成更小的数据集(你说输入数据集中show_name
有 192 个不同的数据集)。show_name
我注意到您使用镶木地板作为输入格式。我对格式的理解非常有限,但我注意到人们正在使用某种分区方案将大型数据集分割成更小的块,然后他们可以处理他们喜欢的任何内容(根据某些分区方案)。
在您的情况下,分区方案可能包括show_name
.
这将使你的情况变得微不足道,因为分割是在编写时完成的(又不再是我的问题)。
请参阅如何在 Spark 2.1 中保存分区的 parquet 文件?
给定您的迭代解决方案,您可以将每次迭代包装到您提交并行处理的Future中。
Spark SQL 的SparkSession(和 Spark Core 的SparkContext)是线程安全的。
filter
和union
运算符在遵循这个解决方案之前我会三思而后行,因为它会给你的肩膀带来负担,我认为这可以通过解决方案 1 轻松解决。
假设您有一个包含 1.33 亿行的大型镶木地板文件,我首先会为每个show_name
usingfilter
运算符构建 192 个数据集(就像您构建的show_rdd
那样,这与名称相反,因为它不是DataFrame
not RDD
)并且union
(再次像您所做的那样)。
请参阅数据集 API。
我认为这是可行的,但我自己没有检查过。
您可以使用窗口函数(请参阅WindowSpec和 Column 的over运算符)。
窗口函数将为您提供分区(窗口),同时over
以某种方式应用于QuantileDiscretizer
窗口/分区。然而,这需要“解构”QuantileDiscretizer
来Estimator
训练模型并以某种方式将结果模型再次拟合到窗口。
我认为这是可行的,但我自己没有做到。对不起。