小编Mar*_*tin的帖子

将Cassandra查询数据组合/更新到从Kafka收到的结构化流媒体

我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.

为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息

我有点陷入困境,围绕如何保持整个集群中的Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.

目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications", "sensors")
  .load
Run Code Online (Sandbox Code Playgroud)

从这里开始,我可以cassandraSensorDs通过加入我的结构化流数据集来使用它.

.join(
   cassandraSensorDs ,
   sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)
Run Code Online (Sandbox Code Playgroud)

如何在运行结构化流式传输时执行其他查询来更新此Cassandra数据?如何在群集设置中提供查询的数据?

scala cassandra apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
455
查看次数

结构化流式处理并将嵌套数据拆分为多个数据集

我正在使用Spark的结构化流(2.2.1),使用Kafka每60秒从传感器接收一次数据。我很难解决如何打包此Kafka数据以使其能够正确处理的问题。

我需要能够进行一些计算,因为数据来自Kafka。

我的问题是将来自Kafka的JSON数据解压缩到我可以使用的数据集中

数据

简化的数据如下所示:

{
  id: 1,
  timestamp: "timestamp"
  pump: {
    current: 1.0,
    flow: 20.0
    torque: 5.0
  },
  reactors: [
    {
      id: 1,
      status: 200,
    },

    {
      id: 2,
      status: 300,
    }
  ],
  settings: {
    pumpTimer: 20.0,
    reactorStatusTimer: 200.0
  }
}
Run Code Online (Sandbox Code Playgroud)

为了能够与Spark一起使用,我为其中的每一个创建了一些case类结构:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark apache-spark-sql spark-structured-streaming

5
推荐指数
1
解决办法
819
查看次数

将JSON键/值对的列表拆分为数据集中行的列

我有一个列,其中包含键/值对象列表:

+----+--------------------------------------------------------------------------------------------+
|ID  | Settings                                                                                   |
+----+--------------------------------------------------------------------------------------------+
|1   | [{"key":"key1","value":"val1"}, {"key":"key2","value":"val2"}, {"key":"key3","value":"val3"}] |
+----+--------------------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

是否可以将此对象列表拆分为自己的行?因此:

+----+------+-------+-------+
|ID  | key1 | key2  |  key3 |
+----+------+-------+-------+
|1   | val1 | val2  |  val3 |
+----+------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

我试过爆炸,然后放入一个结构:

   case class Setting(key: String, value: String)
   val newDF = df.withColumn("setting", explode($"settings"))
                .select($"id", from_json($"setting" Encoders.product[Setting].schema) as 'settings)
Run Code Online (Sandbox Code Playgroud)

这给了我:

+------+------------------------------+
|ID    |settings                      |
+------+------------------------------+
|1     |[key1,val1]                   |
|1     |[key2,val2]                   |
|1     |[key3,val3]                   |
+------+------------------------------+
Run Code Online (Sandbox Code Playgroud)

从这里我可以通过这样的settings.key使用指定的行但它不是我需要的.我需要访问一行数据中的多个键

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
640
查看次数