标签: spark-structured-streaming

监控结构化流

我有一个运行良好的结构化流设置,但我希望在它运行时监视它.

我已经构建了一个EventCollector

class EventCollector extends StreamingQueryListener{
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.queryStatus.prettyJson)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }
Run Code Online (Sandbox Code Playgroud)

我已经构建了一个EventCollector并将监听器添加到我的spark会话中

val listener = new EventCollector()
spark.streams.addListener(listener)
Run Code Online (Sandbox Code Playgroud)

然后我解除了查询

val query = inputDF.writeStream
  //.format("console")
  .queryName("Stream")
  .foreach(writer)
  .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

但是,onQueryProgress永远不会被击中.onQueryStarted确实如此,但我希望以一定的时间间隔获取查询的进度,以监控查询的执行情况.任何人都可以协助吗?

scala apache-spark spark-structured-streaming

10
推荐指数
1
解决办法
2561
查看次数

如何使用from_json与Kafka connect 0.10和Spark Structured Streaming?

我试图重现[Databricks] [1]中的示例并将其应用于Kafka的新连接器并激发结构化流媒体,但我无法使用Spark中的开箱即用方法正确解析JSON ...

注意:该主题以JSON格式写入Kafka.

val ds1 = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", IP + ":9092")
          .option("zookeeper.connect", IP + ":2181")
          .option("subscribe", TOPIC)
          .option("startingOffsets", "earliest")
          .option("max.poll.records", 10)
          .option("failOnDataLoss", false)
          .load()
Run Code Online (Sandbox Code Playgroud)

以下代码不起作用,我相信这是因为列json是一个字符串而且与from_json签名方法不匹配...

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")
Run Code Online (Sandbox Code Playgroud)

有小费吗?

[更新]工作示例:https: //github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

scala apache-kafka apache-spark apache-kafka-connect spark-structured-streaming

10
推荐指数
1
解决办法
6699
查看次数

Spark结构化流媒体:多个接收器

  1. 我们正在使用结构化流媒体从Kafka消费并将处理后的数据集写入s3.

    我们还希望将处理后的数据写入Kafka继续前进,是否可以通过相同的流式查询来完成?(火花版2.1.1)

  2. 在日志中,我看到了流式查询进度输出,并且我从日志中获得了一个示例持续时间JSON,有人可以提供更清晰的区别addBatch和之间的区别getBatch吗?

  3. TriggerExecution - 处理获取的数据和写入接收器的时间是多少?

    "durationMs" : {
        "addBatch" : 2263426,
        "getBatch" : 12,
        "getOffset" : 273,
       "queryPlanning" : 13,
        "triggerExecution" : 2264288,
        "walCommit" : 552
    },
    
    Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

10
推荐指数
1
解决办法
4399
查看次数

使用Spark Structured Streaming和Trigger.Once

有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.

这就是我所拥有的:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)

以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)

以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

以下命令在写入任何数据之前停止查询.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()
Run Code Online (Sandbox Code Playgroud)

如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?

scala apache-spark spark-structured-streaming

10
推荐指数
2
解决办法
2354
查看次数

Spark结构化流kafka转换JSON没有架构(推断架构)

我读过Spark Structured Streaming不支持将Kafka消息作为JSON读取的模式推断.有没有办法像Spark Streaming那样检索模式:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema 
Run Code Online (Sandbox Code Playgroud)

schema apache-kafka apache-spark spark-structured-streaming

10
推荐指数
3
解决办法
7971
查看次数

Spark 结构化流 - 动态更新数据帧的架构

我有一个简单的结构化流作业,它监视 CSV 文件的目录并写入镶木地板文件 - 两者之间没有转换。

