Emm*_*day 4 python boto3 pyspark
我正在尝试使用 boto3 与 AWS 对话,将数据从 Pyspark RDD 的工作人员发送到 SQS 队列。我需要直接从分区发送数据,而不是收集 RDD 并从驱动程序发送数据。
我可以通过本地的 boto3 和 Spark 驱动程序向 SQS 发送消息;此外,我可以导入 boto3 并在分区上创建 boto3 会话。但是,当我尝试从分区创建客户端或资源时,我收到错误消息。我相信 boto3 没有正确创建客户端,但我不完全确定这一点。我的代码如下所示:
def get_client(x): #the x is required to use pyspark's mapPartitions
import boto3
client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_client)
Run Code Online (Sandbox Code Playgroud)
错误:
DataNotFoundError: Unable to load data for: endpoints
Run Code Online (Sandbox Code Playgroud)
更长的回溯:
File "<stdin>", line 4, in get_client
File "./rebuilt.zip/boto3/session.py", line 250, in client
aws_session_token=aws_session_token, config=config)
File "./rebuilt.zip/botocore/session.py", line 810, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "./rebuilt.zip/botocore/session.py", line 691, in get_component
return self._components.get_component(name)
File "./rebuilt.zip/botocore/session.py", line 872, in get_component
self._components[name] = factory()
File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
endpoints = loader.load_data('endpoints')
File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
data = func(self, *args, **kwargs)
File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints
Run Code Online (Sandbox Code Playgroud)
我还尝试修改我的函数以创建资源而不是显式客户端,以查看它是否可以找到并使用默认客户端设置。在这种情况下,我的代码是:
def get_resource(x):
import boto3
sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_resource)
Run Code Online (Sandbox Code Playgroud)
我收到一个指向 has_low_level_client 参数的错误,这是因为客户端不存在而触发的;回溯说:
File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
File "<stdin>", line 4, in session_resource
File "./rebuilt.zip/boto3/session.py", line 329, in resource
has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
-
Run Code Online (Sandbox Code Playgroud)
没有可用的资源,因为我认为没有客户可以容纳它们。
几天来我一直在用头撞这个。任何帮助表示赞赏!
这是因为您将 boto3 捆绑包作为 zip 文件。
“./rebuilt.zip/boto3”
boto3 初始化的作用是下载一堆文件并将其保存在分发文件夹中。因为您的 boto3 位于一个 zip 包中,所以显然这些文件将无法到达那里。
解决方案是,与其将 boto3 分发到 zip 文件中,不如将 boto3 安装在 Spark 环境中。在这里要小心,您可能希望在主节点和工作节点上都安装 boto3,这取决于您实现应用程序的方式。安全的赌注是安装在两者上。
如果您使用的是 EMR,则可以使用 bootstrap 步骤来完成。
| 归档时间: |
|
| 查看次数: |
4199 次 |
| 最近记录: |