使用 RQ 工作线程、队列和作业时管理路径的正确方法是什么

Sta*_*eIT 5 python sqlalchemy task-queue python-rq

我的第一个问题/帖子...请善待....

我正在做一个个人项目,其中一个模块循环运行以收集数据。当数据进入时,它会将数据插入数据库交给队列中的一个函数,在那里一个侦听 rq 工作器接收它并处理该函数。数据库使用 SQLAlchemy 进行管理,这意味着它必须生成引擎、会话并定义数据库表。

代码文件的结构是:

--/home/..../collect-view/  (this is the project folder)
    -- DataCollection
        -- main_client.py  (main loop waiting for user data)
        -- collect_data.py (contains the database insertion function)
        -- base.py         (the base file for SQLAlchemy database definition)
        -- tables.py       (the file which sets up the table name and definition)
    -- app.db                  (the database file)
Run Code Online (Sandbox Code Playgroud)

注意:数据库文件位于更高级别的目录中,因为它也被另一个位于此级别的应用程序(Flask 应用程序)访问

要实现此代码,“collect_data”必须导入“base”和“tables”,“tables”必须导入“base”。这被证明是一个问题,因为一旦 collect_data 函数(称为“传输”)由工作人员运行,它就无法再找到要导入的文件,并且工作人员会吐出一个异常,说它无法导入“base ”。我在网上搜索了答案,最终在 nvie 的 Github 上找到了一个答案,其中提到使用 --path 选项将工作人员引导到正确的路径。我通过实现它来工作:

$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection
Run Code Online (Sandbox Code Playgroud)

然后我遇到了另一个与路径相关的失败,其中工作人员说它找不到我试图将数据插入的数据库表。所以我更改了引擎创建步骤以包含我的完整路径......

base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Run Code Online (Sandbox Code Playgroud)

这个问题让我更加困惑,因为我的工作人员已经在我的 DataCollection 目录中工作,所以我认为 ('sqlite:///../app.db') 将是定位数据库的正确方法(因为它在没有 rq 工作人员的情况下进行测试)。

所以,经过长时间的解释,我的问题是:在这种情况下管理路径的正确方法是什么?对我来说,我必须使用 /home 的完整路径似乎是错误的......我是否遗漏了关于路径和/或 rq worker(和类似的)如何工作的信息?

从我的代码文件中摘录如下:

main_client.py

from redis import Redis
import rq
from collect_data import transfer

redis_url = Redis.from_url('redis://')  #(config['REDIS_URL'])
queue = rq.Queue('rq_worker-data2db', connection=redis_url)

#.....
#.....

def have_data(data):

    rq_job = queue.enqueue('collect_data.transfer', data)

#.....
#.....
Run Code Online (Sandbox Code Playgroud)

收集数据.py

from base import Session, engine, Base
from tables import FieldData
import time
from datetime import datetime

def transfer(info):
    timestamp_in = datetime.utcnow()
    session = Session()
    data1 = FieldData(data=info, timestamp=timestamp_in)
    session.add(data1)
    session.commit()
Run Code Online (Sandbox Code Playgroud)

基础文件

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Session = sessionmaker(bind=engine)
Base = declarative_base()
Run Code Online (Sandbox Code Playgroud)

表.py

from sqlalchemy import  Column, String, Float, Integer, Date, DateTime, Table, ForeignKey
from base import Base
from datetime import datetime
# .....
#.....
class FieldData(Base):
    __tablename__ = 'field_data'
    id = Column(Integer, primary_key=True)
    data = Column(String(20))
    timestamp = Column(DateTime, index=True, default=datetime.utcnow)

    def __init__(self, data, timestamp):
        self.data = data
        self.timestamp = timestamp
Run Code Online (Sandbox Code Playgroud)

在终端中,我首先运行 Redis,然后使用以下命令运行工作程序:

$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection (其中 rq_worker_data2db 是工作人员名称)