将Cassandra查询数据组合/更新到从Kafka收到的结构化流媒体

Mar*_*tin 8 scala cassandra apache-spark spark-structured-streaming

我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.

为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息

我有点陷入困境,围绕如何保持整个集群中的Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.

目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications", "sensors")
  .load
Run Code Online (Sandbox Code Playgroud)

从这里开始,我可以cassandraSensorDs通过加入我的结构化流数据集来使用它.

.join(
   cassandraSensorDs ,
   sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)
Run Code Online (Sandbox Code Playgroud)

如何在运行结构化流式传输时执行其他查询来更新此Cassandra数据?如何在群集设置中提供查询的数据?

Sud*_*adi 2

使用广播变量,您可以编写一个包装器来定期从 Cassandra 获取数据并更新广播变量。使用广播变量在流上执行映射端连接。我还没有测试过这种方法,我认为这可能是一种矫枉过正,具体取决于您的用例(吞吐量)。

如何更新 Spark Streaming 中的广播变量?

另一种方法是查询 Cassandra 以获取流中的每个项目,为了优化连接,您应该确保使用连接池并为 JVM/分区仅创建一个连接。这种方法更简单,您不必担心定期加热 Cassandra 数据。

Spark-Streaming和连接池的实现