标签: spark-structured-streaming

查询开始时使用结构化流从 Kafka 主题的开头读取

我使用结构化流来读取kafka主题,使用spark 2.4scala 2.12

我使用检查点来使我的查询具有容错能力。

但是,每次我开始查询时,它都会跳转到当前偏移量,而不会在连接到主题之前读取现有数据。

我缺少 kafka 流的配置吗?

读:

 val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("maxOffsetsPerTrigger","1")
    .option("startingOffset","earliest")
    .option("auto.offset.reset","earliest")
    .load()
val msg = df.select($"value" cast "string", $"topic", $"partition", $"offset")
Run Code Online (Sandbox Code Playgroud)

写:

val query= msg.writeStream
    .foreachBatch(
      (dfbatch: Dataset[Row], batchid: Long) =>
      {
      println(s"IM AT BATCH ID: $batchid")
      dfbatch.show()
      dfbatch.write.csv(s"s3a://abucket/$param")
      }
    )
      .option("checkpointLocation","s3a://checkpoint/")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .start()

 query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

编辑:

这是我清空检查点目录后的日志:

0/07/11 18:15:16 INFO CheckpointFileManager: Writing atomically to s3a://checkpoint/metadata using temp file s3a://checkpoint/.metadata.304a751a-68b7-4b8d-858c-3aa5df272db4.tmp
20/07/11 18:15:17 INFO CheckpointFileManager: Renamed temp …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-structured-streaming

3
推荐指数
1
解决办法
2864
查看次数

为什么对 Spark Streaming 微批次(使用 kafka 作为源时)有如此多的批评?

由于任何 Kafka Consumer 实际上都是批量消费,为什么与 Kafka Streams(将自己推销为真正的流媒体)相比,Spark Streaming 微批量(当使用 Kafka 作为源时)有如此多的批评?

我的意思是:很多批评都集中在 Spark Streaming 微批处理架构上。通常,人们说 Kafka Streams 是一个真正的“实时”工具,因为它逐一处理事件。

它确实会一一处理事件,但是根据我的理解,它使用(与几乎所有其他库/框架一样)消费者 API。Consumer API 批量轮询主题,以减轻网络负担(间隔可配置)。因此,消费者会做类似的事情:

while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        ///// PROCESS A **BATCH** OF RECORDS
        for (ConsumerRecord<String, String> record : records) {

            ///// PROCESS **ONE-BY-ONE**
        }
}
Run Code Online (Sandbox Code Playgroud)

因此,尽管 Spark 的说法是正确的:

  1. 由于其微批量最小间隔将延迟限制为最多 100 毫秒,因此可能具有更高的延迟(请参阅 Spark 结构化流 DOC);
  2. 成组处理记录(作为 RDD 的 DStream 或结构化流中的 DataFrame)。

但:

  1. 可以在 Spark 中逐一处理记录 - 只需循环 RDD/行
  2. Kafka Streams 实际上会轮询一批记录,但会逐一处理它们,因为它在底层实现了 Consumer API。

需要澄清的是,我并不是从“粉丝方面”提出问题(因此,这是一个意见问题),恰恰相反,我真的试图从技术上理解它,以便理解流媒体生态系统中的语义。

欣赏这件事上的每一条信息。

apache-spark spark-structured-streaming

3
推荐指数
1
解决办法
3497
查看次数

防止 Spark 在流/流连接中存储状态

我有两个流数据集,我们将它们称为fastStreamslowStream

fastStream是我通过结构化流 API 从 Kafka 消费的流数据集。我预计每秒可能会收到数千条消息。

slowStream实际上是一个引用(或查找)表,正在由另一个流“更新插入”,并且包含我想要在将fastStream记录保存到表之前加入到每条消息的数据。仅当有人更改元数据时才会slowStream更新,这可能随时发生,但我们预计可能每隔几天更改一次。

中的每条记录都fastStream将具有一条对应的消息slowStream,我本质上希望使该连接立即与表中的任何数据发生slowStream。我不想等到新数据到达时看看是否会发生潜在的匹配slowStream

我遇到的问题是,根据 Spark 文档:

因此,对于这两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接结果。

我尝试添加水印,fastStream但我认为它没有效果,因为文档表明需要在连接中引用带水印的列

理想情况下我会写这样的东西:

# Apply a watermark to the fast stream
fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")

