him*_*shu 5 scala cassandra apache-spark spark-streaming apache-spark-sql
我正在尝试确定不同粒度级别的完成状态。例如,如果该地区的所有城镇都是完整的,则该地区是“完整的”。
我在 Spark 中使用以下方法将状态保持在最低级别(城镇),在内存中:
步骤 1. 将 Cassandra 表中的初始状态加载到 Spark 数据帧中。
+----------+--------+--------+------------+
| country | region | town | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | FALSE |
| Country1 | State1 | Town2 | FALSE |
| Country1 | State1 | Town3 | FALSE |
| Country1 | State1 | Town4 | FALSE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
Run Code Online (Sandbox Code Playgroud)
步骤 2. 开始流处理并使用在每个微批次中创建的数据帧尝试使用左外连接更新步骤 1 中数据帧中的状态。
第 1 批:
+----------+--------+-------+------------+
| country | region | town | isComplete |
+----------+--------+-------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
+----------+--------+-------+------------+
Run Code Online (Sandbox Code Playgroud)
应用第 1 批后:
+----------+--------+--------+------------+
| country | state | town | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
Run Code Online (Sandbox Code Playgroud)
我的想法是,通过保持数据帧可变,我将能够在每个批次中更新它并在流式作业的整个生命周期内保持整体状态(如全局变量)。
基本数据集大约有 120 万条记录(约 100 MB),预计可扩展到 10 GB。
我遇到了内存不足的问题。每批比前一批花费更多的处理时间。此外,同一作业的阶段数会随着批次的增加而增加。最终应用程序因超出 GC 开销限制而失败。
var statusDf = loadStatusFromCassandra(sparkSession)
ipStream.foreachRDD { statusMsgRDD =>
if (!statusMsgRDD.isEmpty) {
// 1. Create data-frame from the current micro-batch RDD
val messageDf = getMessageDf(sparkSession, statusMsgRDD)
// 2. To update, Left outer join statusDf with messageDf
statusDf = updateStatusDf(sparkSession, statusDf, messageDf)
// 3. Use updated statusDf to generate aggregations at higher levels
// and publish to a Kafka topic
// if a higher level (eg. region) is completed.
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
288 次 |
| 最近记录: |