小编杨嘉辰*_*杨嘉辰的帖子

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

我想限制从 kafka 获取数据时的速率。我的代码看起来像:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()
Run Code Online (Sandbox Code Playgroud)

然而,当我打电话时df.count(),结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

apache-kafka pyspark

6
推荐指数
1
解决办法
4971
查看次数

Spark 的 df.cache() 是急切执行还是延迟执行?

我在工作中使用 pyspark。在这篇文章https://unraveldata.com/to-cache-or-not-to-cache/中,它说缓存不是一个动作。然而,当我在 RDD 上运行缓存函数时,需要花费很多时间。Spark UI 显示有一些名为 的激活作业cache at NativeMethodAccessorImpl.java:0。那么缓存是一个动作吗?

apache-spark

-1
推荐指数
1
解决办法
2462
查看次数

标签 统计

apache-kafka ×1

apache-spark ×1

pyspark ×1