# The slowStream cannot be watermarked since it is only slowly changing
slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")

# Prevent …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming pyspark spark-structured-streaming

3
推荐指数
1
解决办法
1468
查看次数

我可以将流“分支”为多个并在 pyspark 中并行写入它们吗?

我正在 pyspark 中接收 Kafka 流。目前,我将其按一组字段分组并向数据库写入更新:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic)

...

df = df \
        .groupBy("myfield1") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

query = df \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
        .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我可以在中间采用相同的链并创建另一个分组,例如

df2 = df \
        .groupBy("myfield2") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")
Run Code Online (Sandbox Code Playgroud)

并将其输出并行写入不同的地方? …

apache-kafka pyspark spark-structured-streaming

3
推荐指数
1
解决办法
674
查看次数

PySpark 等待笔记本中完成 (Databricks)

目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。

例子:

小区1

autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)

细胞2

df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks spark-structured-streaming

3
推荐指数
1
解决办法
3939
查看次数

TypeError:'Builder'对象不可调用Spark结构化流

在运行python spark结构流的编程指南[link]中给出的示例
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

我得到以下错误:
TypeError:'Builder'对象不可调用

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder()\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', 'localhost')\
   .option('port', 9999)\
   .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, ' ')
   ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-structured-streaming

2
推荐指数
1
解决办法
2834
查看次数

Spark Structured Stream仅从Kafka的一个分区获取消息

我得到了这样的情况,当spark可以流式传输并从Kafka 2-patition主题的一个分区获取消息.

我的主题: C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4

卡夫卡制片人:

public class KafkaFileProducer {

// kafka producer
Producer<String, String> producer;

public KafkaFileProducer() {

    // configs
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    //props.put("group.id", "testgroup");
    props.put("batch.size", "16384");
    props.put("auto.commit.interval.ms", "1000");
    props.put("linger.ms", "0");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("block.on.buffer.full", "true");

    // instantiate a producer
    producer = new KafkaProducer<String, String>(props);
}

/**
 * @param filePath
 */
public void sendFile(String filePath) {
    FileInputStream fis;
    BufferedReader br = null;

    try { …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-structured-streaming

2
推荐指数
1
解决办法
1098
查看次数

尝试从python 3.5中的pyspark.sql.functions导入col时无法解析的引用

请参阅此处的帖子: 使用python进行Spark结构化流式传输 我想在python 3.5中导入'col'

from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)

但是我收到一个错误,说未解决的对col的引用。我已经安装了pyspark库,所以想知道是否从pyspark库中删除了“ col”吗?然后我如何导入“ col”。

python apache-spark pyspark pyspark-sql spark-structured-streaming

2
推荐指数
2
解决办法
3000
查看次数

Spark 2.3.0无法找到数据源:kafka

我正在尝试使用CSV设置Kafka流,以便可以将其流式传输到Spark。但是,我不断

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
Run Code Online (Sandbox Code Playgroud)

我的代码看起来像这样

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._

object SpeedTester {
  def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
  val rootLogger = Logger.getRootLogger()
  rootLogger.setLevel(Level.ERROR)
  import spark.implicits._
  val mySchema = StructType(Array(
    StructField("incident_id", IntegerType),
    StructField("date", StringType),
    StructField("state", StringType),
    StructField("city_or_county", StringType),
    StructField("n_killed", IntegerType),
    StructField("n_injured", IntegerType)
  ))

  val streamingDataFrame = spark.readStream.schema(mySchema).csv("C:/Users/zoldham/IdeaProjects/flinkpoc/Data/test")
  streamingDataFrame.selectExpr("CAST(incident_id AS STRING) AS key",
  "to_json(struct(*)) AS value").writeStream
    .format("kafka") …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming

2
推荐指数
1
解决办法
7944
查看次数

如何定义Spark结构化的流式文件接收器文件路径或文件名?

我正在使用Spark结构化流处理来自流数据源的数据,并且正在使用文件接收器。数据将在处理后放入hdfs。

我有一个问题,输出文件是这样的part-00012-8d701427-8289-41d7-9b4d-04c5d882664d-c000.txt。这使我无法在上一个小时获取文件输出。

是否可以将输出文件自定义为timestamp_xxx或类似的内容?或者,我可以按批次输出到不同的路径吗?

hdfs apache-spark spark-structured-streaming

2
推荐指数
1
解决办法
1947
查看次数