如何在以本地模式运行的 pyspark 中从 S3 中读取数据?

Jar*_*red 4 python amazon-s3 apache-spark pyspark

我正在使用 PyCharm 2018.1,使用 Python 3.4 和通过 pip 在 vi​​rtualenv 中安装的 Spark 2.3。本地主机上没有安装hadoop,所以没有安装Spark(因此没有SPARK_HOME、HADOOP_HOME等)

当我尝试这个时:

from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")
Run Code Online (Sandbox Code Playgroud)

我得到:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.partitions.
: java.io.IOException: No FileSystem for scheme: s3
Run Code Online (Sandbox Code Playgroud)

在本地模式下运行 pyspark 时如何从 s3 读取数据,而无需在本地安装完整的 Hadoop?

FWIW - 当我以非本地模式在 EMR 节点上执行它时,这很有效。

以下不起作用(相同的错误,尽管它确实解决并下载了依赖项):

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.1.0" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")
Run Code Online (Sandbox Code Playgroud)

相同(坏)结果:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/path/to/hadoop-aws-3.1.0.jar" pyspark-shell'
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext(conf = conf)
inputFile = sparkContext.textFile("s3://somebucket/file.csv")
Run Code Online (Sandbox Code Playgroud)

Tar*_*ani 5

所以格伦尼的回答很接近,但不是你的情况。关键是选择正确版本的依赖项。如果你看一下虚拟环境

罐子

一切都指向一个版本 which 2.7.3,您还需要使用哪个版本

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
Run Code Online (Sandbox Code Playgroud)

您应该通过检查venv/Lib/site-packages/pyspark/jars项目虚拟环境中的路径来验证您的安装使用的版本

之后,您可以s3a默认使用或s3通过定义相同的处理程序类

# Only needed if you use s3://
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'awsKey')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'awsSecret')
s3File = sc.textFile("s3a://myrepo/test.csv")

print(s3File.count())
print(s3File.id())
Run Code Online (Sandbox Code Playgroud)

输出如下

输出火花