Sta*_*eIT 5 python sqlalchemy task-queue python-rq
我的第一个问题/帖子...请善待....
我正在做一个个人项目,其中一个模块循环运行以收集数据。当数据进入时,它会将数据插入数据库交给队列中的一个函数,在那里一个侦听 rq 工作器接收它并处理该函数。数据库使用 SQLAlchemy 进行管理,这意味着它必须生成引擎、会话并定义数据库表。
代码文件的结构是:
--/home/..../collect-view/ (this is the project folder)
-- DataCollection
-- main_client.py (main loop waiting for user data)
-- collect_data.py (contains the database insertion function)
-- base.py (the base file for SQLAlchemy database definition)
-- tables.py (the file which sets up the table name and definition)
-- app.db (the database file)
Run Code Online (Sandbox Code Playgroud)
注意:数据库文件位于更高级别的目录中,因为它也被另一个位于此级别的应用程序(Flask 应用程序)访问
要实现此代码,“collect_data”必须导入“base”和“tables”,“tables”必须导入“base”。这被证明是一个问题,因为一旦 collect_data 函数(称为“传输”)由工作人员运行,它就无法再找到要导入的文件,并且工作人员会吐出一个异常,说它无法导入“base ”。我在网上搜索了答案,最终在 nvie 的 Github 上找到了一个答案,其中提到使用 --path 选项将工作人员引导到正确的路径。我通过实现它来工作:
$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection
Run Code Online (Sandbox Code Playgroud)
然后我遇到了另一个与路径相关的失败,其中工作人员说它找不到我试图将数据插入的数据库表。所以我更改了引擎创建步骤以包含我的完整路径......
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Run Code Online (Sandbox Code Playgroud)
这个问题让我更加困惑,因为我的工作人员已经在我的 DataCollection 目录中工作,所以我认为 ('sqlite:///../app.db') 将是定位数据库的正确方法(因为它在没有 rq 工作人员的情况下进行测试)。
所以,经过长时间的解释,我的问题是:在这种情况下管理路径的正确方法是什么?对我来说,我必须使用 /home 的完整路径似乎是错误的......我是否遗漏了关于路径和/或 rq worker(和类似的)如何工作的信息?
从我的代码文件中摘录如下:
main_client.py
from redis import Redis
import rq
from collect_data import transfer
redis_url = Redis.from_url('redis://') #(config['REDIS_URL'])
queue = rq.Queue('rq_worker-data2db', connection=redis_url)
#.....
#.....
def have_data(data):
rq_job = queue.enqueue('collect_data.transfer', data)
#.....
#.....
Run Code Online (Sandbox Code Playgroud)
收集数据.py
from base import Session, engine, Base
from tables import FieldData
import time
from datetime import datetime
def transfer(info):
timestamp_in = datetime.utcnow()
session = Session()
data1 = FieldData(data=info, timestamp=timestamp_in)
session.add(data1)
session.commit()
Run Code Online (Sandbox Code Playgroud)
基础文件
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Session = sessionmaker(bind=engine)
Base = declarative_base()
Run Code Online (Sandbox Code Playgroud)
表.py
from sqlalchemy import Column, String, Float, Integer, Date, DateTime, Table, ForeignKey
from base import Base
from datetime import datetime
# .....
#.....
class FieldData(Base):
__tablename__ = 'field_data'
id = Column(Integer, primary_key=True)
data = Column(String(20))
timestamp = Column(DateTime, index=True, default=datetime.utcnow)
def __init__(self, data, timestamp):
self.data = data
self.timestamp = timestamp
Run Code Online (Sandbox Code Playgroud)
在终端中,我首先运行 Redis,然后使用以下命令运行工作程序:
$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection (其中 rq_worker_data2db 是工作人员名称)
| 归档时间: |
|
| 查看次数: |
1130 次 |
| 最近记录: |