小编tas*_*yan的帖子

流查询使用多少个 Kafka 消费者来执行?

我很惊讶地看到 Spark 只使用一个 Kafka 消费者来消费来自 Kafka 的数据,并且这个消费者在驱动程序容器中运行。我更希望看到 Spark 创建与主题中分区数量一样多的消费者,并在执行器容器中运行这些消费者。

例如,我有一个包含 5 个分区的主题事件。我启动了我的 Spark Structured Streaming 应用程序,该应用程序使用该主题并写入 HDFS 上的 Parquet。该应用程序有 5 个执行程序。在检查 Spark 创建的 Kafka 消费者组时,我看到只有一个消费者负责所有 5 个分区。这个消费者在带有驱动程序的机器上运行:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
events          2          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          1          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          0          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          4          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          3          -               0               - …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spark-structured-streaming

10
推荐指数
1
解决办法
529
查看次数

在Docker容器中运行Spark驱动程序-没有从执行程序到驱动程序的连接返回?

更新:问题已解决。Docker镜像在这里:docker-spark-submit

我在Docker容器中使用胖子运行spark-submit。我的独立Spark集群在3个虚拟机上运行-一个主服务器和两个工作服务器。从工作计算机上的执行程序日志中,我看到该执行程序具有以下驱动程序URL:

“ --driver-url”“ spark://CoarseGrainedScheduler@172.17.0.2:5001”

172.17.0.2实际上是具有驱动程序的容器的地址,而不是运行容器的主机。无法从工作计算机访问该IP,因此工作计算机无法与驱动程序通信。从StandaloneSchedulerBackend的源代码中可以看到,它使用spark.driver.host设置构建driverUrl:

val driverUrl = RpcEndpointAddress(
  sc.conf.get("spark.driver.host"),
  sc.conf.get("spark.driver.port").toInt,
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
Run Code Online (Sandbox Code Playgroud)

它没有考虑SPARK_PUBLIC_DNS环境变量-这是正确的吗?在容器中,除容器“内部” IP地址(在此示例中为172.17.0.2)之外,我无法将spark.driver.host设置为其他任何内容。尝试将spark.driver.host设置为主机的IP地址时,出现如下错误:

WARN Utils:服务'sparkDriver'无法在端口5001上绑定。尝试使用端口5002。

我尝试将spark.driver.bindAddress设置为主机的IP地址,但是出现了相同的错误。那么,如何配置Spark使用主机IP地址而不是Docker容器地址与驱动程序通信?

UPD:来自执行程序的堆栈跟踪:

ERROR RpcOutboxMessage: Ask timeout before connecting successfully
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at …
Run Code Online (Sandbox Code Playgroud)

docker mesos apache-spark apache-spark-standalone

5
推荐指数
3
解决办法
4188
查看次数