我有一个运行良好的结构化流设置,但我希望在它运行时监视它.
我已经构建了一个EventCollector
class EventCollector extends StreamingQueryListener{
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println("Start")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
println(event.queryStatus.prettyJson)
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println("Term")
}
Run Code Online (Sandbox Code Playgroud)
我已经构建了一个EventCollector并将监听器添加到我的spark会话中
val listener = new EventCollector()
spark.streams.addListener(listener)
Run Code Online (Sandbox Code Playgroud)
然后我解除了查询
val query = inputDF.writeStream
//.format("console")
.queryName("Stream")
.foreach(writer)
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,onQueryProgress永远不会被击中.onQueryStarted确实如此,但我希望以一定的时间间隔获取查询的进度,以监控查询的执行情况.任何人都可以协助吗?
我试图重现[Databricks] [1]中的示例并将其应用于Kafka的新连接器并激发结构化流媒体,但我无法使用Spark中的开箱即用方法正确解析JSON ...
注意:该主题以JSON格式写入Kafka.
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
Run Code Online (Sandbox Code Playgroud)
以下代码不起作用,我相信这是因为列json是一个字符串而且与from_json签名方法不匹配...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
Run Code Online (Sandbox Code Playgroud)
有小费吗?
[更新]工作示例:https: //github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala
scala apache-kafka apache-spark apache-kafka-connect spark-structured-streaming
我们正在使用结构化流媒体从Kafka消费并将处理后的数据集写入s3.
我们还希望将处理后的数据写入Kafka继续前进,是否可以通过相同的流式查询来完成?(火花版2.1.1)
在日志中,我看到了流式查询进度输出,并且我从日志中获得了一个示例持续时间JSON,有人可以提供更清晰的区别addBatch和之间的区别getBatch吗?
TriggerExecution - 处理获取的数据和写入接收器的时间是多少?
"durationMs" : {
"addBatch" : 2263426,
"getBatch" : 12,
"getOffset" : 273,
"queryPlanning" : 13,
"triggerExecution" : 2264288,
"walCommit" : 552
},
Run Code Online (Sandbox Code Playgroud)有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述的Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.
这就是我所拥有的:
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)
以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)
以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
以下命令在写入任何数据之前停止查询.
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
Run Code Online (Sandbox Code Playgroud)
如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?
我读过Spark Structured Streaming不支持将Kafka消息作为JSON读取的模式推断.有没有办法像Spark Streaming那样检索模式:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
Run Code Online (Sandbox Code Playgroud) 我有一个简单的结构化流作业,它监视 CSV 文件的目录并写入镶木地板文件 - 两者之间没有转换。
这项工作首先通过使用 读取 CSV 文件来构建数据框readStream(),并使用我通过调用名为buildSchema(). 这是代码:
var df = spark
.readStream
.option("sep", "|")
.option("header","true")
.schema(buildSchema(spark, table_name).get) // buildSchema() gets schema for me
.csv(input_base_dir + table_name + "*")
logger.info(" new batch indicator")
if (df.schema != buildSchema(spark, table_name).get) {
df = spark.sqlContext.createDataFrame(df.collectAsList(), buildSchema(spark, table_name).get)
}
val query =
df.writeStream
.format("parquet")
.queryName("convertCSVtoPqrquet for table " + table_name)
.option("path", output_base_dir + table_name + "/")
.trigger(ProcessingTime(60.seconds))
.start()
Run Code Online (Sandbox Code Playgroud)
工作运行良好,但我的问题是,我想始终使用最新的架构来构建我的数据框,或者换句话说,从 CSV 文件中读取。虽然buildSchema()可以获得最新的架构,但我不确定如何定期调用它(或每个 CSV 文件一次),然后使用最新的架构以某种方式重新生成或修改数据框。
测试时,我观察到只有query对象是一批接一批地连续运行;我放置的日志语句和 …
schema apache-spark apache-spark-sql spark-structured-streaming
我很惊讶地看到 Spark 只使用一个 Kafka 消费者来消费来自 Kafka 的数据,并且这个消费者在驱动程序容器中运行。我更希望看到 Spark 创建与主题中分区数量一样多的消费者,并在执行器容器中运行这些消费者。
例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行程序。在检查 Spark 创建的 Kafka 消费者组时,我看到只有一个消费者负责所有 5 个分区。这个消费者在带有驱动程序的机器上运行:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
events 2 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 1 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 0 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 4 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 3 - 0 - …Run Code Online (Sandbox Code Playgroud) 环境:Spark 2.3.0、Scala 2.11.12、Kafka(无论最新版本是什么)
\n\n我有一个安全的 Kafka 系统,我正在尝试将 Spark Streaming Consumer 连接到该系统。以下是我的build.sbt文件:
name := "kafka-streaming"\nversion := "1.0"\n\nscalaVersion := "2.11.12"\n\n// still want to be able to run in sbt\n// https://github.com/sbt/sbt-assembly#-provided-configuration\nrun in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))\n\nfork in run := true\njavaOptions in run ++= Seq(\n "-Dlog4j.debug=true",\n "-Dlog4j.configuration=log4j.properties")\n\nassemblyMergeStrategy in assembly := {\n case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat\n case PathList("META-INF", _*) => MergeStrategy.discard\n case _ => MergeStrategy.first\n}\n\nlibraryDependencies ++= Seq(\n "org.apache.spark" %% "spark-core" % "2.3.0",\n "org.apache.spark" …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark spark-structured-streaming spark-streaming-kafka
使用表流,我尝试使用 foreachBatch 写入流
df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)
WriteStreamToDelta 看起来像
def WriteStreamToDelta(microDF, batch_id):
microDFWrangled = microDF."some_transformations"
print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?
microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)
我想查看其中的行数
我正在使用 Spark 2.1 并尝试优雅地停止流式查询。
是StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:
void stop()如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0
而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit停止流的执行,可选择确保所有接收到的数据都已处理。
stopSparkContext如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。
stopGracefully如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止
所以问题是如何优雅地停止结构化流查询?
spark-structured-streaming ×10
apache-spark ×9
apache-kafka ×4
scala ×3
schema ×2
databricks ×1