Flask - 作业不作为后台进程运行

dat*_*den 6 python flask flask-sqlalchemy concurrent.futures

我正在尝试运行一个Flask包含以下内容的应用:

  1. 即时生成API请求
  2. 将每个请求上载到SQLalchemy数据库
  3. 运行作业12作为后台进程

为此,我有以下代码:

from flask import Flask
from flask import current_app
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import queue

app = Flask(__name__)
q = queue.Queue()

def build_cache(): 
    # 1. Yielding API requests on the fly
    track_and_features = spotify.query_tracks() # <- a generator
    while True:
        q.put(next(track_and_features))


def upload_cache(tracks_and_features):
    # 2. Uploading each request to a `SQLalchemy` database
    with app.app_context():      
        Upload_Tracks(filtered_dataset=track_and_features)

    return "UPLOADING TRACKS TO DATABASE" 


@app.route('/cache')
def cache():
    # 3. Do `1` and `2` as a background process
    with concurrent.futures.ThreadPoolExecutor() as executor:

        future_to_track = {executor.submit(build_cache): 'TRACKER DONE'}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                                                future_to_track, 
                                                timeout=0.25,
                                                return_when=concurrent.futures.FIRST_COMPLETED) 

            # if there is incoming work, start a new future
            while not q.empty():

                # fetch a track from the queue
                track = q.get()

                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track
            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (track, exc))

                del future_to_track[future]

    return 'Cacheing playlist in the background...'
Run Code Online (Sandbox Code Playgroud)

所有上述工作,但不作为背景过程.应用程序在cache()调用时挂起,仅在完成此过程后才会恢复.

我用它来运行它 gunicorn -c gconfig.py app:app -w 4 --threads 12

我究竟做错了什么?


编辑:如果按顺序简化,请调试这个,并简单地写:

# 1st background process
def build_cache():
    # only ONE JOB
    tracks_and_features = spotify.query_tracks() # <- not a generator               
    while True:
         print next(tracks_and_features)

# background cache
@app.route('/cache')
def cache():
    executor.submit(build_cache)
    return 'Cacheing playlist in the background...'
Run Code Online (Sandbox Code Playgroud)

然后该过程在后台运行.

但是,如果我添加另一份工作:

def build_cache():

    tracks_and_features = spotify.query_tracks()
    while True:
        Upload_Tracks(filtered_dataset=next(tracks_and_features) #SQLalchemy db
Run Code Online (Sandbox Code Playgroud)

背景不再起作用.

简而言之:

背景只有在我一次运行一个作业时才有效(这是首先使用队列的想法背后的限制).

似乎问题是将后台进程绑定到SQLalchemy,不知道.完全迷失在这里.

Jam*_*Lim 4

仍然不确定你的意思

我的意思是应用程序等待登录时发出的所有请求,然后才转到主页。它应该立即进入主页并在后台发出请求

这里有几个问题:

  • 您的队列对于进程来说是全局的,即每个gunicorn工作人员只有一个队列;您可能希望队列绑定到您的请求,以便多个请求不会共享内存中的同一队列。考虑使用上下文局部变量
  • 如果UploadTracks正在写入数据库,则表上可能存在锁。检查索引并检查数据库中的锁等待。
  • SQLAlchemy 可能配置了一个小型连接池,第二个UploadTracks连接池正在等待第一个连接池返回其连接。

在第一个示例中,端点在返回之前等待所有 future 完成,而在第二个示例中,端点在将任务提交给执行器后立即返回。如果您希望 Flask 在任务仍在后台线程中运行时快速响应,请删除with concurrent.futures.ThreadPoolExecutor() as executor:并在模块顶部构造一个全局线程池。

使用with,上下文管理器会在退出之前等待所有提交的任务,但我不确定这是否是您的主要问题。