Spark Streaming - DStream没有distinct()

use*_*688 3 apache-spark spark-streaming

我想计算一些表示为RDD的ID的不同值.

在非流媒体案例中,它相当简单.Say IDs是从平面文件读取的ID的RDD.

    print ("number of unique IDs %d" %  (IDs.distinct().count()))
Run Code Online (Sandbox Code Playgroud)

但我似乎无法在流媒体案例中做同样的事情.假设我们streamIDsDStream从网络读取的ID.

    print ("number of unique IDs from stream %d" %  (streamIDs.distinct().count()))
Run Code Online (Sandbox Code Playgroud)

给我这个错误

AttributeError: 'TransformedDStream' object has no attribute 'distinct'
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?如何打印出在此批次中显示的不同ID的数量?

jua*_*011 8

使用RDD,您只有一个结果,但使用DStreams,您会得到一系列结果,每个微批次的结果.因此,您无法一次打印唯一ID的数量,而是必须注册一个操作来打印每个微批次的唯一ID,这是一个可以使用distinct的RDD:

streamIDs.foreachRDD(rdd => println(rdd.distinct().count()))
Run Code Online (Sandbox Code Playgroud)

请记住,您可以使用window更大批量创建转换后的dstream:

streamIDs.window(Duration(1000)).foreachRDD(rdd => println(rdd.distinct().count()))
Run Code Online (Sandbox Code Playgroud)