Kinesis firehose将文件的持久性(在本例中为时间序列JSON)管理到由YYYY/MM/DD/HH划分的文件夹层次结构(直到24编号中的小时)......很棒.
如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹并从所有叶子json文件创建一个静态Dataframe?数据框阅读器是否有"选项"?
我的下一个目标是将其作为流式DF,其中Firehose持久保存到s3中的新文件自然会成为使用Spark 2.0中新结构化流媒体的流数据帧的一部分.我知道这都是实验性的 - 希望有人之前使用S3作为流媒体文件源,其中数据被分成如上所述的文件夹.当然更喜欢直接使用Kinesis流,但是这个连接器上没有2.0的日期,所以Firehose-> S3是临时的.
ND:我正在使用databricks,它将S3安装到DBFS中,但当然可以很容易地成为EMR或其他Spark提供商.很高兴看到一个笔记本电脑,如果一个人可以分享给出一个例子.
干杯!
apache-spark apache-spark-sql databricks spark-structured-streaming
我正在编写一个测试应用程序,它消耗来自Kafka的topcis的消息,然后将数据推送到S3并进入RDBMS表(流程类似于此处所示:https://databricks.com/blog/2017/04/26/processing-data -in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html).所以我从Kafka读取数据然后:
所以我有点像:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
Run Code Online (Sandbox Code Playgroud)
(请注意我正在阅读多个Kafka主题).接下来我定义所需的数据集:
Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
Run Code Online (Sandbox Code Playgroud)
现在为每个数据集我创建查询以开始处理:
StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("startingOffsets", "latest")
.option("path", "s3_location")
.start()
StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter[T] and save data into external RDBMS table …Run Code Online (Sandbox Code Playgroud) 所以我有一个Python Stream-sourced DataFrame df,它包含我想要放入带有spark-cassandra-connector的Cassandra表的所有数据.我试过两种方式:
df.write \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options(table="myTable",keyspace="myKeySpace") \
.save()
query = df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode('append') \
.options(table="myTable",keyspace="myKeySpace") \
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是我继续分别得到这个错误:
pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;
Run Code Online (Sandbox Code Playgroud)
和
java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.
Run Code Online (Sandbox Code Playgroud)
无论如何我可以将我的Streaming DataFrame发送到我的Cassandra表中吗?
apache-spark pyspark spark-cassandra-connector spark-structured-streaming
为什么以下结构化查询运行多个SQL查询,如Web UI的SQL选项卡中所示?
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val rates = spark.
readStream.
format("rate").
option("numPartitions", 1).
load.
writeStream.
format("console").
option("truncate", false).
option("numRows", 10).
trigger(Trigger.ProcessingTime(10.seconds)).
queryName("rate-console").
start
Run Code Online (Sandbox Code Playgroud)
使用Apache Spark 2.2:Structured Streaming,我正在创建一个程序,它从Kafka读取数据并将其写入Hive.我正在寻找写入Kafka主题@100记录/秒的批量数据.
蜂巢表创建:
CREATE TABLE demo_user( timeaa BIGINT, numberbb INT, decimalcc DOUBLE, stringdd STRING, booleanee BOOLEAN ) STORED AS ORC ;
Run Code Online (Sandbox Code Playgroud)
通过手动Hive查询插入:
INSERT INTO TABLE demo_user (1514133139123, 14, 26.4, 'pravin', true);
Run Code Online (Sandbox Code Playgroud)
通过spark结构化流媒体代码插入:
SparkConf conf = new SparkConf();
conf.setAppName("testing");
conf.setMaster("local[2]");
conf.set("hive.metastore.uris", "thrift://localhost:9083");
SparkSession session =
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
// workaround START: code to insert static data into hive
String insertQuery = "INSERT INTO TABLE demo_user (1514133139123, 14, 26.4, 'pravin', true)";
session.sql(insertQuery);
// workaround END:
// Solution START
Dataset<Row> dataset = readFromKafka(sparkSession); …Run Code Online (Sandbox Code Playgroud) hive apache-spark apache-spark-sql spark-structured-streaming
我在Spark 2.2.0中面临运行带有聚合和分区的结构化流的内存问题:
session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();
Run Code Online (Sandbox Code Playgroud)
在测试期间,我注意到每次新数据到来时最终使用的内存量都会增加,最后执行程序退出代码137:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Run Code Online (Sandbox Code Playgroud)
我已经创建了一个堆转储,并发现它使用的大部分内存org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider都是从StateStore引用的 …
在我的场景中,我有几个数据集时不时出现,我需要在我们的平台中摄取它们。摄取过程涉及几个转换步骤。其中之一是 Spark。到目前为止,我特别使用火花结构化流媒体。基础设施还涉及 kafka,spark 结构化流从中读取数据。
我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他可消费的东西来决定停止工作。那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它。出于特定原因,我们决定不使用 spark 的批处理版本。
因此,是否有任何超时或可用于检测没有更多数据进入并且所有内容都已处理的东西。
谢谢
apache-kafka apache-spark spark-streaming spark-structured-streaming
我们有一个运行在Spark 2.3.3上的Spark Streaming应用程序
基本上,它将打开一个Kafka流:
kafka_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "mykafka:9092") \
.option("subscribe", "mytopic") \
.load()
Run Code Online (Sandbox Code Playgroud)
kafka主题有2个分区。之后,在列上执行一些基本的过滤操作,一些Python UDF和explode(),例如:
stream = apply_operations(kafka_stream)
Run Code Online (Sandbox Code Playgroud)
其中apply_operations将对数据进行所有工作。最后,我们想将流写入接收器,即:
stream.writeStream \
.format("our.java.sink.Class") \
.option("some-option", "value") \
.trigger(processingTime='15 seconds') \
.start()
Run Code Online (Sandbox Code Playgroud)
为了让此流操作永久运行,我们应用:
spark.streams.awaitAnyTermination()
Run Code Online (Sandbox Code Playgroud)
到底。
到目前为止,一切都很好。一切都会持续数天。但是由于网络问题,该工作终止了几天,现在卡夫卡流中有数百万条消息正在等待追赶。
当我们使用spark-submit重新启动流数据作业时,第一批将太大,并且需要很长时间才能完成。我们认为可能有一种方法可以通过一些参数来限制第一批的大小,但是我们没有找到任何有用的方法。
我们尝试了:
spark.streaming.backpressure.enabled = true以及spark.streaming.backpressure.initialRate = 2000和spark.streaming.kafka.maxRatePerPartition = 1000和spark.streaming.receiver.maxrate = 2000
将spark.streaming.backpressure.pid.minrate设置为较低的值也没有效果
设置option(“ maxOffsetsPerTrigger”,10000)也不起作用
现在,在我们重新启动管道之后,迟早整个Spark Job都会再次崩溃。我们不能简单地扩展用于火花作业的内存或内核。
我们是否有任何东西无法控制一个流批量中处理的事件数量?
apache-spark spark-streaming pyspark spark-structured-streaming
我已经设置了 Spark Structured Streaming (Spark 2.3.2) 来读取 Kafka (2.0.0)。如果消息在 Spark 流作业开始之前进入主题,我将无法从主题的开头开始消费。Spark 流的这种预期行为是否会忽略在 Spark Stream 作业初始运行之前生成的 Kafka 消息(即使使用 .option("stratingOffsets","earliest"))?
在开始流式作业之前,创建test主题(单个代理、单个分区)并向该主题生成消息(在我的示例中为 3 条消息)。
使用以下命令启动 spark-shell: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
执行下面的 spark scala 代码。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
Run Code Online (Sandbox Code Playgroud)
我希望流从 offset=1 开始。但是,它从 offset=3 开始读取。可以看到kafka客户端实际上是在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to …
apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka
我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有许多关于要重新处理的主题的消息(需要加入的 3 个主题各有 250'000 条消息)。
重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger在这些情况下,读取流上的配置应该会有所帮助。我更改了 PySpark 代码(在 SSS 2.4.3 上运行),所有 3 个主题都具有以下内容
rawstream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("maxOffsetsPerTrigger", 10000L)
.option("startingOffsets", "earliest")
.load()
Run Code Online (Sandbox Code Playgroud)
我的期望是现在 SSS 查询将从每个主题加载 ~33'000 个偏移量并将它们加入第一批。然后在第二批中,它将清除第一批中的状态记录,由于水印而到期(这将清除第一批中的大部分记录),然后从每个主题中再读取约 33k。因此,在大约 8 个批次之后,它应该已经处理了延迟,并具有“合理”的内存量。
但是应用程序仍然因 OOM 而崩溃,当我检查应用程序主 UI 中的 DAG 时,它报告它再次尝试读取所有 250'000 条消息。
还有什么我需要配置的吗?我如何检查这个选项是否真的被使用?(当我检查计划时,不幸的是它被截断了,只是显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...),我不知道如何在点之后显示部分)
apache-spark ×10
spark-structured-streaming ×10
pyspark ×2
apache-kafka ×1
databricks ×1
hive ×1