这项工作首先通过使用 读取 CSV 文件来构建数据框readStream(),并使用我通过调用名为buildSchema(). 这是代码:

  var df = spark
    .readStream
    .option("sep", "|")
    .option("header","true")
    .schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
    .csv(input_base_dir + table_name + "*")

  logger.info(" new batch indicator")

  if (df.schema != buildSchema(spark, table_name).get) {
    df = spark.sqlContext.createDataFrame(df.collectAsList(), buildSchema(spark, table_name).get)
  }

  val query =
    df.writeStream
      .format("parquet")
      .queryName("convertCSVtoPqrquet for table " + table_name)
      .option("path", output_base_dir + table_name + "/")
      .trigger(ProcessingTime(60.seconds))
      .start()
Run Code Online (Sandbox Code Playgroud)

工作运行良好,但我的问题是,我想始终使用最新的架构来构建我的数据框,或者换句话说,从 CSV 文件中读取。虽然buildSchema()可以获得最新的架构,但我不确定如何定期调用它(或每个 CSV 文件一次),然后使用最新的架构以某种方式重新生成或修改数据框。

测试时,我观察到只有query对象是一批接一批地连续运行;我放置的日志语句和 …

schema apache-spark apache-spark-sql spark-structured-streaming

10
推荐指数
1
解决办法
1063
查看次数

流查询使用多少个 Kafka 消费者来执行?

我很惊讶地看到 Spark 只使用一个 Kafka 消费者来消费来自 Kafka 的数据,并且这个消费者在驱动程序容器中运行。我更希望看到 Spark 创建与主题中分区数量一样多的消费者,并在执行器容器中运行这些消费者。

例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行程序。在检查 Spark 创建的 Kafka 消费者组时,我看到只有一个消费者负责所有 5 个分区。这个消费者在带有驱动程序的机器上运行:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
events          2          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          1          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          0          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          4          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          3          -               0               - …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spark-structured-streaming

10
推荐指数
1
解决办法
529
查看次数

无法找到 LoginModule 类:org.apache.kafka.common.security.plain.PlainLoginModule

环境:Spark 2.3.0、Scala 2.11.12、Kafka(无论最新版本是什么)

\n\n

我有一个安全的 Kafka 系统,我正在尝试将 Spark Streaming Consumer 连接到该系统。以下是我的build.sbt文件:

\n\n
name := "kafka-streaming"\nversion := "1.0"\n\nscalaVersion := "2.11.12"\n\n// still want to be able to run in sbt\n// https://github.com/sbt/sbt-assembly#-provided-configuration\nrun in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))\n\nfork in run := true\njavaOptions in run ++= Seq(\n    "-Dlog4j.debug=true",\n    "-Dlog4j.configuration=log4j.properties")\n\nassemblyMergeStrategy in assembly := {\n    case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat\n    case PathList("META-INF", _*) => MergeStrategy.discard\n    case _ => MergeStrategy.first\n}\n\nlibraryDependencies ++= Seq(\n    "org.apache.spark" %% "spark-core" % "2.3.0",\n    "org.apache.spark" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-structured-streaming spark-streaming-kafka

10
推荐指数
1
解决办法
5526
查看次数

如何在 foreachBatch 函数中打印/记录输出?

使用表流,我尝试使用 foreachBatch 写入流

df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)

WriteStreamToDelta 看起来像

def WriteStreamToDelta(microDF, batch_id):
   microDFWrangled = microDF."some_transformations"
   
   print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?

   microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)

我想查看其中的行数

  1. 笔记本,位于 writeStream 单元下方
  2. 驾驶员日志
  3. 创建一个列表以附加每个微批次的行数。

apache-spark databricks spark-structured-streaming

10
推荐指数
1
解决办法
878
查看次数

优雅地停止结构化流查询

我正在使用 Spark 2.1 并尝试优雅地停止流式查询。

StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:

void stop() 如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0

而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 停止流的执行,可选择确保所有接收到的数据都已处理。

stopSparkContext 如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。

stopGracefully 如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止

所以问题是如何优雅地停止结构化流查询?

apache-spark spark-structured-streaming

9
推荐指数
4
解决办法
5438
查看次数