Spark - Sort a DStream by key and limit it to 5 values

Dhi*_*TdG 3 apache-spark spark-streaming rdd pyspark

I have started learning Spark, and I wrote a PySpark streaming program that reads stock data (symbol, volume) from port 3333.

Sample data streamed on port 3333:

"AAC",111113
"ABT",7451020
"ABBV",7325429
"ADPT",318617
"AET",1839122
"ALR",372777
"AGN",4170581
"ABC",3001798
"ANTM",1968246

I want to display the top 5 symbols by volume. So I use a mapper that reads each line, splits it on the comma, and reverses it.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)

lines = ssc.socketTextStream("localhost", 3333)
stocks = lines.map(lambda line: sorted(line.split(','), reverse=True))
stocks.pprint()

Here is the output of stocks.pprint():

[u'111113', u'"AAC"']
[u'7451020', u'"ABT"']
[u'7325429', u'"ABBV"']
[u'318617', u'"ADPT"']
[u'1839122', u'"AET"']
[u'372777', u'"ALR"']
[u'4170581', u'"AGN"']
[u'3001798', u'"ABC"']
[u'1968246', u'"ANTM"']

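I have the following function to display the stock symbols, but I don't know how to sort the stocks by key (volume) and then limit the function to show only the first 5 values.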

stocks.foreachRDD(processStocks)

def processStocks(stock):
    for st in stock.collect():
        print st[1]

use*_*411 7

Since a stream represents an unbounded sequence, all you can do is sort each batch. First, you have to parse the data correctly:

lines = ssc.queueStream([sc.parallelize([
    "AAC,111113", "ABT,7451020", "ABBV,7325429","ADPT,318617",
    "AET,1839122", "ALR,372777", "AGN,4170581", "ABC,3001798", 
    "ANTM,1968246"
])])

def parse(line):
    try:
        k, v = line.split(",")
        yield (k, int(v))
    except ValueError:
        pass 

parsed = lines.flatMap(parse)
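The sample lines in the question wrap the symbol in double quotes, while the test data in the answer does not. A minimal sketch of a parser that tolerates both, assuming the quotes and surrounding whitespace should simply be dropped (the name `parse_quoted` is hypothetical and not part of the answer):

```python
def parse_quoted(line):
    """Yield one (symbol, volume) pair, or nothing for a malformed line."""
    try:
        symbol, volume = line.split(",")
        # Assumption: surrounding whitespace and double quotes are noise.
        yield (symbol.strip().strip('"'), int(volume))
    except ValueError:
        # Wrong number of fields or a non-numeric volume: skip the line.
        return


print(list(parse_quoted('"AAC",111113')))
print(list(parse_quoted("not a stock line")))
```

It could be passed to `lines.flatMap(parse_quoted)` in the same way as a parser that expects unquoted symbols.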

Next, sort each batch by volume in descending order:

sorted_ = parsed.transform(
    lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))

Finally, you can pprint the top elements:

sorted_.pprint(5)

If all goes well, you should get output like this:

-------------------------------------------                         
Time: 2016-10-02 14:52:30
-------------------------------------------
('ABT', 7451020)
('ABBV', 7325429)
('AGN', 4170581)
('ABC', 3001798)
('ANTM', 1968246)
...
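To print only the symbols, as `processStocks` in the question does, the same ranking can be applied inside `foreachRDD`. A sketch, assuming each batch RDD holds `(symbol, volume)` pairs; `top_symbols` and `process_stocks` are hypothetical names, while `takeOrdered` is the standard RDD method that returns the `n` smallest elements under `key`:

```python
def top_symbols(rdd, n=5):
    """Return the symbols of the n largest-volume (symbol, volume) pairs."""
    # Negating the volume makes takeOrdered return the largest volumes first.
    return [symbol for symbol, _ in rdd.takeOrdered(n, key=lambda kv: -kv[1])]


def process_stocks(rdd):
    """Print the top symbols of one batch; meant for DStream.foreachRDD."""
    for symbol in top_symbols(rdd):
        print(symbol)
```

Registering it with `parsed.foreachRDD(process_stocks)` before `ssc.start()` should print up to five symbols per batch.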

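Depending on the size of a batch, a full sort can be prohibitively expensive. In that case you can take top and parallelize the result, passing a key so that records are ranked by volume rather than by symbol:

sorted_ = parsed.transform(
    lambda rdd: rdd.ctx.parallelize(rdd.top(5, key=lambda x: x[1])))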

Or even reduceByKey:

sorted_ = parsed.transform(
    lambda rdd: rdd.ctx.parallelize(rdd.top(5, key=lambda x: x[1])))
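A reduceByKey-based top 5 needs a merge function that combines two partial results and keeps only the five largest volumes. The following is a sketch of one possible merge function, not the answer's own code; `merge_top` is a hypothetical name, and the check at the bottom runs on plain lists rather than on Spark:

```python
import heapq
from functools import reduce


def merge_top(left, right, n=5):
    """Merge two lists of (symbol, volume) pairs, keeping the n largest volumes."""
    return heapq.nlargest(n, left + right, key=lambda kv: kv[1])


# Local check on plain lists, mimicking how partial results get combined.
batch = [("AAC", 111113), ("ABT", 7451020), ("ABBV", 7325429),
         ("ADPT", 318617), ("AET", 1839122), ("ALR", 372777),
         ("AGN", 4170581), ("ABC", 3001798), ("ANTM", 1968246)]
top5 = reduce(merge_top, [[pair] for pair in batch])
print(top5)
```

On the stream this might be wired up as `parsed.map(lambda kv: (None, [kv])).reduceByKey(merge_top).flatMap(lambda kv: kv[1])`, where the single `None` key is an assumption used to funnel every record of a batch into one group.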