我需要为我的应用程序增加每个分区的输入速率,我已经.set("spark.streaming.kafka.maxRatePerPartition",100)用于配置.流持续时间是10秒,所以我希望5*100*10=5000这批次的流程消息.但是,我收到的输入率大约是500.您能否建议任何修改以提高此费率?
我们的主题上已有一些历史数据排队,我们不想一次处理所有这些数据,因为这样做比较困难(如果失败,则必须重新开始!)。
另外,了解如何控制批处理大小对于调整作业非常有帮助。
当使用DStreams尽可能精确地控制批次大小的方法时,使用Spark Streaming时限制Kafka批次大小
相同的方法(即设置maxRatePerPartition然后进行调整batchDuration)非常麻烦,但DStream与结构化流技术完全不兼容。
理想情况下,我想了解类似maxBatchSize和的配置minBatchSize,在这里我可以简单地设置所需的记录数。
scala apache-kafka apache-spark spark-streaming spark-structured-streaming