我正在使用多处理模块来分割一个非常大的任务.它在大多数情况下都有效,但我必须在设计中遗漏一些明显的东西,因为这样我很难有效地分辨出所有数据的处理时间.
我有两个独立的任务; 一个喂另一个.我想这是生产者/消费者的问题.我在所有进程之间使用共享队列,生成器填充队列,消费者从队列中读取并进行处理.问题在于数据量有限,因此在某些时候每个人都需要知道所有数据都已处理完毕,因此系统可以正常关闭.
使用map_async()函数似乎是有意义的,但由于生产者正在填充队列,我不知道前面的所有项目,所以我必须进入while循环并使用apply_async()并尝试检测何时完成某些事情......丑陋.
我觉得我错过了一些明显的东西.如何更好地设计?
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
消费者
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an …Run Code Online (Sandbox Code Playgroud) 我有一个在 WSGI 下提供的 Flask 应用程序,其中数据库 URI 会随时间变化。URI 每两小时切换到另一个数据库。我利用这段时间填充一个数据库,而另一个数据库则为应用程序提供数据。
我很难弄清楚如何最好地配置会话,以便在切换发生时,客户端将在下一个请求中获得正确的(不同的)数据库。根据我的测试,如果我在顶层初始化数据库,当切换发生时,客户端仍然指向旧数据库。
我想过在页面(索引等)内部设置会话,但是太痛苦了,然后我担心打开和关闭太多的数据库连接并让它们闲置。我想我可以通过在启动时初始化两个会话来使其工作,然后只需选择在每个页面内的切换时使用哪个会话。这似乎效率低下,我相信有更好的方法。
帮助?!
~~~~~~~~~
这是我目前正在做的事情的一般想法,但无法更改请求之间的 URI。顶级代码只运行一次,或者每隔一段时间运行一次。
if now.hour % 2:
db_name = 'db1'
else:
db_name = 'db2'
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://foo:poo@localhost:3306/%s" % db_name
def init_db(uri, **kwargs):
engine = create_engine(uri, **kwargs)
Base.metadata.create_all(bind=engine)
global db_session
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base.query = db_session.query_property()
init_db(app.config['SQLALCHEMY_DATABASE_URI'], pool_recycle=3600)
@app.teardown_request
def shutdown_session(exception=None):
db_session.remove()
@app.route('/')
def index():
...etc...
Run Code Online (Sandbox Code Playgroud)
工作示例 - 美丽。
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://foo:poo@localhost:3306/%s"
class SessionManager(object):
def __init__(self, base_uri=None, **kwargs):
self.session = None
self.base_uri …Run Code Online (Sandbox Code Playgroud)