我正在使用 Spark 2.1 并尝试优雅地停止流式查询。
是StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:
void stop()如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0
而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit停止流的执行,可选择确保所有接收到的数据都已处理。
stopSparkContext如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。
stopGracefully如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止
所以问题是如何优雅地停止结构化流查询?
从Zeppelin运行时,我正在努力让接收console器使用PySpark Structured Streaming.基本上,我没有看到任何结果打印到屏幕或我发现的任何日志文件.
我的问题:有没有人有一个使用PySpark Structured Streaming和一个产生Apache Zeppelin可见输出的接收器的工作示例?理想情况下它也会使用套接字源,因为它很容易测试.
我正在使用:
我的代码基于structured_network_wordcount.py示例.它从PySpark shell(./bin/pyspark --master local[2])运行时起作用; 我看到每批表.
%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# …Run Code Online (Sandbox Code Playgroud) apache-spark pyspark apache-zeppelin spark-structured-streaming
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?
另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?
如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?
apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)
现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:
ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。
REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。
现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。
所以,我的问题是:有没有办法更有效地进行这种数据整合?
我们考虑过:
使用ALTER TABLE .. RECOVER PARTITONS但根据文档,它只会刷新新分区。
尝试REFRESH .. PARTITON ...一次与多个分区一起使用,但语句语法不允许这样做。
尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。
鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?
谢谢!
胜利者
注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。
注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。
我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。
因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。
我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html
当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。
我的代码如下所示:
df = (
spark.readStream.format("cloudFiles")
.schema(schemaAsStruct)
.option("cloudFiles.format", sourceFormat)
.option("delimiter", delimiter)
.option("header", sourceFirstRowIsHeader)
.option("cloudFiles.useNotifications", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("badRecordsPath", badRecordsPath)
.option("maxFilesPerTrigger", 1)
.option("cloudFiles.resourceGroup", omitted)
.option("cloudFiles.region", omitted)
.option("cloudFiles.connectionString", omitted)
.option("cloudFiles.subscriptionId", omitted)
.option("cloudFiles.tenantId", omitted)
.option("cloudFiles.clientId", omitted)
.option("cloudFiles.clientSecret", omitted)
.load(sourceBasePath)
)
# Traceability columns
df = (
df.withColumn(sourceFilenameColumnName, input_file_name())
.withColumn(processedTimestampColumnName, lit(processedTimestamp))
.withColumn(batchIdColumnName, lit(batchId))
)
def process_batch(batchDF, id):
batchDF.persist()
(batchDF
.write
.format(destinationFormat)
.mode("append")
.save(destinationBasePath + processedTimestampColumnName + "=" …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark databricks spark-structured-streaming
我正在编写一个测试应用程序,它消耗来自Kafka的topcis的消息,然后将数据推送到S3并进入RDBMS表(流程类似于此处所示:https://databricks.com/blog/2017/04/26/processing-data -in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html).所以我从Kafka读取数据然后:
所以我有点像:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
Run Code Online (Sandbox Code Playgroud)
(请注意我正在阅读多个Kafka主题).接下来我定义所需的数据集:
Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
Run Code Online (Sandbox Code Playgroud)
现在为每个数据集我创建查询以开始处理:
StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("startingOffsets", "latest")
.option("path", "s3_location")
.start()
StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter[T] and save data into external RDBMS table …Run Code Online (Sandbox Code Playgroud) 尽管我正在使用withWatermark(),但是当我运行我的spark工作时,我收到以下错误消息:
线程"main"中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames/DataSets上有流式聚合时,不支持追加输出模式;;
从我在编程指南中看到的内容,这与预期用法(和示例代码)完全匹配.有谁知道什么可能是错的?
提前致谢!
相关代码(Java 8,Spark 2.2.0):
StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();
Run Code Online (Sandbox Code Playgroud) 我们已经讨论过以下问题:
但是Spark Structured Streaming在Spark2.2中添加了它,它为流媒体带来了很多变化,而且非常出色.
我们可以说Spark Strutured Streaming是流处理,还是批量处理?
现在Apache Fink和之间的最大区别是Apache Spark Structured Streaming什么?
我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.
为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息
我有点陷入困境,围绕如何保持整个集群中的Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.
目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库
val cassandraSensorDf = spark
.read
.cassandraFormat("specifications", "sensors")
.load
Run Code Online (Sandbox Code Playgroud)
从这里开始,我可以cassandraSensorDs通过加入我的结构化流数据集来使用它.
.join(
cassandraSensorDs ,
sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)
Run Code Online (Sandbox Code Playgroud)
如何在运行结构化流式传输时执行其他查询来更新此Cassandra数据?如何在群集设置中提供查询的数据?
我一直在浏览关于结构化流的spark 2.3.1的文档,但是找不到有关状态存储在状态存储内部如何工作的详细信息。更具体地说,我想知道的是:(1)状态存储区是分布式的吗?(2)如果是,那么每个工人或每个核心如何?
似乎在旧版本的spark中是每个工人,但现在还不知道。我知道它得到了HDFS的支持,但是没有任何东西可以解释内存中存储的实际工作方式。
确实是分布式内存存储吗?我对重复数据删除特别感兴趣,如果数据来自一个大型数据集,那么这需要进行计划,因为所有“不同”数据集最终都将保存在内存中,直到该数据集处理结束。因此,需要根据状态存储的工作方式来计划工作者或主服务器的大小。
没有人有一些信息,指针或建议如何处理吗?
谢谢,Maatari
spark-structured-streaming ×10
apache-spark ×9
pyspark ×2
apache-flink ×1
apache-kafka ×1
cassandra ×1
cloudera-cdh ×1
databricks ×1
hadoop ×1
impala ×1
java ×1
python ×1
scala ×1