Configuring PySpark AWS credentials in a Docker container

python-3.x docker pyspark aws-glue

I am using Docker to develop a local AWS Glue job with pyspark. The song_data.py file contains the Glue job. I configured the Spark session with my AWS credentials, but the errors below indicate that they are not being picked up. In that file I set up 4 different try statements that use GlueContext methods to create a dynamic frame. Here is the Glue job file (song_data.py):

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from configparser import ConfigParser
from pyspark import SparkConf

# Read the AWS credentials from the config file mounted into the container
config = ConfigParser()
config.read_file(open('/app/config/aws.cfg'))

# Pass the credentials to the s3a connector through the Spark conf
conf = (
    SparkConf()
        .set('spark.hadoop.fs.s3a.access.key', config.get('AWS', 'KEY'))
        .set('spark.hadoop.fs.s3a.secret.key', config.get('AWS', 'SECRET'))
        .set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

sc = SparkContext(conf=conf)
spark = SparkSession(sc)
glueContext = GlueContext(spark)

conf_dict = spark.sparkContext.getConf().getAll()
print(conf_dict)

try:
    print('Attempt 1: spark.read.json')
    url = 's3a://sparkify-dend-analytics/song_data/A/A/A/TRAAAAW128F429D538.json'
    spark.read.json(url).show(1)
except Exception as e:
    print(e)

try:
    print('Attempt 2: create_dynamic_frame.from_options')
    song_df = glueContext.create_dynamic_frame.from_options(
            connection_type='s3',
            connection_options={"paths": [ "s3a://sparkify-dend-analytics/song_data/"]},
            format='json')

    print ('Count: ', song_df.count())
    print('Schema: ')
    song_df.printSchema()
except Exception as e:
    print(e)

try:
    print('Attempt 3: create_dynamic_frame.from_catalog')
    song_df = glueContext.create_dynamic_frame.from_catalog(
            database='sparkify',
            table_name='song_data')

    print ('Count: ', song_df.count())
    print('Schema: ')
    song_df.printSchema()
except Exception as e:
    print(e)

try:
    print('Attempt 4: create_dynamic_frame_from_catalog')
    song_df = glueContext.create_dynamic_frame_from_catalog(
            database='sparkify',
            table_name='song_data')

    print ('Count: ', song_df.count())
    print('Schema: ')
    song_df.printSchema()
except Exception as e:
    print(e)
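As a sanity check (just a quick sketch, not part of the job itself), I can run the following inside the container to confirm the config file is found and both values parse as non-empty, using the same path and section/key names as in song_data.py:

from configparser import ConfigParser

# Same path and section/key names as in song_data.py above.
config = ConfigParser()
with open('/app/config/aws.cfg') as f:
    config.read_file(f)

key = config.get('AWS', 'KEY')
secret = config.get('AWS', 'SECRET')

# Print only lengths so the actual credentials never end up in logs.
print('KEY set:', bool(key), 'length:', len(key))
print('SECRET set:', bool(secret), 'length:', len(secret))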

The command I use to run the Glue job is gluesparksubmit glue_etl_scripts/song_data.py --JOB-NAME test. Below is a shortened version of the error output for each try statement:

Attempt 1: spark.read.json()

WARN FileStreamSink: Error while looking for metadata directory.
An error occurred while calling o87.json.
: org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on sparkify-dend-analytics: 
com.amazonaws.AmazonClientException: No AWS Credentials provided by 
DefaultAWSCredentialsProviderChain : com.amazonaws.SdkClientException: Unable to load AWS 
credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to 
load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and 
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load 
AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), 
WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
 com.amazonaws.auth.profile.ProfileCredentialsProvider@401a5902: profile file cannot be null, 
com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2b6e2cf9: Failed to connect to service 
endpoint: ]: No AWS Credentials provided by DefaultAWSCredentialsProviderChain : 
com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: 
[EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment 
variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
 SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties
 (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a 
value for roleArn and roleSessionName, 
com.amazonaws.auth.profile.ProfileCredentialsProvider@401a5902: profile file cannot be null, 
com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2b6e2cf9: Failed to connect to service 
endpoint: ]
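Since the provider chain in this trace never seems to reach the fs.s3a keys I set on the SparkConf, one thing I am considering (a sketch only, reusing the sc and config objects from song_data.py above, not something I have verified) is setting the keys directly on the live Hadoop configuration:

# Set the s3a keys on the Hadoop configuration of the running context.
# sc and config are the objects already created in song_data.py above;
# _jsc is the underlying JavaSparkContext.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', config.get('AWS', 'KEY'))
hadoop_conf.set('fs.s3a.secret.key', config.get('AWS', 'SECRET'))

The chain also mentions AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, so exporting those in the container's shell before running gluesparksubmit might be another option, but I have not confirmed that either.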

Attempt 2: create_dynamic_frame.from_options()

WARN InstanceMetadataServiceResourceFetcher: Fail to retrieve token 
com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
....
Caused by: java.net.ConnectException: Connection refused (Connection refused)
....
An error occurred while calling o125.getDynamicFrame.
: org.apache.hadoop.fs.s3a.AWSClientIOException: (same AWSClientIOException as above) 
..... 
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain:

Attempt 3: create_dynamic_frame.from_catalog()

WARN InstanceMetadataServiceResourceFetcher: Fail to retrieve token 
com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
.....
Caused by: java.net.ConnectException: Connection refused (Connection refused)

Attempt 4: create_dynamic_frame_from_catalog()

Same error as attempt 3.

When I print out the Spark session's configuration, the AWS access key and secret key are both set correctly. Here is the Spark configuration printed by spark.sparkContext.getConf().getAll():

[('spark.app.name', 'song_data.py'), ('spark.driver.host', '73d3647fdf5b'), 
('spark.hadoop.fs.s3a.secret.key', 'xxxxxxx'), ('spark.submit.pyFiles', '/glue/aws-glue-
libs/PyGlue.zip'), ('spark.executor.id', 'driver'), ('spark.driver.extraClassPath', '/glue/aws-
glue-libs/jarsv1/*'), ('spark.app.id', 'local-1593063861647'), ('spark.driver.port', '40655'), 
('spark.executor.extraClassPath', '/glue/aws-glue-libs/jarsv1/*'), ('spark.rdd.compress', 
'True'), ('spark.hadoop.fs.s3a.access.key', 'xxxxxxx'), ('spark.files', 'file:///glue/aws-glue-
libs/PyGlue.zip'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), 
('spark.submit.deployMode', 'client'), ('fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')]
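So the keys are clearly present in the Spark conf, yet both the S3A connector and the Glue client still fall back to DefaultAWSCredentialsProviderChain. One variation I am considering (a sketch; whether it applies depends on the hadoop-aws version bundled with the Glue libs, and I have not tested it) is pinning the S3A credentials provider so it reads fs.s3a.access.key / fs.s3a.secret.key, and prefixing the fs.s3.impl setting with spark.hadoop. so it is actually copied into the Hadoop configuration:

from pyspark import SparkConf
# config is the ConfigParser object from song_data.py above.

conf = (
    SparkConf()
        .set('spark.hadoop.fs.s3a.access.key', config.get('AWS', 'KEY'))
        .set('spark.hadoop.fs.s3a.secret.key', config.get('AWS', 'SECRET'))
        # Force the provider that reads the two keys above; SimpleAWSCredentialsProvider
        # ships with hadoop-aws 2.8+, so it may not exist in older jars.
        .set('spark.hadoop.fs.s3a.aws.credentials.provider',
             'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        # spark.hadoop. prefix so the setting reaches the Hadoop configuration,
        # unlike the bare fs.s3.impl key in song_data.py.
        .set('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
)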

Let me know if you need the Dockerfile or any other code.