我想要一个长时间运行的进程来返回它在队列(或类似的东西)上的进度,我将把它提供给进度条对话框.完成该过程后,我还需要结果.这里的测试示例失败了RuntimeError: Queue objects should only be shared between processes through inheritance.
import multiprocessing, time
def task(args):
count = args[0]
queue = args[1]
for i in xrange(count):
queue.put("%d mississippi" % i)
return "Done"
def main():
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
result = pool.map_async(task, [(x, q) for x in range(10)])
time.sleep(1)
while not q.empty():
print q.get()
print result.get()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
我已经能够得到这个使用单个进程对象的工作(在这里我很 alowed传递一个队列引用),但是我没有一个池来管理许多流程我要启动.有关更好的模式的建议吗?
我很想听到有关MongoDB作为队列服务的真实应用程序体验的更多信息,如果您为此目的使用MongoDB,您可以分享您的想法,以及它的使用环境吗?
我在集群系统上有几个并行运行的线程.每个python线程输出到一个目录mydir.每个脚本在输出检查之前是否存在mydir,如果不存在则创建它:
if not os.path.isdir(mydir):
os.makedirs(mydir)
Run Code Online (Sandbox Code Playgroud)
但这会产生错误:
os.makedirs(self.log_dir)
File "/usr/lib/python2.6/os.py", line 157, in makedirs
mkdir(name,mode)
OSError: [Errno 17] File exists
Run Code Online (Sandbox Code Playgroud)
我怀疑这可能是由于竞争条件,一个工作在另一个工作之前创建了dir.这可能吗?如果是这样,如何避免这种错误?
我不确定这是一个竞争条件,所以想知道Python中的其他问题是否会导致这个奇怪的错误.
我在Laravel上php artisan queue:listen用来运行排队的工作.其中一个工作相当复杂,需要很长时间,因此我收到以下错误:
[Symfony\Component\Process\Exception\ProcessTimedOutException]
The process ""/usr/local/Cellar/php55/5.5.14/bin/php" artisan queue:work
--queue="QUEUE_URL" --delay=0 --memory=128 --sleep=3 --tries=0"
exceeded the timeout of 60 seconds.
Run Code Online (Sandbox Code Playgroud)
我知道我可以运行queue:listen一个任意高的超时值,但是这不是理想的,因为我不希望它的活动时间,一些过程实际上 unreseponsive.我尝试set_time_limit(60)在作业调用的函数内定期调用,但这并没有解决我的问题.
我找到了一个在线提及的线程Symfony\Component\Process\Process->setTimeout(null),但我不知道如何访问该进程对象,或者如果这甚至可以解决问题.
任何帮助将非常感激.
我注意到您可以调用Queue.Synchronize来获取线程安全的队列对象,但Queue <T>上没有相同的方法.有谁知道为什么?看起来有点奇怪.
我有一个典型的生产者 - 消费者问题:
多个生产者应用程序将作业请求写入PostgreSQL数据库上的作业表.
作业请求的状态字段在创建时包含QUEUED.
有多个由当生产者插入一条新记录的规则通知的消费应用:
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
Run Code Online (Sandbox Code Playgroud)
他们将尝试通过将其状态设置为RESERVED来保留新记录.当然,只有消费者才能成功.所有其他消费者不应该保留相同的记录.他们应该保留state = QUEUED的其他记录.
示例:某个生产者将以下记录添加到表jobrecord:
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
Run Code Online (Sandbox Code Playgroud)
现在,两个消费者A,B想要处理它们.他们同时开始跑步.一个应该保留id 1,另一个应该保留id 2,然后完成的第一个应该保留id 3等等.
在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可以在不同机器上运行的不同进程.它们只访问同一个数据库,因此所有同步都必须通过数据库进行.
我在PostgreSQL中阅读了很多关于并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html 选择Postgresql PostgreSQL中的解锁行并锁定
从这些主题中我了解到,以下SQL语句应该能够满足我的需求:
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord …Run Code Online (Sandbox Code Playgroud) 在C#中,如何检查队列是否为空?
我想遍历Queue的元素,我需要知道何时停止.我怎么能做到这一点?
我正在寻找C/CPP中工作窃取队列的正确实现.我环顾了谷歌,但没有找到任何有用的东西.
也许有人熟悉一个好的开源实现?(我不想实施原始学术论文中的伪代码).
我正在编写一个iPhone应用程序,我很惊讶Apple的Foundation Framework中似乎没有NSQueue或NSStack类.我看到从NSMutableArray开始自己滚动很容易,所以除非我错过了什么,否则我会这样做.我错过了什么吗?
我正在试图弄清楚如何移植一个线程程序来使用asyncio.我有很多代码可以围绕几个标准库同步Queues,基本上是这样的:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Run Code Online (Sandbox Code Playgroud)
一个线程创建值(可能是用户输入),另一个线程用它们做某事.关键是这些线程在有新数据之前一直处于空闲状态,此时它们会唤醒并对其执行某些操作.
我正在尝试使用asyncio实现这种模式,但我似乎无法弄清楚如何让它"去".
我的尝试看起来或多或少都像这样(并且根本不做任何事情).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()? …Run Code Online (Sandbox Code Playgroud)