将文件从SFTP服务器加载到spark RDD

vin*_*dev 5 scala apache-spark spark-dataframe

如何将文件从SFTP服务器加载到spark RDD。加载此文件后,我需要对数据进行一些过滤。该文件也是csv文件,所以请您帮我决定是否应该使用数据帧或RDD。

him*_*ian 2

您可以spark-sftp通过以下方式在程序中使用库:

对于 Spark 2.x

Maven依赖

<dependency>
    <groupId>com.springml</groupId>
    <artifactId>spark-sftp_2.11</artifactId>
    <version>1.1.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

SBT 依赖性

libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
Run Code Online (Sandbox Code Playgroud)

与 Spark shell 一起使用

可以使用 --packages 命令行选项将此包添加到 Spark。例如,在启动 Spark shell 时包含它:

$ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
Run Code Online (Sandbox Code Playgroud)

斯卡拉API

// Construct Spark dataframe using file in FTP server
val df = spark.read.
            format("com.springml.spark.sftp").
            option("host", "SFTP_HOST").
            option("username", "SFTP_USER").
            option("password", "****").
            option("fileType", "csv").
            option("inferSchema", "true").
            load("/ftp/files/sample.csv")

// Write dataframe as CSV file to FTP server
df.write.
      format("com.springml.spark.sftp").
      option("host", "SFTP_HOST").
      option("username", "SFTP_USER").
      option("password", "****").
      option("fileType", "csv").
      save("/ftp/files/sample.csv")
Run Code Online (Sandbox Code Playgroud)

对于 Spark 1.x (1.5+)

Maven依赖

<dependency>
    <groupId>com.springml</groupId>
    <artifactId>spark-sftp_2.10</artifactId>
    <version>1.0.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

SBT 依赖性

libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
Run Code Online (Sandbox Code Playgroud)

与 Spark shell 一起使用

可以使用命令行选项将该包添加到 Spark --packages。例如,在启动 Spark shell 时包含它:

$ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
Run Code Online (Sandbox Code Playgroud)

斯卡拉API

import org.apache.spark.sql.SQLContext

// Construct Spark dataframe using file in FTP server
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
                    format("com.springml.spark.sftp").
                    option("host", "SFTP_HOST").
                    option("username", "SFTP_USER").
                    option("password", "****").
                    option("fileType", "csv").
                    option("inferSchema", "true").
                    load("/ftp/files/sample.csv")

// Write dataframe as CSV file to FTP server
df.write().
      format("com.springml.spark.sftp").
      option("host", "SFTP_HOST").
      option("username", "SFTP_USER").
      option("password", "****").
      option("fileType", "csv").
      save("/ftp/files/sample.csv")
Run Code Online (Sandbox Code Playgroud)

有关更多信息,spark-sftp您可以访问 github 页面springml/spark-sftp