Tin*_*mes 6 apache-spark spark-streaming pyspark
试图使用pyspark做一个示例窗口函数.
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("PythonStreamingKafka")
)
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 5)
input_stream = KafkaUtils.createDirectStream(ssc, [thing_topic], {"metadata.broker.list": brokers})
messages = input_stream.map(lambda x: x[1]).window(10,5)
messages.foreachRDD(windowMetrics)
def windowMetrics(rdd):
print ("Window RDD size:", rdd.count())
Run Code Online (Sandbox Code Playgroud)
从Kafka接收数据.一旦执行开始,正如您在下面的Spark GUI中看到的那样,只执行一个批处理,所有其他批处理都进入队列.实际上窗口内没有进行任何处理,因为kafka没有提供任何处理数据(InputSize = 0).
如果不使用窗口操作,它的工作完全正常.代码如下.
messages = input_stream.map(lambda x: x[1])
messages.foreachRDD(windowMetrics)
Run Code Online (Sandbox Code Playgroud)
尝试了一些谷歌搜索并找到了一些相关内容,但没有一个具有相同的问题.
这里的中间批次排队等候.
https://forums.databricks.com/questions/1276/kafka-direct-api-from-spark-streaming-what-happens.html
这里讨论了背压.由于没有提供数据,希望不会有任何背压.
| 归档时间: |
|
| 查看次数: |
1009 次 |
| 最近记录: |