小编use*_*881的帖子

多处理 - 生产者/消费者设计

我正在使用多处理模块来分割一个非常大的任务.它在大多数情况下都有效,但我必须在设计中遗漏一些明显的东西,因为这样我很难有效地分辨出所有数据的处理时间.

我有两个独立的任务; 一个喂另一个.我想这是生产者/消费者的问题.我在所有进程之间使用共享队列,生成器填充队列,消费者从队列中读取并进行处理.问题在于数据量有限,因此在某些时候每个人都需要知道所有数据都已处理完毕,因此系统可以正常关闭.

使用map_async()函数似乎是有意义的,但由于生产者正在填充队列,我不知道前面的所有项目,所以我必须进入while循环并使用apply_async()并尝试检测何时完成某些事情......丑陋.

我觉得我错过了一些明显的东西.如何更好地设计?

PRODCUER

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

消费者

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

6
推荐指数
1
解决办法
3086
查看次数

Flask SQLAlchemy 设置动态 URI

我有一个在 WSGI 下提供的 Flask 应用程序,其中数据库 URI 会随时间变化。URI 每两小时切换到另一个数据库。我利用这段时间填充一个数据库,而另一个数据库则为应用程序提供数据。

我很难弄清楚如何最好地配置会话,以便在切换发生时,客户端将在下一个请求中获得正确的(不同的)数据库。根据我的测试,如果我在顶层初始化数据库,当切换发生时,客户端仍然指向旧数据库。

我想过在页面(索引等)内部设置会话,但是太痛苦了,然后我担心打开和关闭太多的数据库连接并让它们闲置。我想我可以通过在启动时初始化两个会话来使其工作,然后只需选择在每个页面内的切换时使用哪个会话。这似乎效率低下,我相信有更好的方法。

帮助?!

~~~~~~~~~

这是我目前正在做的事情的一般想法,但无法更改请求之间的 URI。顶级代码只运行一次,或者每隔一段时间运行一次。

if now.hour % 2:
    db_name = 'db1'
else:
    db_name = 'db2'

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://foo:poo@localhost:3306/%s" % db_name

def init_db(uri, **kwargs):
    engine = create_engine(uri, **kwargs)

    Base.metadata.create_all(bind=engine)

    global db_session
    db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
    Base.query = db_session.query_property()


init_db(app.config['SQLALCHEMY_DATABASE_URI'], pool_recycle=3600)

@app.teardown_request
def shutdown_session(exception=None):
    db_session.remove()

@app.route('/')
def index():
    ...etc...
Run Code Online (Sandbox Code Playgroud)

工作示例 - 美丽。

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://foo:poo@localhost:3306/%s"

class SessionManager(object):
    def __init__(self, base_uri=None, **kwargs):
        self.session = None
        self.base_uri …
Run Code Online (Sandbox Code Playgroud)

python uri sqlalchemy flask

5
推荐指数
1
解决办法
3875
查看次数

标签 统计

python ×2

flask ×1

multiprocessing ×1

sqlalchemy ×1

uri ×1