标签: spark-structured-streaming

Spark结构化流中的多个聚合

我想在Spark Structured Streaming中进行多次聚合.

像这样的东西:

  • 读取输入文件流(来自文件夹)
  • 执行聚合1(带一些转换)
  • 执行聚合2(以及更多转换)

当我在Structured Streaming中运行它时,它给出了一个错误"流数据框架/数据集不支持多个流聚合".

有没有办法在结构化流中进行这样的多重聚合?

apache-spark apache-spark-sql spark-structured-streaming

12
推荐指数
3
解决办法
5009
查看次数

如何使用结构化流媒体从Kafka读取JSON格式的记录?

我正在尝试使用基于DataFrame/Dataset API的Spark-Streaming来加载来自Kafka的数据流的结构化流方法.

我用:

  • 火花2.10
  • 卡夫卡0.10
  • 火花-SQL卡夫卡-0-10

Spark Kafka DataSource定义了底层架构:

|key|value|topic|partition|offset|timestamp|timestampType|
Run Code Online (Sandbox Code Playgroud)

我的数据采用json格式,并存储在列中.我正在寻找一种方法如何从值列中提取底层模式并将接收到的数据帧更新为存储在值中的列?我尝试了下面的方法,但它不起作用:

 val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

在这里我得到了Exception,org.apache.spark.sql.AnalysisException: Can't extract value from value#337;因为在创建流时,里面的值是未知的...

你有什么建议吗?

scala apache-kafka apache-spark apache-spark-sql spark-structured-streaming

12
推荐指数
1
解决办法
5339
查看次数

如何在Kafka Direct Stream中使用Spark Structured Streaming?

我遇到了使用Spark的Structured Streaming,它有一个连续消耗S3存储桶并将处理结果写入MySQL数据库的示例.

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")
Run Code Online (Sandbox Code Playgroud)

如何在Spark Kafka Streaming中使用它

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
Run Code Online (Sandbox Code Playgroud)

有没有办法结合这两个例子而不使用stream.foreachRDD(rdd => {})

scala apache-kafka apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
3954
查看次数

为什么在流数据集上使用缓存失败并显示"AnalysisException:必须使用writeStream.start()执行带有流源的查询"?

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination
Run Code Online (Sandbox Code Playgroud)

在Spark 2.1.0中执行此示例时出现错误.没有.cache选项,它按预期工作,但.cache我有选择:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)
Run Code Online (Sandbox Code Playgroud)

任何的想法?

scala apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming

11
推荐指数
1
解决办法
3668
查看次数

如何为Spark结构化流编写JDBC Sink [SparkException:Task not serializable]?

我需要一个JDBC接收器用于我的Spark结构化流数据帧.目前,据我所知,DataFrame的API缺乏writeStreamJDBC实现(在PySpark和Scala(当前Spark版本2.2.0)中都没有).我发现的唯一建议是ForeachWriter根据本文编写自己的Scala类.

所以,我已经修改了一个简单的词从数例如这里通过添加自定义ForeachWriter类,并试图writeStream到PostgreSQL.单词流是从控制台手动生成的(使用NetCat:nc -lk -p 9999)并由Spark从套接字读取.

不幸的是,我得到"任务不可序列化"的错误.

APACHE_SPARK_VERSION = 2.1.0使用Scala版本2.11.8(Java HotSpot(TM)64位服务器VM,Java 1.8.0_112)

我的Scala代码:

//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJDBC")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

