我很惊讶地看到 Spark 只使用一个 Kafka 消费者来消费来自 Kafka 的数据,并且这个消费者在驱动程序容器中运行。我更希望看到 Spark 创建与主题中分区数量一样多的消费者,并在执行器容器中运行这些消费者。
例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行程序。在检查 Spark 创建的 Kafka 消费者组时,我看到只有一个消费者负责所有 5 个分区。这个消费者在带有驱动程序的机器上运行:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
events 2 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 1 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 0 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 4 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 3 - 0 - …Run Code Online (Sandbox Code Playgroud) 更新:问题已解决。Docker镜像在这里:docker-spark-submit
我在Docker容器中使用胖子运行spark-submit。我的独立Spark集群在3个虚拟机上运行-一个主服务器和两个工作服务器。从工作计算机上的执行程序日志中,我看到该执行程序具有以下驱动程序URL:
“ --driver-url”“ spark://CoarseGrainedScheduler@172.17.0.2:5001”
172.17.0.2实际上是具有驱动程序的容器的地址,而不是运行容器的主机。无法从工作计算机访问该IP,因此工作计算机无法与驱动程序通信。从StandaloneSchedulerBackend的源代码中可以看到,它使用spark.driver.host设置构建driverUrl:
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
Run Code Online (Sandbox Code Playgroud)
它没有考虑SPARK_PUBLIC_DNS环境变量-这是正确的吗?在容器中,除容器“内部” IP地址(在此示例中为172.17.0.2)之外,我无法将spark.driver.host设置为其他任何内容。尝试将spark.driver.host设置为主机的IP地址时,出现如下错误:
WARN Utils:服务'sparkDriver'无法在端口5001上绑定。尝试使用端口5002。
我尝试将spark.driver.bindAddress设置为主机的IP地址,但是出现了相同的错误。那么,如何配置Spark使用主机IP地址而不是Docker容器地址与驱动程序通信?
UPD:来自执行程序的堆栈跟踪:
ERROR RpcOutboxMessage: Ask timeout before connecting successfully
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Failure.recover(Try.scala:216)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at …Run Code Online (Sandbox Code Playgroud)