标签: spark-structured-streaming

优雅地停止结构化流查询

我正在使用 Spark 2.1 并尝试优雅地停止流式查询。

StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:

void stop() 如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0

而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 停止流的执行,可选择确保所有接收到的数据都已处理。

stopSparkContext 如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。

stopGracefully 如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止

所以问题是如何优雅地停止结构化流查询?

apache-spark spark-structured-streaming

9
推荐指数
4
解决办法
5438
查看次数

如何在Zeppelin中获得控制台流水槽的输出?

从Zeppelin运行时,我正在努力让接收console器使用PySpark Structured Streaming.基本上,我没有看到任何结果打印到屏幕或我发现的任何日志文件.

我的问题:有没有人有一个使用PySpark Structured Streaming和一个产生Apache Zeppelin可见输出的接收器的工作示例?理想情况下它也会使用套接字源,因为它很容易测试.

我正在使用:

  • Ubuntu 16.04
  • 火花2.2.0彬hadoop2.7
  • 齐柏林0.7.3彬所有
  • Python3

我的代码基于structured_network_wordcount.py示例.它从PySpark shell(./bin/pyspark --master local[2])运行时起作用; 我看到每批表.

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-zeppelin spark-structured-streaming

9
推荐指数
1
解决办法
5955
查看次数

如何在Structured Streaming的kafka数据源中为消费者组设置group.id?

我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?

另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?

如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?

apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

9
推荐指数
2
解决办法
5504
查看次数

如何有效地更新文件经常被修改的 Impala 表

我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)

现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:

  • ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。

现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。

所以,我的问题是:有没有办法更有效地进行这种数据整合?

我们考虑过:

  • 使用ALTER TABLE .. RECOVER PARTITONS但根据文档,它只会刷新新分区。

  • 尝试REFRESH .. PARTITON ...一次与多个分区一起使用,但语句语法不允许这样做。

  • 尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。

  • 鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?

  • 你知道的其他方式吗?

谢谢!

胜利者

注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。

注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。

hadoop impala cloudera-cdh spark-structured-streaming

9
推荐指数
1
解决办法
676
查看次数

如何使用“触发一次”触发器控制 Spark 结构化流中每个触发器处理的文件量?

我正在尝试使用 Spark 结构化流处理的功能“触发一次”来模拟批量类似的设置。但是,当我运行初始批次时,我遇到了一些麻烦,因为我有很多历史数据,因此我还使用选项 .option ("cloudFiles.includeExistingFiles", "true")来处理现有文件。

因此,我的初始批次变得非常大,因为我无法控制该批次的文件数量。

我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用 Trigger 一次时,这会被忽略 --> https://docs.databricks.com/spark/latest/structed-streaming/auto-loader-gen2.html

当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件。

我的代码如下所示:

df = (
  spark.readStream.format("cloudFiles")
    .schema(schemaAsStruct)
    .option("cloudFiles.format", sourceFormat)
    .option("delimiter", delimiter)
    .option("header", sourceFirstRowIsHeader)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("badRecordsPath", badRecordsPath)
    .option("maxFilesPerTrigger", 1)
    .option("cloudFiles.resourceGroup", omitted)
    .option("cloudFiles.region", omitted)
    .option("cloudFiles.connectionString", omitted)
    .option("cloudFiles.subscriptionId", omitted)
    .option("cloudFiles.tenantId", omitted)
    .option("cloudFiles.clientId", omitted)
    .option("cloudFiles.clientSecret", omitted)
    .load(sourceBasePath)
)

# Traceability columns
df = (
  df.withColumn(sourceFilenameColumnName, input_file_name()) 
    .withColumn(processedTimestampColumnName, lit(processedTimestamp))
    .withColumn(batchIdColumnName, lit(batchId))
)

def process_batch(batchDF, id):
  batchDF.persist()
  
  (batchDF
     .write
     .format(destinationFormat)
     .mode("append")
     .save(destinationBasePath + processedTimestampColumnName + "=" …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark databricks spark-structured-streaming

9
推荐指数
1
解决办法
6916
查看次数

结构化流如何执行单独的流式查询(并行或顺序)?

我正在编写一个测试应用程序,它消耗来自Kafka的topcis的消息,然后将数据推送到S3并进入RDBMS表(流程类似于此处所示:https://databricks.com/blog/2017/04/26/processing-data -in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html).所以我从Kafka读取数据然后:

  • 每条消息都想保存到S3中
  • 一些消息保存到外部数据库中的表A(基于过滤条件)
  • 一些其他消息保存到外部数据库中的表B(其他过滤条件)

所以我有点像:

Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
Run Code Online (Sandbox Code Playgroud)

(请注意我正在阅读多个Kafka主题).接下来我定义所需的数据集:

Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
Run Code Online (Sandbox Code Playgroud)

现在为每个数据集我创建查询以开始处理:

StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("startingOffsets", "latest")
.option("path", "s3_location")
.start()

StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter[T] and save data into external RDBMS table …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
1149
查看次数

使用带水印的附加输出模式时的结构化流异常

尽管我正在使用withWatermark(),但是当我运行我的spark工作时,我收到以下错误消息:

线程"main"中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames/DataSets上有流式聚合时,不支持追加输出模式;;

从我在编程指南中看到的内容,这与预期用法(和示例代码)完全匹配.有谁知道什么可能是错的?

提前致谢!

相关代码(Java 8,Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
5557
查看次数

Apache Spark结构流与Apache Flink:有什么区别?

我们已经讨论过以下问题:

但是Spark Structured Streaming在Spark2.2中添加了它,它为流媒体带来了很多变化,而且非常出色.

我们可以说Spark Strutured Streaming是流处理,还是批量处理?

现在Apache Fink和之间的最大区别是Apache Spark Structured Streaming什么?

apache-spark apache-flink spark-structured-streaming

8
推荐指数
1
解决办法
3393
查看次数

将Cassandra查询数据组合/更新到从Kafka收到的结构化流媒体

我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.

为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息

我有点陷入困境,围绕如何保持整个集群中的Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.

目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications", "sensors")
  .load
Run Code Online (Sandbox Code Playgroud)

从这里开始,我可以cassandraSensorDs通过加入我的结构化流数据集来使用它.

.join(
   cassandraSensorDs ,
   sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)
Run Code Online (Sandbox Code Playgroud)

如何在运行结构化流式传输时执行其他查询来更新此Cassandra数据?如何在群集设置中提供查询的数据?

scala cassandra apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
455
查看次数

Spark 2.3.1结构化流状态存储内部工作

我一直在浏览关于结构化流的spark 2.3.1的文档,但是找不到有关状态存储在状态存储内部如何工作的详细信息。更具体地说,我想知道的是:(1)状态存储区是分布式的吗?(2)如果是,那么每个工人或每个核心如何?

似乎在旧版本的spark中是每个工人,但现在还不知道。我知道它得到了HDFS的支持,但是没有任何东西可以解释内存中存储的实际工作方式。

确实是分布式内存存储吗?我对重复数据删除特别感兴趣,如果数据来自一个大型数据集,那么这需要进行计划,因为所有“不同”数据集最终都将保存在内存中,直到该数据集处理结束。因此,需要根据状态存储的工作方式来计划工作者或主服务器的大小。

没有人有一些信息,指针或建议如何处理吗?

谢谢,Maatari

apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
694
查看次数