class JDBCSink(url: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
4343
查看次数

结构化流媒体 - Grafana中的度量标准

我正在使用结构化流来从Kafka读取数据并创建各种聚合度量.我使用了Graphite sink metrics.properties.我见过较旧的Spark版本中的应用程序具有与流相关的指标.我没有看到结构化流媒体的流媒体相关指标.我究竟做错了什么?

例如 - 无法找到未处理的批次或运行批次或上次完成的批次总延迟.

我通过设置启用了流量指标:

SparkSession.builder().config("spark.sql.streaming.metricsEnabled",true)
Run Code Online (Sandbox Code Playgroud)

即便如此,我只得到3个指标:

  • driver.spark.streaming.inputrate
  • driver.spark.streaming.latency
  • driver.spark.streaming.processingrate

这些指标在它们之间存在差距.它也会在应用程序启动后很晚才开始显示.如何获得与grafana广泛的流媒体相关指标?

我检查StreamingQueryProgress.我们只能使用此方法以编程方式创建自定义指标.有没有办法可以消耗Spark流已经发送到我提到的接收器的指标?

graphite apache-spark apache-spark-sql spark-structured-streaming

11
推荐指数
1
解决办法
782
查看次数

结构化流不会将DF写入文件接收器,引用/_spark_metadata/9.compact不存在

我正在EMR 5.11.1,Spark 2.2.1中构建一个Kafka摄取模块.我的目的是使用结构化流来消费Kafka主题,进行一些处理,并以镶木地板格式存储到EMRFS/S3.

控制台接收器按预期工作,文件接收器不起作用.

spark-shell:

val event = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()

val eventdf = event.select($"value" cast "string" as "json")
.select(from_json($"json", readSchema) as "data")
.select("data.*")

val outputdf = <some processing on eventdf>
Run Code Online (Sandbox Code Playgroud)

这有效:

val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start 
Run Code Online (Sandbox Code Playgroud)

这不是:

val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails
Run Code Online (Sandbox Code Playgroud)

我试过的东西不起作用:

  1. sc.hadoopConfiguration.set("parquet.enable.summary-metadata","false")
  2. 将格式更改为CSV而不是镶木地板
  3. 将输出模式更改为完成(仅支持追加)
  4. 不同的触发间隔
  5. readStream上的.option("failOnDataLoss",false)

一些挖掘源代码的人把我带到了这里:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog. scala ,它表示缺少.compact文件应触发默认值.

因此尝试:spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay",60000)以确保在新批处理创建组合元数据文件之前未删除旧批处理元数据

使这个错误烦人的原因是它并不总是可重现的.在不更改代码中的单个字符的情况下,写入镶木地板有时会起作用,或者不起作用.我已经尝试清理检查点位置,spark/hdfs日志等,以防火花内部的"状态"导致此问题.

这是错误堆栈跟踪:

query: …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-emr apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
2670
查看次数

从多个 Kafka 主题读取 Spark 结构化流应用程序

我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从许多 Kafka 主题中读取数据,进行一些相对简单的处理(主要是聚合和一些连接)并将结果发布到许多其他 Kafka 主题。因此在同一个应用程序中处理多个流。

我想知道如果我只设置 1 个订阅多个主题的直接 readStream,然后使用选择拆分流,那么从资源的角度(内存、执行程序、线程、Kafka 侦听器等)是否会有所不同,而不是 1每个主题的 readStream。

就像是

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
Run Code Online (Sandbox Code Playgroud)

对比

t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
Run Code Online (Sandbox Code Playgroud)

一个比另一个更“有效”吗?我找不到任何关于这是否有所作为的文档。

谢谢!

apache-kafka apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
3184
查看次数

使用 Kafka SASL/PLAIN 身份验证的 Spark 结构化流

有没有办法将 Spark 结构化流作业连接到受 SASL/PLAIN 身份验证保护的 Kafka 集群?

我在想类似的事情:

val df2 = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
    .option("subscribe", "topic1")
    .load();
Run Code Online (Sandbox Code Playgroud)

看起来,虽然 Spark Structured Streaming 可以识别该kafka.bootstrap.servers选项,但它无法识别其他与 SASL 相关的选项。有不同的方法吗?

apache-kafka apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
9282
查看次数

为什么format("kafka")失败并且"找不到数据源:kafka".(即使是超级罐)?

我使用HDP-2.6.3.0和Spark2包2.​​2.0.

我正在尝试使用Structured Streaming API编写Kafka使用者,但是在将作业提交到群集后我收到以下错误:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at …
Run Code Online (Sandbox Code Playgroud)

uberjar apache-spark apache-spark-sql spark-structured-streaming

10
推荐指数
2
解决办法
8348
查看次数