Shi*_*nar 3 apache-spark pyspark
我是学习Apache Spark的初学者。目前,我正在尝试使用Python学习各种聚合。
为了给我所面临的问题提供一些背景信息,我发现很难理解aggregateByKey函数根据“状态”计算订单数的工作。
我正在跟踪ITVersity的YouTube播放列表,以下是我正在使用的代码和一些示例输出。
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)
Run Code Online (Sandbox Code Playgroud)
输出:
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,关闭
5,2013-07-25 00:00:00.0,11318,完成
6,2013-07-25 00:00:00.0,7130,完成
7, 2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,处理
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013- 07-25 00:00:00.0,5648,PENDING_PAYMENT
ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))
Run Code Online (Sandbox Code Playgroud)
输出:
(u'CLOSED',u'1,2013-07-25 00:00:00.0,11599,CLOSED')
(u'PENDING_PAYMENT',u'2,2013-07-25 00:00:00.0,256 ,PENDING_PAYMENT')
(u'COMPLETE',u'3,2013-07-25 00:00:00.0,12111,COMPLETE')
(u'CLOSED',u'4,2013-07-25 00:00:00.0 ,8827,CLOSED')
(u'COMPLETE',u'5,2013-07-25 00:00:00.0,11318,COMPLETE')
(u'COMPLETE',u'6,2013-07-25 00:00 :00.0,7130,COMPLETE')
(u'COMPLETE',u'7,2013-07-25 00:00:00.0,4530,COMPLETE')
(u'PROCESSING',u'8,2013-07-25 00 :00:00.0,2911,PROCESSING')
(u'PENDING_PAYMENT',u'9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
(u'PENDING_PAYMENT',u'10,2013-07- 25 00:00:00.0, 5648,PENDING_PAYMENT')
ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)
Run Code Online (Sandbox Code Playgroud)
最终输出:
(u'SUSPECTED_FRAUD',1558)
(u'CANCELED',1428)
(u'COMPLETE',22899)
(u'PENDING_PAYMENT',15030)
(u'PENDING',7610)
(u'CLOSED',7556 )
(u'ON_HOLD',3798)
(u'PROCESSING',8275)
(u'PAYMENT_REVIEW',729)
我很难理解的问题是:
1.为什么在2个lambda函数中将aggregateByKey函数用作参数?
2. Vizualize第一个lambda函数的作用是什么?
3. Vizualize第二个lambda函数做什么?
如果可以向我解释上述问题,以及如果可能的话,还可以使用一些简单的框图来解释AggregatByKey的工作方式,将非常有帮助?也许是一些中间计算?
感谢你的帮助!
谢谢你,
希夫
小智 5
Spark RDD分为多个分区,因此,当您对所有数据执行聚合功能时,您将首先在每个分区内聚合数据(分区只是数据的细分)。然后,您将需要告诉Spark如何聚合分区。
第一个lambda函数告诉Spark在遇到新值时如何更改运行计数(累加器)。由于要进行计数,因此只需将1加到累加器上。在一个切片中,如果运行计数当前为4,并且添加了另一个值,则运行计数应为4 + 1 = 5。因此,您的第一个lambda函数是:
lambda acc, val: acc + 1
Run Code Online (Sandbox Code Playgroud)
第二个lambda函数告诉Spark如何将一个数据切片的运行计数与另一个数据切片的运行计数结合在一起。如果一个切片的计数为5,第二个切片的计数为7,则总计数为5 + 7 = 12。因此,您的第二个函数最好写成:
lambda acc1, acc2: acc1 + acc2
Run Code Online (Sandbox Code Playgroud)
剩下的唯一微妙之处是,所有操作都是基于“按键”完成的。累加器(数量)因钥匙而异。
| 归档时间: |
|
| 查看次数: |
446 次 |
| 最近记录: |