小编jam*_*ann的帖子

从多个 Kafka 主题读取 Spark 结构化流应用程序

我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从许多 Kafka 主题中读取数据,进行一些相对简单的处理(主要是聚合和一些连接)并将结果发布到许多其他 Kafka 主题。因此在同一个应用程序中处理多个流。

我想知道如果我只设置 1 个订阅多个主题的直接 readStream,然后使用选择拆分流,那么从资源的角度(内存、执行程序、线程、Kafka 侦听器等)是否会有所不同,而不是 1每个主题的 readStream。

就像是

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
Run Code Online (Sandbox Code Playgroud)

对比

t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
Run Code Online (Sandbox Code Playgroud)

一个比另一个更“有效”吗?我找不到任何关于这是否有所作为的文档。

谢谢!

apache-kafka apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
3184
查看次数

Spark 结构化流 maxOffsetsPerTrigger 似乎不起作用

我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有许多关于要重新处理的主题的消息(需要加入的 3 个主题各有 250'000 条消息)。

重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger在这些情况下,读取流上的配置应该会有所帮助。我更改了 PySpark 代码(在 SSS 2.4.3 上运行),所有 3 个主题都具有以下内容

 rawstream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topicName)
    .option("maxOffsetsPerTrigger", 10000L)
    .option("startingOffsets", "earliest")
    .load()
Run Code Online (Sandbox Code Playgroud)

我的期望是现在 SSS 查询将从每个主题加载 ~33'000 个偏移量并将它们加入第一批。然后在第二批中,它将清除第一批中的状态记录,由于水印而到期(这将清除第一批中的大部分记录),然后从每个主题中再读取约 33k。因此,在大约 8 个批次之后,它应该已经处理了延迟,并具有“合理”的内存量。

但是应用程序仍然因 OOM 而崩溃,当我检查应用程序主 UI 中的 DAG 时,它报告它再次尝试读取所有 250'000 条消息。

还有什么我需要配置的吗?我如何检查这个选项是否真的被使用?(当我检查计划时,不幸的是它被截断了,只是显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...),我不知道如何在点之后显示部分)

apache-spark spark-structured-streaming

8
推荐指数
1
解决办法
630
查看次数

在 Solaris 上解释 prstat 与 'prstat -m' 的差异

我最近一直在使用 prstat 和 prstat -ma 来调查性能问题,我想我已经基本了解了 Solaris 10 中采样与微状态会计的差异。所以我不希望两者总是显示完全相同数字。

今天我遇到了一个场合,其中 2 显示出如此不同的输出,我在解释它们和理解输出时遇到了问题。这台机器是一个负载很重的 8-CPU Solaris 10,有几个大型 WebSphere 进程和一个 Oracle 数据库。今天系统实际上停止了大约 15 分钟(平均负载 >700)。我很难获得任何 prstat 信息,但能够从“prtstat 1 1”和“prtstat -m 1 1”中获得一些输出,这些输出很快一个接一个地发布。

输出的顶行:

prstat 1 1:

    PID 用户名大小 RSS 状态 PRI NICE 时间 CPU 进程/NLWP
  8379 是 3208M 2773M cpu5 60 0 5:29:13 19% java/145
  7123 是 3159M 2756M 运行 59 0 5:26:45 7.7% java/109
  5855 app1 1132M 26M cpu2 60 0 0:01:01 7.7% java/18
 16503 监视器 494M 286M 运行 59 19 1:01:08 …

solaris

5
推荐指数
1
解决办法
1万
查看次数