标签: spark-structured-streaming

Spark结构化流-处理每一行

我正在Spark 2.1.1中使用结构化流。我需要对传入消息(从Kafka来源)应用一些业务逻辑。

本质上,我需要获取消息,获取一些键值,在HBase中查找它们并在数据集上执行更多biz逻辑。最终结果是一个字符串消息,需要将其写出到另一个Kafka队列。

但是,由于传入消息的抽象是一个数据帧(无边界表-结构化流),因此我必须遍历一次触发期间接收到的数据集mapPartitions(由于HBase客户端无法序列化而导致的分区)。

在我的流程中,我需要遍历每一行以执行相同的业务流程。

  1. 有没有更好的方法可以帮助我避免dataFrame.mapPartitions打电话呢?我觉得它的顺序和迭代!
  2. 结构化流基本上迫使我从业务流程中生成输出数据帧,而没有一开始。我可以使用其他哪些设计模式来实现最终目标?

您会推荐一种替代方法吗?

scala apache-kafka apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
2036
查看次数

Pyspark Structured Streaming Kafka 配置错误

我之前已经成功地将 pyspark 用于 Spark Streaming(Spark 2.0.2)和 Kafka(0.10.1.0),但我的目的更适合结构化流。我尝试在线使用示例:https : //spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

使用以下类似代码:

ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
query = ds1
  .writeStream
  .outputMode('append')
  .format('console')
  .start()
query.awaitTermination() 
Run Code Online (Sandbox Code Playgroud)

但是,我总是以以下错误告终:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value
Run Code Online (Sandbox Code Playgroud)

我还尝试在创建 ds1 时将其添加到我的选项集中:

.option("partition.assignment.strategy", "range")
Run Code Online (Sandbox Code Playgroud)

但即使明确地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。

我也用“assign”选项尝试了这个并实现了同样的错误(我们的Kafka主机设置为assign——每个消费者只分配一个分区,我们没有任何重新平衡)。

知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?

apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming

1
推荐指数
1
解决办法
2664
查看次数

结构化流 - 无法使用FileContext API管理AWS S3上的元数据日志文件

我在Spark中有一个StreamingQuery(v2.2.0),即

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "s3n://bucket/checkpoint/test")
  .option("path", "s3n://bucket/test")
  .start()
Run Code Online (Sandbox Code Playgroud)

当我运行时,query数据确实在AWS S3上保存并且创建了检查点s3n://bucket/checkpoint/test.但是,我也在日志中收到以下警告:

WARN [oassestreaming.OffsetSeqLog]无法使用FileContext API在路径s3n:// bucket/checpoint/test/offsets管理元数据日志文件.使用FileSystem API代替管理日志文件.失败时日志可能不一致.

我无法理解为什么会出现这种警告.此外,如果发生任何故障,我的检查点会不一致吗?

任何人都可以帮我解决它吗?

scala amazon-s3 apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
738
查看次数

在 Spark Structured Streaming 中找不到“窗口”函数

我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。

这些是我的 build.sbt 的相关行:

scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {

  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided",
  )
}
Run Code Online (Sandbox Code Playgroud)

和代码:

case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)

def convertToNetEntry(x: String): NetEntry = {
    // tcp …
Run Code Online (Sandbox Code Playgroud)

spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
2295
查看次数

带有 Spark 流的多个 writeStream

我正在使用 Spark Streaming,在尝试实现多个写入流时遇到一些问题。下面是我的代码

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)
Run Code Online (Sandbox Code Playgroud)

其中 writeStreamer 定义如下:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format("orc")
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)

我面临的问题是只有第一个表是用 Spark writeStream 写入的,所有其他表都没有发生任何情况。请问您对此有什么想法吗?

apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
6324
查看次数

在火花结构化流中动态更改 hdfs 写入路径

我有一个 Spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流媒体似乎不能那样工作。它只是创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径?

下面是我的 writestream 的样子

 val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
 val outPath = "maindir/sb_topic/data/loaddate="

val dswWriteStream =dfresult.writeStream
    .outputMode(outputMode) 
    .format(writeformat) 
    .option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
    .option("checkpointLocation", checkpointdir) 
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime("10 minutes")) 
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
954
查看次数

如何将托管在 HDFS 中的配置文件传递给 Spark 应用程序?

我正在使用 Spark Structured Streaming。此外,我正在与Scala. 我想将配置文件传递给我的 spark 应用程序。此配置文件托管在HDFS. 例如;

spark_job.conf (HOCON)

spark {
  appName: "",
  master: "",
  shuffle.size: 4 
  etc..
}

kafkaSource {
  servers: "",
  topic: "",
  etc..
}

redisSink {
  host: "",
  port: 999,
  timeout: 2000,
  checkpointLocation: "hdfs location",
  etc..
}
Run Code Online (Sandbox Code Playgroud)

如何将其传递给 Spark 应用程序?如何hosted HDFS在 Spark 中读取此文件()?

configuration hadoop apache-spark apache-spark-sql spark-structured-streaming

1
推荐指数
1
解决办法
1502
查看次数

当无水印的流式 DataFrame/DataSet 上存在流式聚合时,不支持追加输出模式

我有一个 kafka 流,正在加载到 Spark。来自 Kafka 主题的消息具有以下属性:bl_ibanblacklistedtimestamp。因此,有 IBANS、关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,并且还有该记录的时间戳。问题是一个 IBAN 可以有多个记录,因为超时的 IBAN 可能会被列入黑名单或“删除”。我想要实现的目标是了解每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是列出每个最新的 IBAN timestamp(之后我也想添加blacklisted状态),所以我生成了以下代码(其中黑名单代表我从 Kafka 加载的数据集):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后我尝试使用以下代码将其打印到控制台:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();
Run Code Online (Sandbox Code Playgroud)

我已经运行我的代码并收到以下错误: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

所以我将水印添加到我的数据集中,如下所示:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后又出现同样的错误。我有什么想法可以解决这个问题吗?


更新:在迈克的帮助下,我成功地摆脱了这个错误。但问题是我仍然无法让我的黑名单发挥作用。我可以看到数据是如何从 Kafka 加载的,但之后从我的组操作中我得到了两个空批次,仅此而已。从Kafka打印的数据:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N …
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
1681
查看次数

Spark中如何使用foreachPartition?

如何在 Spark Java 中使用以下函数?查遍了互联网但找不到合适的例子。

public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)
Run Code Online (Sandbox Code Playgroud)

我唯一知道的是它对流程有好处batch of data,所谓的BoxedUnit

如何获取或batch ID批量处理数据?BoxedUnitdataset

谁能告诉我如何实现这个方法?

java apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
5524
查看次数

结构化流式 Kafka 源偏移存储

我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?

干杯

offset apache-kafka apache-spark spark-streaming spark-structured-streaming

0
推荐指数
1
解决办法
2527
查看次数

Spark结构化流中ForeachWriter的目的是什么?

有人可以解释一下Spark结构化流上foreach writer的需求吗?

当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter。

apache-spark spark-structured-streaming

0
推荐指数
1
解决办法
444
查看次数

Spark Structure Streaming 中的临时视图

在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图

func(tabdf, epoch_id):
    tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)

请任何人帮助我解决这个问题。

spark-streaming apache-spark-sql pyspark spark-structured-streaming

0
推荐指数
1
解决办法
3355
查看次数