Lit*_*chy 6 redis apache-spark spark-structured-streaming
我正在foreachBatch通过以下代码使用spark-structed-streaming从redis读取批记录(尝试通过设置batchSize stream.read.batch.size)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
Run Code Online (Sandbox Code Playgroud)
目前我们设置stream.read.batch.size为 128,但似乎这不起作用。batchSize似乎是随机的,有时超过1000甚至10000。
但是我不想等待这么久(10000条记录),因为我有一些操作(在代码注释中// some operation)需要尽快完成,所以我想控制最大批量大小,以便当记录达到此限制时可以立即处理,怎么办?
我是 Spark-Redis 的维护者。目前不支持此功能。该stream.read.batch.size参数控制单个 Redis API 调用读取的项目数(countXREADGROUP 调用的参数)。它不会影响每个触发器的项目数(batchDF 大小)。我已经在 github 上针对此功能请求开了一张票。