通过线程工作时出现错误“解释器关闭后无法安排新的 future”

Hon*_*ake 9 multithreading amazon-s3 python-3.x boto3

我有一个大问题,三天内我自己都无法解决。我们有一个应用程序,可以创建 Json 文件并通过 Boto3 库将它们发送到 Amazon S3 服务器。该应用程序是在 Python 3.8 上开发的,没有任何问题。然后Python升级到3.9+,问题就出现了。我们需要在这个应用程序中使用线程,因此我们为其创建了一个新类:

class NewThread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        global i, listings
        if self.name=='control':
            # here is control-thread. Code removed for this example
            while True:
                time.sleep(10)
        else:
            i += 1
            print(f'Thread {self.name} works on {files[i]}')
            try:
                create_file(files[i])
                move_file(c.root+f'json/{files[i].replace(".", "-")}.json', 's3folder')
            except Exception as e:
                get_exception(e)
Run Code Online (Sandbox Code Playgroud)

函数create_file()又长又无聊。它创建一个大小为 20-25kb 的 json 文件,并且在其中没有使用任何困难。然后,必须使用函数将文件移动到 S3 move_file()。这是代码:

# Function for moving files to s3 bucket
def move_file(file, path, bucket=c.s3cfg['bucket'], folder=c.s3cfg['folder']):
    s3 = boto3.client('s3', aws_access_key_id=c.s3cfg['access_key'], aws_secret_access_key=c.s3cfg['secret_key'])
    name = file.split('/')
    name = folder + '/' + path + '/' + name[len(name) - 1]
    try:
        s3.upload_file(file, bucket, name)
        os.remove(file)
    except Exception as e:
        get_exception(e)
Run Code Online (Sandbox Code Playgroud)

线程由此开始:

def start_thread(count=5):
    NewThread(name='control').start()
    for i in range(count):
        name = f'thread_{i+1}'
        threads[name] = NewThread(name=name)
        threads[name].start()
        time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)

这是错误消息:

口译员关闭后无法安排新的未来;地点:脚本.py;线路:49;

此行链接到s3.upload_file(file, bucket, name)代码中。但这个错误并不是每次都会出现。有时它可以在开始错误之前向服务器发送一些文件。Boto3 在单独的非线程脚本中甚至与move_file()函数都可以很好地工作。这段代码在 Python 3.8 上运行良好。看起来有一些全局变量 shutdown 被设置到True工作过程中的某个位置。请帮助我理解。

use*_*751 12

我偶然发现了完全相同的问题,而且不是 BOTO3 的问题。移动平均维数:

import threading
import boto3

class worker (threading.Thread):
    terminate = False
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        # make BOTO3 CLIENT
        s3_client = boto3.client(...)
        while not self.terminate:
            # BOTO3 downloads from global list, not shown in this code
            s3_client = boto3.download_file(...)
    def stop(self):
        self.terminate = True

mythread = worker()
mythread.start()
# **************** THIS IS IMPORTANT
mythread.join()
# **************** /THIS IS IMPORTANT
Run Code Online (Sandbox Code Playgroud)

您的错误很可能是您没有在主线程中等待其他线程的完成。BOTO3 的操作可能需要主线程的资源。

添加mythread.join()为我解决了它。