我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.
为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息
我有点陷入困境,围绕如何保持整个集群中的Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.
目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库
val cassandraSensorDf = spark
.read
.cassandraFormat("specifications", "sensors")
.load
Run Code Online (Sandbox Code Playgroud)
从这里开始,我可以cassandraSensorDs通过加入我的结构化流数据集来使用它.
.join(
cassandraSensorDs ,
sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)
Run Code Online (Sandbox Code Playgroud)
如何在运行结构化流式传输时执行其他查询来更新此Cassandra数据?如何在群集设置中提供查询的数据?
我正在使用Spark的结构化流(2.2.1),使用Kafka每60秒从传感器接收一次数据。我很难解决如何打包此Kafka数据以使其能够正确处理的问题。
我需要能够进行一些计算,因为数据来自Kafka。
我的问题是将来自Kafka的JSON数据解压缩到我可以使用的数据集中
简化的数据如下所示:
{
id: 1,
timestamp: "timestamp"
pump: {
current: 1.0,
flow: 20.0
torque: 5.0
},
reactors: [
{
id: 1,
status: 200,
},
{
id: 2,
status: 300,
}
],
settings: {
pumpTimer: 20.0,
reactorStatusTimer: 200.0
}
}
Run Code Online (Sandbox Code Playgroud)
为了能够与Spark一起使用,我为其中的每一个创建了一些case类结构:
// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark apache-spark-sql spark-structured-streaming
我有一个列,其中包含键/值对象列表:
+----+--------------------------------------------------------------------------------------------+
|ID | Settings |
+----+--------------------------------------------------------------------------------------------+
|1 | [{"key":"key1","value":"val1"}, {"key":"key2","value":"val2"}, {"key":"key3","value":"val3"}] |
+----+--------------------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
是否可以将此对象列表拆分为自己的行?因此:
+----+------+-------+-------+
|ID | key1 | key2 | key3 |
+----+------+-------+-------+
|1 | val1 | val2 | val3 |
+----+------+-------+-------+
Run Code Online (Sandbox Code Playgroud)
我试过爆炸,然后放入一个结构:
case class Setting(key: String, value: String)
val newDF = df.withColumn("setting", explode($"settings"))
.select($"id", from_json($"setting" Encoders.product[Setting].schema) as 'settings)
Run Code Online (Sandbox Code Playgroud)
这给了我:
+------+------------------------------+
|ID |settings |
+------+------------------------------+
|1 |[key1,val1] |
|1 |[key2,val2] |
|1 |[key3,val3] |
+------+------------------------------+
Run Code Online (Sandbox Code Playgroud)
从这里我可以通过这样的settings.key使用指定的行但它不是我需要的.我需要访问一行数据中的多个键