标签: spark-structured-streaming

如何获得结构化查询的Kafka偏移量以进行手动和可靠的偏移管理?

Spark 2.2引入了Kafka的结构化流媒体源.据我所知,它依靠HDFS检查点目录来存储偏移并保证"完全一次"的消息传递.

但旧的码头(如https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示Spark Streaming检查点无法跨应用程序恢复或Spark升级,因此不太可靠.作为一种解决方案,有一种做法是支持在支持MySQL或RedshiftDB等事务的外部存储中存储偏移量.

如果我想将Kafka源的偏移存储到事务DB,我如何从结构化流批处理中获得偏移量?

以前,可以通过将RDD转换为HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    
Run Code Online (Sandbox Code Playgroud)

但是使用新的Streaming API,我有一个Dataset,InternalRow我找不到一个简单的方法来获取偏移量.Sink API只有addBatch(batchId: Long, data: DataFrame)方法,我怎么能想得到给定批次ID的偏移量?

offset apache-kafka apache-spark apache-spark-sql spark-structured-streaming

22
推荐指数
2
解决办法
6305
查看次数

为什么Spark应用程序失败并出现"ClassNotFoundException:找不到数据源:kafka"作为带有sbt程序集的uber-jar?

我正在尝试运行像https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala这样的示例.我从http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html上的Spark Structured Streaming Programming指南开始.

我的代码是

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more …
Run Code Online (Sandbox Code Playgroud)

scala sbt sbt-assembly apache-spark spark-structured-streaming

21
推荐指数
2
解决办法
2万
查看次数

Spark Strutured Streaming自动将时间戳转换为本地时间

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间.有没有办法阻止这种转换?我想在UTC中使用它.

我正在从Kafka读取json数据,然后使用from_jsonSpark函数解析它们.

输入:

{"Timestamp":"2015-01-01T00:00:06.222Z"}
Run Code Online (Sandbox Code Playgroud)

流:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();
Run Code Online (Sandbox Code Playgroud)

架构:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
Run Code Online (Sandbox Code Playgroud)

输出:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

如您所见,小时数自动增加.

PS:我试着尝试from_utc_timestampSpark功能,但没有运气.

java scala apache-spark apache-spark-sql spark-structured-streaming

18
推荐指数
2
解决办法
9282
查看次数

将Spark Structured Streaming与Confluent Schema Registry集成

我在Spark Structured Streaming中使用Kafka Source来接收Confluent编码的Avro记录.我打算使用Confluent Schema Registry,但是与spark结构化流媒体的集成似乎是不可能的.

我已经看到了这个问题,但无法使用Confluent Schema Registry.使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)

avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming

15
推荐指数
7
解决办法
6322
查看次数

为什么完整输出模式需要聚合?

我在Apache Spark 2.2中使用最新的结构化流,并得到以下异常:

org.apache.spark.sql.AnalysisException:当流数据框架/数据集上没有流聚合时,不支持完整输出模式;;

为什么完整输出模式需要流聚合?如果Spark允许在流式查询中没有聚合的完整输出模式,会发生什么?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

14
推荐指数
2
解决办法
3506
查看次数

Spark结构化流 - 将静态数据集与流数据集连接起来

我正在Spark structured streaming用来处理从中读取的记录Kafka.这是我想要实现的目标:

(a)每条记录都是一种Tuple2类型(Timestamp, DeviceId).

(b)我创建了一个静态Dataset[DeviceId],其中包含DeviceId预期在Kafka流中看到的所有有效设备ID(类型)的集合.

(c)我需要写一个Spark structured streaming查询

 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, get the list of valid device IDs that were **not** seen in that window
Run Code Online (Sandbox Code Playgroud)

例如,假设所有有效设备ID的列表都是,[A,B,C,D,E]并且某个5分钟窗口中的kafka记录包含设备ID [A,B,E].然后,对于该窗口,我正在寻找的看不见的设备ID列表是[C,D].

  1. 如何在Spark结构化流中编写此查询?我尝试使用公开的方法except()join()方法Dataset.但是,他们都抛出了一个运行时异常,抱怨说这些操作都不受支持streaming Dataset.

这是我的代码片段:

val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming

13
推荐指数
2
解决办法
3717
查看次数

如何从Amazon SQS加载流数据?

我使用Spark 2.2.0.

如何使用pyspark将Amazon SQS流提供给spark结构化流?

这个问题试图通过创建自定义接收器来解决非结构化流和scala的问题.
pyspark中有类似的东西吗?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()
Run Code Online (Sandbox Code Playgroud)

根据Databricks上面的接收器可以用于S3-SQS文件源.但是,对于只有SQS,如何才能采用一种方法.

我尝试从AWS-SQS-Receive_Message理解接收消息.但是,如何直接将流发送到火花流还不清楚.

amazon-sqs apache-spark pyspark-sql spark-structured-streaming

13
推荐指数
1
解决办法
2719
查看次数

Spark结构化流中的多个聚合

我想在Spark Structured Streaming中进行多次聚合.

像这样的东西:

  • 读取输入文件流(来自文件夹)
  • 执行聚合1(带一些转换)
  • 执行聚合2(以及更多转换)

当我在Structured Streaming中运行它时,它给出了一个错误"流数据框架/数据集不支持多个流聚合".

有没有办法在结构化流中进行这样的多重聚合?

apache-spark apache-spark-sql spark-structured-streaming

12
推荐指数
3
解决办法
5009
查看次数

如何使用结构化流媒体从Kafka读取JSON格式的记录?

我正在尝试使用基于DataFrame/Dataset API的Spark-Streaming来加载来自Kafka的数据流的结构化流方法.

我用:

  • 火花2.10
  • 卡夫卡0.10
  • 火花-SQL卡夫卡-0-10

Spark Kafka DataSource定义了底层架构:

|key|value|topic|partition|offset|timestamp|timestampType|
Run Code Online (Sandbox Code Playgroud)

我的数据采用json格式,并存储在列中.我正在寻找一种方法如何从值列中提取底层模式并将接收到的数据帧更新为存储在值中的列?我尝试了下面的方法,但它不起作用:

 val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

在这里我得到了Exception,org.apache.spark.sql.AnalysisException: Can't extract value from value#337;因为在创建流时,里面的值是未知的...

你有什么建议吗?

scala apache-kafka apache-spark apache-spark-sql spark-structured-streaming

12
推荐指数
1
解决办法
5339
查看次数

附加模式下的水印聚合查询的空输出

我使用Spark 2.2.0-rc1.

我有一个卡夫卡topic这我查询运行水印聚集,有1 minute水印,发出来consoleappend输出模式.

import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value"))
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start
Run Code Online (Sandbox Code Playgroud)

我在Kafka推送以下数据topic:

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
Run Code Online (Sandbox Code Playgroud)

我得到以下输出:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2 …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-structured-streaming

12
推荐指数
2
解决办法
1516
查看次数