读取驱动程序通过spark-submit发送的文件

Lit*_*les 33 apache-spark

我正在发送一个Spark作业,通过运行在远程集群上运行

spark-submit ... --deploy-mode cluster --files some.properties ...
Run Code Online (Sandbox Code Playgroud)

我想some.properties通过驱动程序代码读取文件的内容,即在创建Spark上下文和启动RDD任务之前.该文件将复制到远程驱动程序,但不会复制到驱动程序的工作目录.

我所知道的解决这个问题的方法是:

  1. 将文件上传到HDFS
  2. 将文件存储在app jar中

两者都不方便,因为在提交开发机器上经常更改此文件.

有没有办法--files在驱动程序代码main方法中读取使用该标志上载的文件?

Ton*_*res 21

是的,您可以访问通过--files参数上传的文件.

这就是我能够访问通过以下方式传入的文件--files:

./bin/spark-submit \
--class com.MyClass \
--master yarn-cluster \
--files /path/to/some/file.ext \
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar \
/path/to/app.jar file.ext
Run Code Online (Sandbox Code Playgroud)

在我的Spark代码中:

val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size
Run Code Online (Sandbox Code Playgroud)

我相信这些文件会被下载到与放置jar相同的目录中的worker上,这就是为什么只是传递文件名而不是绝对路径Source.fromFile.

  • 哦对不起,我忽略了你没有以YARN模式运行.有一个名为[`SparkFiles`]的助手类(https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.SparkFiles$).`SparkFiles.get(filename)`将返回下载`filename`的路径,但是在初始化Spark上下文之前你将无法使用它.在Spark上下文初始化之前,是否有任何特殊原因需要读取文件? (4认同)
  • 这是我试过的.该文件将复制到工作者的工作目录,但不会复制到驱动程序的目录中.也许它在YARN模式下有所不同. (2认同)
  • 如果路径位于 S3 等对象存储中而不是文件系统中怎么办?或者这与“--files”不兼容? (2认同)

小智 7

和选项支持使用 指定--files文件--archives#,就像 Hadoop 一样。

例如,您可以指定:--files localtest.txt#appSees.txt这会将您本地命名的文件上传localtest.txt到 Spark 工作目录,但这将通过名称链接到appSees.txt,并且您的应用程序appSees.txt在 YARN 上运行时应使用该名称来引用它。

这适用于纱线/客户端和纱线/集群模式下的 Spark 流应用程序。


小智 6

使用后spark-submit --help,会发现该选项仅针对执行器的工作目录,而不针对驱动程序。

--files FILES: Comma-separated list of files to be placed in the working directory of each executor.


Pra*_*hoo 5

经过调查,我找到了解决上述问题的一种方法。在spark-submit期间发送any.properties配置,并由Spark驱动程序在SparkSession初始化之前和之后使用它。希望对您有帮助。

any.properties

spark.key=value
spark.app.name=MyApp
Run Code Online (Sandbox Code Playgroud)

SparkTest.java

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SparkTest{

  public Static void main(String[] args){

    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();

    Config conf = loadConf();
    System.out.println(conf.getString("spark.key"));

    // Initialize SparkContext and use configuration from properties
    SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));

    SparkSession sparkSession = 
    SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport().getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

  }


  public static Config loadConf() {

      String configFileName = "any.properties";
      System.out.println(configFileName);
      Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
      System.out.println(configs.getString("spark.key")); // get value from properties file
      return configs;
   }
}
Run Code Online (Sandbox Code Playgroud)

Spark提交:

spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........
Run Code Online (Sandbox Code Playgroud)