Spark Streaming Kinesis on EMR throws "Error while storing block into Spark"


We have a Spark (2.3.1) Streaming application running on EMR (5.16) that has been consuming a single AWS Kinesis shard for the past two years. In the last two months the application has started failing with random errors. We have tried updating to the latest versions of Spark and EMR, but the error has not gone away.
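For context, the Kinesis input is wired up in the usual receiver-based way. A minimal sketch follows (the stream name, app name, region and intervals are placeholders, not our real values, and it assumes the spark-streaming-kinesis-asl package is on the classpath):

# Minimal sketch of the receiver-based Kinesis input. All names, the region
# and the intervals are placeholders.
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="kinesis-consumer")            # hypothetical app name
ssc = StreamingContext(sc, batchDuration=60)             # 60 s batches (assumed)

stream = KinesisUtils.createStream(
    ssc,
    kinesisAppName="kinesis-consumer",                   # also the KCL DynamoDB table name
    streamName="my-stream",                              # the single-shard stream
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=60,                               # Kinesis checkpoint interval, seconds
    storageLevel=StorageLevel.MEMORY_AND_DISK_2)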

The problem
Unexpectedly, at any point during the application's uptime, and with no visible anomaly in the RAM, CPU, or network metrics, the receiver stops delivering messages to Spark and the following error shows up in the logs:

18/09/11 18:20:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error while storing block into Spark - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:210)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:306)
at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:357)
at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)

The Spark application keeps running normally, but it has no records left to process.
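My reading of the Spark source (an assumption on my part, this is not documented) is that the 30 seconds in the trace is the default of spark.streaming.receiver.blockStoreTimeout, which WriteAheadLogBasedBlockHandler uses while waiting for the parallel BlockManager and write-ahead-log writes to complete. Raising it would look roughly like this, either in code or in the spark-defaults classification below:

# Assumption: the 30 s timeout matches the undocumented setting
# spark.streaming.receiver.blockStoreTimeout (default 30, in seconds)
# read by WriteAheadLogBasedBlockHandler. The value below is arbitrary.
from pyspark import SparkConf

conf = SparkConf().set("spark.streaming.receiver.blockStoreTimeout", "60")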

The only related reference I have found on the internet is this GitHub issue: https://github.com/awslabs/amazon-kinesis-client/issues/185

What I have tried
The final version of the configuration looks like this:

[
  {
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.maxAppAttempts": "4",
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.task.maxFailures": "8",
      "spark.task.reaper.enabled": "true",
      "spark.task.reaper.killTimeout": "120s",
      "spark.metrics.conf": "metrics.properties",
      "spark.metrics.namespace": "spark",
      "spark.streaming.stopGracefullyOnShutdown": "true",
      "spark.streaming.receiver.writeAheadLog.enable": "true",
      "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite": "true",
      "spark.streaming.driver.writeAheadLog.closeFileAfterWrite": "true",
      "spark.streaming.backpressure.enabled": "true",
      "spark.streaming.backpressure.initialRate": "500",
      "spark.dynamicAllocation.enabled": "false",
      "spark.default.parallelism": "18",
      "spark.driver.cores": "3",
      "spark.driver.memory": "4G",
      "spark.driver.memoryOverhead": "1024",
      "spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
      "spark.executor.instances": "3",
      "spark.executor.cores": "3",
      "spark.executor.memory": "4G",
      "spark.executor.memoryOverhead": "1024",
      "spark.executor.heartbeatInterval": "20s",
      "spark.io.compression.codec": "zstd",
      "spark.rdd.compress": "true",
      "spark.executor.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
      "spark.python.worker.memory": "640m",
      "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
      "spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored": "true",
      "spark.network.timeout": "240s",
      "spark.locality.wait": "0s",
      "spark.eventLog.enabled": "false"
    }
  },
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.metadata.tableName": "HiddenName",
      "fs.s3.consistent": "true",
      "fs.s3.consistent.retryCount": "1",
      "fs.s3.consistent.retryPeriodSeconds": "2",
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.throwExceptionOnInconsistency": "false"
    }
  }
]

None of this helped.
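For completeness, the receiver write-ahead log settings above only take effect because the StreamingContext is checkpointed. A sketch of that part (the S3 path and batch interval are placeholders):

# Sketch of the checkpointed context required by the WAL settings above.
# sc is the SparkContext; the S3 path is a placeholder, not the real bucket.
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "s3://my-bucket/streaming-checkpoint"

def create_context():
    ssc = StreamingContext(sc, batchDuration=60)
    ssc.checkpoint(CHECKPOINT_DIR)     # required for the receiver/driver WAL
    # ... create the Kinesis stream and the output operations here ...
    return ssc

ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)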

Workaround
To get the receiver sending messages again, I have to go to the Spark UI, open the Jobs tab and kill the active job (I cannot upload a screenshot because I do not have enough reputation). This triggers a new application attempt, and the receiver starts delivering records to Spark again. Obviously, this is not a solution for a production system.

Programmatic workaround
I wrote a StreamingListener that catches onReceiverStopped and onReceiverError. When either of them fires, it creates a signal (an external file), which is checked every time ssc.awaitTerminationOrTimeout(check_interval) returns. If the signal has been created, the application raises an exception and shuts itself down, which produces a new appattempt and a fresh run of the Streaming app. The problems with this are, first, that it does not solve the original issue, and second, that after the first appattempt the StreamingListener is no longer registered, so I cannot recover the application.
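A rough sketch of that listener and the signal-file check (the file path, the interval and the class name are placeholders):

# Sketch of the StreamingListener + signal-file workaround described above.
import os
from pyspark.streaming.listener import StreamingListener

SIGNAL_FILE = "/tmp/receiver_stopped"     # hypothetical signal path
CHECK_INTERVAL = 60                       # seconds, i.e. check_interval above

class ReceiverWatchdog(StreamingListener):
    def onReceiverError(self, receiverError):
        open(SIGNAL_FILE, "a").close()    # leave the signal file behind

    def onReceiverStopped(self, receiverStopped):
        open(SIGNAL_FILE, "a").close()

ssc.addStreamingListener(ReceiverWatchdog())
ssc.start()

while True:
    if ssc.awaitTerminationOrTimeout(CHECK_INTERVAL):
        break                             # the context stopped on its own
    if os.path.exists(SIGNAL_FILE):
        # Failing the driver makes YARN start a new appattempt.
        raise RuntimeError("Kinesis receiver stopped, forcing a new application attempt")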