我正在Spark 2.1.1中使用结构化流。我需要对传入消息(从Kafka来源)应用一些业务逻辑。
本质上,我需要获取消息,获取一些键值,在HBase中查找它们并在数据集上执行更多biz逻辑。最终结果是一个字符串消息,需要将其写出到另一个Kafka队列。
但是,由于传入消息的抽象是一个数据帧(无边界表-结构化流),因此我必须遍历一次触发期间接收到的数据集mapPartitions(由于HBase客户端无法序列化而导致的分区)。
在我的流程中,我需要遍历每一行以执行相同的业务流程。
dataFrame.mapPartitions打电话呢?我觉得它的顺序和迭代!您会推荐一种替代方法吗?
我之前已经成功地将 pyspark 用于 Spark Streaming(Spark 2.0.2)和 Kafka(0.10.1.0),但我的目的更适合结构化流。我尝试在线使用示例:https : //spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,我总是以以下错误告终:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
Run Code Online (Sandbox Code Playgroud)
我还尝试在创建 ds1 时将其添加到我的选项集中:
.option("partition.assignment.strategy", "range")
Run Code Online (Sandbox Code Playgroud)
但即使明确地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。
我也用“assign”选项尝试了这个并实现了同样的错误(我们的Kafka主机设置为assign——每个消费者只分配一个分区,我们没有任何重新平衡)。
知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?
apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming
我在Spark中有一个StreamingQuery(v2.2.0),即
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("parquet")
.option("checkpointLocation", "s3n://bucket/checkpoint/test")
.option("path", "s3n://bucket/test")
.start()
Run Code Online (Sandbox Code Playgroud)
当我运行时,query数据确实在AWS S3上保存并且创建了检查点s3n://bucket/checkpoint/test.但是,我也在日志中收到以下警告:
WARN [oassestreaming.OffsetSeqLog]无法使用FileContext API在路径s3n:// bucket/checpoint/test/offsets管理元数据日志文件.使用FileSystem API代替管理日志文件.失败时日志可能不一致.
我无法理解为什么会出现这种警告.此外,如果发生任何故障,我的检查点会不一致吗?
任何人都可以帮我解决它吗?
我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。
这些是我的 build.sbt 的相关行:
scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"
libraryDependencies ++= {
val sparkVer = "2.3.0"
Seq(
"org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
"org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
"org.apache.spark" %% "spark-hive" % sparkVer % "provided",
)
}
Run Code Online (Sandbox Code Playgroud)
和代码:
case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)
def convertToNetEntry(x: String): NetEntry = {
// tcp …Run Code Online (Sandbox Code Playgroud) 我正在使用 Spark Streaming,在尝试实现多个写入流时遇到一些问题。下面是我的代码
DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)
Run Code Online (Sandbox Code Playgroud)
其中 writeStreamer 定义如下:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {
val query = input
.writeStream
.format("orc")
.option("checkpointLocation", checkPointFolder)
.option("path", output)
.outputMode(OutputMode.Append)
.start()
query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
我面临的问题是只有第一个表是用 Spark writeStream 写入的,所有其他表都没有发生任何情况。请问您对此有什么想法吗?
我有一个 Spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流媒体似乎不能那样工作。它只是创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径?
下面是我的 writestream 的样子
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))
Run Code Online (Sandbox Code Playgroud) 我正在使用 Spark Structured Streaming。此外,我正在与Scala. 我想将配置文件传递给我的 spark 应用程序。此配置文件托管在HDFS. 例如;
spark_job.conf (HOCON)
spark {
appName: "",
master: "",
shuffle.size: 4
etc..
}
kafkaSource {
servers: "",
topic: "",
etc..
}
redisSink {
host: "",
port: 999,
timeout: 2000,
checkpointLocation: "hdfs location",
etc..
}
Run Code Online (Sandbox Code Playgroud)
如何将其传递给 Spark 应用程序?如何hosted HDFS在 Spark 中读取此文件()?
configuration hadoop apache-spark apache-spark-sql spark-structured-streaming
我有一个 kafka 流,正在加载到 Spark。来自 Kafka 主题的消息具有以下属性:bl_iban、blacklisted、timestamp。因此,有 IBANS、关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,并且还有该记录的时间戳。问题是一个 IBAN 可以有多个记录,因为超时的 IBAN 可能会被列入黑名单或“删除”。我想要实现的目标是了解每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是列出每个最新的 IBAN timestamp(之后我也想添加blacklisted状态),所以我生成了以下代码(其中黑名单代表我从 Kafka 加载的数据集):
blackList = blackList.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)
之后我尝试使用以下代码将其打印到控制台:
StreamingQuery query = blackList.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
Run Code Online (Sandbox Code Playgroud)
我已经运行我的代码并收到以下错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
所以我将水印添加到我的数据集中,如下所示:
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)
之后又出现同样的错误。我有什么想法可以解决这个问题吗?
更新:在迈克的帮助下,我成功地摆脱了这个错误。但问题是我仍然无法让我的黑名单发挥作用。我可以看到数据是如何从 Kafka 加载的,但之后从我的组操作中我得到了两个空批次,仅此而已。从Kafka打印的数据:
+-----------------------+-----------+-----------------------+
|bl_iban |blacklisted|timestamp |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N …Run Code Online (Sandbox Code Playgroud) java apache-spark spark-streaming spark-structured-streaming
如何在 Spark Java 中使用以下函数?查遍了互联网但找不到合适的例子。
public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)
Run Code Online (Sandbox Code Playgroud)
我唯一知道的是它对流程有好处batch of data,所谓的BoxedUnit。
如何获取或batch ID批量处理数据?BoxedUnitdataset
谁能告诉我如何实现这个方法?
我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。
我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:
它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?
干杯
offset apache-kafka apache-spark spark-streaming spark-structured-streaming
有人可以解释一下Spark结构化流上foreach writer的需求吗?
当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter。
在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图
func(tabdf, epoch_id):
tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)
请任何人帮助我解决这个问题。
spark-streaming apache-spark-sql pyspark spark-structured-streaming
spark-structured-streaming ×12
apache-spark ×10
apache-kafka ×3
java ×2
pyspark ×2
scala ×2
amazon-s3 ×1
hadoop ×1
offset ×1