use*_*688 3 apache-spark spark-streaming
我想计算一些表示为RDD的ID的不同值.
在非流媒体案例中,它相当简单.Say IDs是从平面文件读取的ID的RDD.
print ("number of unique IDs %d" % (IDs.distinct().count()))
Run Code Online (Sandbox Code Playgroud)
但我似乎无法在流媒体案例中做同样的事情.假设我们streamIDs是DStream从网络读取的ID.
print ("number of unique IDs from stream %d" % (streamIDs.distinct().count()))
Run Code Online (Sandbox Code Playgroud)
给我这个错误
AttributeError: 'TransformedDStream' object has no attribute 'distinct'
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?如何打印出在此批次中显示的不同ID的数量?
使用RDD,您只有一个结果,但使用DStreams,您会得到一系列结果,每个微批次的结果.因此,您无法一次打印唯一ID的数量,而是必须注册一个操作来打印每个微批次的唯一ID,这是一个可以使用distinct的RDD:
streamIDs.foreachRDD(rdd => println(rdd.distinct().count()))
Run Code Online (Sandbox Code Playgroud)
请记住,您可以使用window更大批量创建转换后的dstream:
streamIDs.window(Duration(1000)).foreachRDD(rdd => println(rdd.distinct().count()))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2640 次 |
| 最近记录: |