如何将位于 HDFS 上的类型安全配置文件添加到 spark-submit(集群模式)?

Job*_*obs 5 hadoop hdfs typesafe apache-spark

我有一个 Spark (Spark 1.5.2) 应用程序,它将数据从 Kafka 流式传输到 HDFS。我的应用程序包含两个 Typesafe 配置文件来配置某些内容,例如 Kafka 主题等。

现在我想在集群中使用 spark-submit(集群模式)运行我的应用程序。包含我项目所有依赖项的 jar 文件存储在 HDFS 上。只要我的配置文件包含在 jar 文件中,一切正常。但这对于测试目的是不切实际的,因为我总是必须重建 jar。

因此,我排除了项目的配置文件,并通过“驱动程序类路径”添加了它们。这在客户端模式下有效,但是如果我现在将配置文件移动到 HDFS 并在集群模式下运行我的应用程序,它将找不到设置。您可以在下面找到我的 spark-submit 命令:

/usr/local/spark/bin/spark-submit \
    --total-executor-cores 10 \
    --executor-memory 15g \
    --verbose \
    --deploy-mode cluster\
    --class com.hdp.speedlayer.SpeedLayerApp \
    --driver-class-path hdfs://iot-master:8020/user/spark/config \
    --master spark://spark-master:6066 \
    hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar
Run Code Online (Sandbox Code Playgroud)

我已经用 --file 参数尝试过它,但这也不起作用。有谁知道我该如何解决这个问题?

更新:

我做了一些进一步的研究,我发现它可能与 HDFS 路径有关。我将 HDFS 路径更改为“hdfs:///iot-master:8020//user//spark//config 但不幸的是,这也不起作用。但也许这可以帮助你。

下面你也可以看到我在集群模式下运行驱动程序时得到的错误:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
    at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
    ... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...
Run Code Online (Sandbox Code Playgroud)

Noa*_*ish 5

为了达到相同的结果,我发现了以下内容:

  1. --files:仅与运行 spark-submit 命令的机器上的本地文件相关联并转换为conf.addFile(). 所以 hdfs 文件将无法工作,除非您能够hdfs dfs -get <....>在检索文件之前运行。在我的情况下,我想从 oozie 运行它,所以我不知道它将在哪台机器上运行,我不想在我的工作流中添加复制文件操作。
  2. @Yuval_Itzchakov 引用的引用是 --jars ,它只处理 jars,因为它转换为 conf.addJar()

所以据我所知,没有严格的方法可以从 hdfs 加载配置文件。

我的方法是将路径传递给我的应用程序并读取配置文件并将其合并到参考文件中:

private val HDFS_IMPL_KEY = "fs.hdfs.impl"
def loadConf(pathToConf: String): Config = {
   val path = new Path(pathToConf)
   val confFile = File.createTempFile(path.getName, "tmp")
   confFile.deleteOnExit()
   getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))

   ConfigFactory.load(ConfigFactory.parseFile(confFile))
}

def getFileSystemByUri(uri: URI) : FileSystem  = {
   val hdfsConf = new Configuration()
   hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
FileSystem.get(uri, hdfsConf)
}
Run Code Online (Sandbox Code Playgroud)

PS错误仅表示ConfigFactory没有找到任何配置文件,因此他找不到您要查找的属性。