Spark Structured Streaming - UI 存储内存值增长

Gus*_*s B 5 apache-spark spark-structured-streaming

我正在从 DStreams Spark 应用程序迁移到结构化流应用程序。在测试过程中,我发现 Spark 的 UI 中 Executors 选项卡中的 Storage Memory 不断增长。它甚至超过了分配的内存,同时没有溢出到磁盘,缓存的 RDD 也只有几 MB。我使用的是 Spark 2.4.3 版并使用来自 Kafka 2.1 版的数据。

下面的示例显示了一个带有一个驱动程序和一个执行程序的应用程序。驱动程序分配了 3 GB 内存,执行程序分配了 5 GB(和 3 个内核)。

Spark UI 执行器选项卡

如您所见,UI 显示每个进程(执行程序和驱动程序)消耗大约 8 GB 的内存,而分配的值要小得多。它还表明没有溢出到磁盘。下图还显示缓存的 RDD 的大小约为 100 MB:

Spark UI 存储选项卡

我试图用系统的值来验证 UI 报告的内存使用情况。我使用了 ps 命令,它显示驱动程序消耗大约 2 GB 的内存,执行程序消耗 5 GB,这在分配的值内。

我还使用 Spark 的 REST API 来获取执行程序的状态。响应显示“memoryUsed”值是 UI 中显示的值。这是 JSON 响应:

{
  "id": "driver",
  "hostPort": "ip:41214",
  "isActive": true,
  "rddBlocks": 0,
  "memoryUsed": 7909526598,
  "diskUsed": 0,
  "totalCores": 0,
  "maxTasks": 0,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 0,
  "totalTasks": 0,
  "totalDuration": 0,
  "totalGCTime": 0,
  "totalInputBytes": 0,
  "totalShuffleRead": 0,
  "totalShuffleWrite": 0,
  "isBlacklisted": false,
  "maxMemory": 1529452953,
  "addTime": "2019-05-31T14:46:51.563GMT",
  "executorLogs": {},
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7909526598,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 1529452953,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
},
{
  "id": "0",
  "hostPort": "ip:40787",
  "isActive": true,
  "rddBlocks": 24,
  "memoryUsed": 7996553955,
  "diskUsed": 0,
  "totalCores": 3,
  "maxTasks": 3,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 710401,
  "totalTasks": 710401,
  "totalDuration": 306845440,
  "totalGCTime": 8128264,
  "totalInputBytes": 733395681216,
  "totalShuffleRead": 475652972265,
  "totalShuffleWrite": 354298278067,
  "isBlacklisted": false,
  "maxMemory": 2674812518,
  "addTime": "2019-05-31T14:46:53.680GMT",
  "executorLogs": {
    "stdout": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stdout",
    "stderr": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stderr"
  },
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7996553955,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 2674812518,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
}
Run Code Online (Sandbox Code Playgroud)

“memoryUsed”和“usedOnHeapStorageMemory”值似乎与 UI 中显示的值相同。

那么,Spark 如何显示结构化流的已用内存是否存在错误?报告值与系统值不一致。

请注意,在我的应用程序中,我使用带有水印和附加模式的聚合。我认为这可能是问题所在,并且状态未正确清理。但是,我使用query.lastProgress方法来监视流式查询的状态,它表明状态确实已清除。我什至删除了聚合并使用了附加模式,以便查询是无状态的并且行为是相同的。

先感谢您。