结构化流式处理并将嵌套数据拆分为多个数据集

Mar*_*tin 5 apache-kafka apache-spark apache-spark-sql spark-structured-streaming

我正在使用Spark的结构化流(2.2.1),使用Kafka每60秒从传感器接收一次数据。我很难解决如何打包此Kafka数据以使其能够正确处理的问题。

我需要能够进行一些计算,因为数据来自Kafka。

我的问题是将来自Kafka的JSON数据解压缩到我可以使用的数据集中

数据

简化的数据如下所示:

{
  id: 1,
  timestamp: "timestamp"
  pump: {
    current: 1.0,
    flow: 20.0
    torque: 5.0
  },
  reactors: [
    {
      id: 1,
      status: 200,
    },

    {
      id: 2,
      status: 300,
    }
  ],
  settings: {
    pumpTimer: 20.0,
    reactorStatusTimer: 200.0
  }
}
Run Code Online (Sandbox Code Playgroud)

为了能够与Spark一起使用,我为其中的每一个创建了一些case类结构:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
Run Code Online (Sandbox Code Playgroud)

并使用以下方式生成架构:

val rawDataSchema = Encoders.product[RawData].schema
Run Code Online (Sandbox Code Playgroud)

原始数据到Spark模式

首先,我将卡夫卡的“值”字段放入我的一般模式中:

val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema))
  .select("data.*").as[RawData]
Run Code Online (Sandbox Code Playgroud)

使用此rawDataSet,我可以将每个单独的对象打包到数据集中。

val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]
Run Code Online (Sandbox Code Playgroud)

这为每个JSON对象提供了非常整洁的数据集。

处理数据

这是我的问题,例如,如果要比较或计算“设置”和“泵”的两个数据集之间的某些值,则JOIN无法使用结构化流工作。

val joinedData = pump.join(settings)
Run Code Online (Sandbox Code Playgroud)

错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;
Run Code Online (Sandbox Code Playgroud)

我的方法是否全部错误?还是有其他建议来解决此问题?

谢谢

Mar*_*tin 2

I\xe2\x80\x99ll 用我现在工作的解决方案回答我自己的问题

\n\n

我可以将这些对象作为一个带有嵌套对象的案例类连接在一起,而不是为 JSON 中的每个对象创建案例类:

\n\n
case class RawData(\n  id: String, \n  timestamp: String, \n  pump: Pump, \n  reactors: Array[Reactor], \n  settings: Settings\n)\n\ncase class Pump(current: Float, flow: Float, torque: Float)\ncase class Reactor(id: Int, status: Int)\ncase class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)\n
Run Code Online (Sandbox Code Playgroud)\n\n

为了将其变成可用的数据集,我可以简单地调用

\n\n
val rawDataset = df.select($"value" cast "string" as "json")\n  .select(from_json($"json", Encoders.product[RawData].schema) as \'data)\n  .select("data.*").as[RawData]\n  .withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.\n
Run Code Online (Sandbox Code Playgroud)\n\n

处理完 JSON 并将其放入我的定义模式后,我可以像这样选择每个特定的传感器:

\n\n
val tester = rawDataset.select($"pump.current", $\xe2\x80\x9dsettings.pumpTimer\xe2\x80\x9d)\n
Run Code Online (Sandbox Code Playgroud)\n\n

感谢user6910411为我指明了正确的方向

\n