Hon*_*ake 9 multithreading amazon-s3 python-3.x boto3
我有一个大问题,三天内我自己都无法解决。我们有一个应用程序,可以创建 Json 文件并通过 Boto3 库将它们发送到 Amazon S3 服务器。该应用程序是在 Python 3.8 上开发的,没有任何问题。然后Python升级到3.9+,问题就出现了。我们需要在这个应用程序中使用线程,因此我们为其创建了一个新类:
class NewThread(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
global i, listings
if self.name=='control':
# here is control-thread. Code removed for this example
while True:
time.sleep(10)
else:
i += 1
print(f'Thread {self.name} works on {files[i]}')
try:
create_file(files[i])
move_file(c.root+f'json/{files[i].replace(".", "-")}.json', 's3folder')
except Exception as e:
get_exception(e)
Run Code Online (Sandbox Code Playgroud)
函数create_file()
又长又无聊。它创建一个大小为 20-25kb 的 json 文件,并且在其中没有使用任何困难。然后,必须使用函数将文件移动到 S3 move_file()
。这是代码:
# Function for moving files to s3 bucket
def move_file(file, path, bucket=c.s3cfg['bucket'], folder=c.s3cfg['folder']):
s3 = boto3.client('s3', aws_access_key_id=c.s3cfg['access_key'], aws_secret_access_key=c.s3cfg['secret_key'])
name = file.split('/')
name = folder + '/' + path + '/' + name[len(name) - 1]
try:
s3.upload_file(file, bucket, name)
os.remove(file)
except Exception as e:
get_exception(e)
Run Code Online (Sandbox Code Playgroud)
线程由此开始:
def start_thread(count=5):
NewThread(name='control').start()
for i in range(count):
name = f'thread_{i+1}'
threads[name] = NewThread(name=name)
threads[name].start()
time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)
这是错误消息:
口译员关闭后无法安排新的未来;地点:脚本.py;线路:49;
此行链接到s3.upload_file(file, bucket, name)
代码中。但这个错误并不是每次都会出现。有时它可以在开始错误之前向服务器发送一些文件。Boto3 在单独的非线程脚本中甚至与move_file()
函数都可以很好地工作。这段代码在 Python 3.8 上运行良好。看起来有一些全局变量 shutdown 被设置到True
工作过程中的某个位置。请帮助我理解。
use*_*751 12
我偶然发现了完全相同的问题,而且不是 BOTO3 的问题。移动平均维数:
import threading
import boto3
class worker (threading.Thread):
terminate = False
def __init__(self):
threading.Thread.__init__(self)
def run(self):
# make BOTO3 CLIENT
s3_client = boto3.client(...)
while not self.terminate:
# BOTO3 downloads from global list, not shown in this code
s3_client = boto3.download_file(...)
def stop(self):
self.terminate = True
mythread = worker()
mythread.start()
# **************** THIS IS IMPORTANT
mythread.join()
# **************** /THIS IS IMPORTANT
Run Code Online (Sandbox Code Playgroud)
您的错误很可能是您没有在主线程中等待其他线程的完成。BOTO3 的操作可能需要主线程的资源。
添加mythread.join()
为我解决了它。
归档时间: |
|
查看次数: |
14473 次 |
最近记录: |