vin*_*dev 5 scala apache-spark spark-dataframe
如何将文件从SFTP服务器加载到spark RDD。加载此文件后,我需要对数据进行一些过滤。该文件也是csv文件,所以请您帮我决定是否应该使用数据帧或RDD。
您可以spark-sftp通过以下方式在程序中使用库:
对于 Spark 2.x
Maven依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.11</artifactId>
<version>1.1.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
SBT 依赖性
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
Run Code Online (Sandbox Code Playgroud)
与 Spark shell 一起使用
可以使用 --packages 命令行选项将此包添加到 Spark。例如,在启动 Spark shell 时包含它:
$ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
Run Code Online (Sandbox Code Playgroud)
斯卡拉API
// Construct Spark dataframe using file in FTP server
val df = spark.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
Run Code Online (Sandbox Code Playgroud)
对于 Spark 1.x (1.5+)
Maven依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.10</artifactId>
<version>1.0.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
SBT 依赖性
libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
Run Code Online (Sandbox Code Playgroud)
与 Spark shell 一起使用
可以使用命令行选项将该包添加到 Spark --packages。例如,在启动 Spark shell 时包含它:
$ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
Run Code Online (Sandbox Code Playgroud)
斯卡拉API
import org.apache.spark.sql.SQLContext
// Construct Spark dataframe using file in FTP server
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write().
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
Run Code Online (Sandbox Code Playgroud)
有关更多信息,spark-sftp您可以访问 github 页面springml/spark-sftp
| 归档时间: |
|
| 查看次数: |
4976 次 |
| 最近记录: |