Vas*_*kas 5 amazon-s3 boto pyspark moto
我正在尝试对将数据写入S3,然后从同一S3位置读取相同数据的函数进行单元测试。我正在尝试使用moto
和boto
(2.x)来实现[1]。问题是该服务返回我被禁止访问密钥[2]的信息。moto github存储库中报告了类似的问题(即使错误消息有所不同)[3],但尚未解决。
有没有人成功地在PySpark中测试过模拟的s3读写,以分享一些见解?
[1]
import boto
from boto.s3.key import Key
from moto import mock_s3
_test_bucket = 'test-bucket'
_test_key = 'data.csv'
@pytest.fixture(scope='function')
def spark_context(request):
conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'test-access-key-id')
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'test-secret-access-key')
request.addfinalizer(lambda: sc.stop())
quiet_py4j(sc)
return sc
spark_test = pytest.mark.usefixtures("spark_context")
@spark_test
@mock_s3
def test_tsv_read_from_and_write_to_s3(spark_context):
spark = SQLContext(spark_context)
s3_conn = boto.connect_s3()
s3_bucket = s3_conn.create_bucket(_test_bucket)
k = Key(s3_bucket)
k.key = _test_key
k.set_contents_from_string('')
s3_uri = 's3n://{}/{}'.format(_test_bucket, _test_key)
df = (spark
.read
.csv(s3_uri))
Run Code Online (Sandbox Code Playgroud)
[2]
(...)
E py4j.protocol.Py4JJavaError: An error occurred while calling o33.csv.
E : org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/data.csv' - ResponseCode=403, ResponseMessage=Forbidden
(...)
Run Code Online (Sandbox Code Playgroud)
moto是一个用于模拟aws 资源的库。
1.创建资源:
如果您尝试访问不存在的 S3 存储桶,aws 将返回一个Forbidden error
.
通常,我们甚至需要在测试运行之前创建这些资源。因此,创建一个 pytest 夹具并将autouse
其设置为True
import pytest
import boto3
from moto import mock_s3
@pytest.fixture(autouse=True)
def fixture_mock_s3():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='MYBUCKET') # an empty test bucket is created
yield
Run Code Online (Sandbox Code Playgroud)
- 上面的代码创建了一个名为“MUBUCKET”的模拟 s3 存储桶。桶是空的。
- 存储桶的名称应与原始存储桶的名称相同。
- 使用
autouse
,夹具在测试中自动可用。- 您可以放心地运行测试,因为您的测试将无法访问原始存储桶。
2. 定义并运行涉及资源的测试:
假设您有将文件写入 S3 存储桶的代码
def write_to_s3(filepath: str):
s3 = boto3.resource('s3', region_name='us-east-1')
s3.Bucket('MYBUCKET').upload_file(filepath, 'A/B/C/P/data.txt')
Run Code Online (Sandbox Code Playgroud)
可以通过以下方式进行测试:
from botocore.errorfactory import ClientError
def test_write_to_s3():
dummy_file_path = f"{TEST_DIR}/data/dummy_data.txt"
# The s3 bucket is created by the fixture and not lies empty
# test for emptiness
s3 = boto3.resource('s3', region_name='us-east-1')
bucket = s3.Bucket("MYBUCKET")
objects = list(bucket.objects.filter(Prefix="/"))
assert objects == []
# Now, lets write a file to s3
write_to_s3(dummy_file_path)
# the below assert statement doesn't throw any error
assert s3.head_object(Bucket='MYBUCKET', Key='A/B/C/P/data.txt')
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
675 次 |
最近记录: |