我想限制从 kafka 获取数据时的速率。我的代码看起来像:
df = spark.read.format('kafka') \
.option("kafka.bootstrap.servers",'...')\
.option("subscribe",'A') \
.option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
.option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
.option("maxOffsetsPerTrigger",20) \
.load() \
.cache()
Run Code Online (Sandbox Code Playgroud)
然而,当我打电话时df.count()
,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。
我在工作中使用 pyspark。在这篇文章https://unraveldata.com/to-cache-or-not-to-cache/中,它说缓存不是一个动作。然而,当我在 RDD 上运行缓存函数时,需要花费很多时间。Spark UI 显示有一些名为 的激活作业cache at NativeMethodAccessorImpl.java:0
。那么缓存是一个动作吗?