我使用结构化流来读取kafka主题,使用spark 2.4和scala 2.12
我使用检查点来使我的查询具有容错能力。
但是,每次我开始查询时,它都会跳转到当前偏移量,而不会在连接到主题之前读取现有数据。
我缺少 kafka 流的配置吗?
读:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("maxOffsetsPerTrigger","1")
.option("startingOffset","earliest")
.option("auto.offset.reset","earliest")
.load()
val msg = df.select($"value" cast "string", $"topic", $"partition", $"offset")
Run Code Online (Sandbox Code Playgroud)
写:
val query= msg.writeStream
.foreachBatch(
(dfbatch: Dataset[Row], batchid: Long) =>
{
println(s"IM AT BATCH ID: $batchid")
dfbatch.show()
dfbatch.write.csv(s"s3a://abucket/$param")
}
)
.option("checkpointLocation","s3a://checkpoint/")
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
编辑:
这是我清空检查点目录后的日志:
0/07/11 18:15:16 INFO CheckpointFileManager: Writing atomically to s3a://checkpoint/metadata using temp file s3a://checkpoint/.metadata.304a751a-68b7-4b8d-858c-3aa5df272db4.tmp
20/07/11 18:15:17 INFO CheckpointFileManager: Renamed temp …Run Code Online (Sandbox Code Playgroud) 由于任何 Kafka Consumer 实际上都是批量消费,为什么与 Kafka Streams(将自己推销为真正的流媒体)相比,Spark Streaming 微批量(当使用 Kafka 作为源时)有如此多的批评?
我的意思是:很多批评都集中在 Spark Streaming 微批处理架构上。通常,人们说 Kafka Streams 是一个真正的“实时”工具,因为它逐一处理事件。
它确实会一一处理事件,但是根据我的理解,它使用(与几乎所有其他库/框架一样)消费者 API。Consumer API 批量轮询主题,以减轻网络负担(间隔可配置)。因此,消费者会做类似的事情:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
///// PROCESS A **BATCH** OF RECORDS
for (ConsumerRecord<String, String> record : records) {
///// PROCESS **ONE-BY-ONE**
}
}
Run Code Online (Sandbox Code Playgroud)
因此,尽管 Spark 的说法是正确的:
但:
需要澄清的是,我并不是从“粉丝方面”提出问题(因此,这是一个意见问题),恰恰相反,我真的试图从技术上理解它,以便理解流媒体生态系统中的语义。
欣赏这件事上的每一条信息。
我有两个流数据集,我们将它们称为fastStream和slowStream。
这fastStream是我通过结构化流 API 从 Kafka 消费的流数据集。我预计每秒可能会收到数千条消息。
这slowStream实际上是一个引用(或查找)表,正在由另一个流“更新插入”,并且包含我想要在将fastStream记录保存到表之前加入到每条消息的数据。仅当有人更改元数据时才会slowStream更新,这可能随时发生,但我们预计可能每隔几天更改一次。
中的每条记录都fastStream将具有一条对应的消息slowStream,我本质上希望使该连接立即与表中的任何数据发生slowStream。我不想等到新数据到达时看看是否会发生潜在的匹配slowStream。
我遇到的问题是,根据 Spark 文档:
因此,对于这两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接结果。
我尝试添加水印,fastStream但我认为它没有效果,因为文档表明需要在连接中引用带水印的列
理想情况下我会写这样的东西:
# Apply a watermark to the fast stream
fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")
# The slowStream cannot be watermarked since it is only slowly changing
slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")
# Prevent …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming pyspark spark-structured-streaming
我正在 pyspark 中接收 Kafka 流。目前,我将其按一组字段分组并向数据库写入更新:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic)
...
df = df \
.groupBy("myfield1") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
query = df \
.writeStream \
.outputMode("update") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我可以在中间采用相同的链并创建另一个分组,例如
df2 = df \
.groupBy("myfield2") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
Run Code Online (Sandbox Code Playgroud)
并将其输出并行写入不同的地方? …
目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。
例子:
小区1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)
细胞2
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud) 在运行python spark结构流的编程指南[link]中给出的示例
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
我得到以下错误:
TypeError:'Builder'对象不可调用
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession.builder()\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, ' ')
).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the …Run Code Online (Sandbox Code Playgroud) 我得到了这样的情况,当spark可以流式传输并从Kafka 2-patition主题的一个分区获取消息.
我的主题:
C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4
卡夫卡制片人:
public class KafkaFileProducer {
// kafka producer
Producer<String, String> producer;
public KafkaFileProducer() {
// configs
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
//props.put("group.id", "testgroup");
props.put("batch.size", "16384");
props.put("auto.commit.interval.ms", "1000");
props.put("linger.ms", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("block.on.buffer.full", "true");
// instantiate a producer
producer = new KafkaProducer<String, String>(props);
}
/**
* @param filePath
*/
public void sendFile(String filePath) {
FileInputStream fis;
BufferedReader br = null;
try { …Run Code Online (Sandbox Code Playgroud) 请参阅此处的帖子: 使用python进行Spark结构化流式传输 我想在python 3.5中导入'col'
from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)
但是我收到一个错误,说未解决的对col的引用。我已经安装了pyspark库,所以想知道是否从pyspark库中删除了“ col”吗?然后我如何导入“ col”。
python apache-spark pyspark pyspark-sql spark-structured-streaming
我正在尝试使用CSV设置Kafka流,以便可以将其流式传输到Spark。但是,我不断
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
Run Code Online (Sandbox Code Playgroud)
我的代码看起来像这样
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
object SpeedTester {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
import spark.implicits._
val mySchema = StructType(Array(
StructField("incident_id", IntegerType),
StructField("date", StringType),
StructField("state", StringType),
StructField("city_or_county", StringType),
StructField("n_killed", IntegerType),
StructField("n_injured", IntegerType)
))
val streamingDataFrame = spark.readStream.schema(mySchema).csv("C:/Users/zoldham/IdeaProjects/flinkpoc/Data/test")
streamingDataFrame.selectExpr("CAST(incident_id AS STRING) AS key",
"to_json(struct(*)) AS value").writeStream
.format("kafka") …Run Code Online (Sandbox Code Playgroud) 我正在使用Spark结构化流处理来自流数据源的数据,并且正在使用文件接收器。数据将在处理后放入hdfs。
我有一个问题,输出文件是这样的part-00012-8d701427-8289-41d7-9b4d-04c5d882664d-c000.txt。这使我无法在上一个小时获取文件输出。
是否可以将输出文件自定义为timestamp_xxx或类似的内容?或者,我可以按批次输出到不同的路径吗?
spark-structured-streaming ×10
apache-spark ×9
apache-kafka ×4
pyspark ×4
databricks ×1
hdfs ×1
pyspark-sql ×1
python ×1
scala ×1