标签: queue

如何将队列引用传递给pool.map_async()管理的函数?

我想要一个长时间运行的进程来返回它在队列(或类似的东西)上的进度,我将把它提供给进度条对话框.完成该过程后,我还需要结果.这里的测试示例失败了RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

我已经能够得到这个使用单个进程对象的工作(在这里我 alowed传递一个队列引用),但是我没有一个池来管理许多流程我要启动.有关更好的模式的建议吗?

python queue pool multiprocessing

37
推荐指数
2
解决办法
2万
查看次数

MongoDB作为队列服务?

我很想听到有关MongoDB作为队列服务的真实应用程序体验的更多信息,如果您为此目的使用MongoDB,您可以分享您的想法,以及它的使用环境吗?

social queue mongodb web

37
推荐指数
2
解决办法
3万
查看次数

制作目录时出现Python"FileExists"错误

我在集群系统上有几个并行运行的线程.每个python线程输出到一个目录mydir.每个脚本在输出检查之前是否存在mydir,如果不存在则创建它:

if not os.path.isdir(mydir):
    os.makedirs(mydir)
Run Code Online (Sandbox Code Playgroud)

但这会产生错误:

os.makedirs(self.log_dir)                                             
  File "/usr/lib/python2.6/os.py", line 157, in makedirs
mkdir(name,mode)
OSError: [Errno 17] File exists
Run Code Online (Sandbox Code Playgroud)

我怀疑这可能是由于竞争条件,一个工作在另一个工作之前创建了dir.这可能吗?如果是这样,如何避免这种错误?

我不确定这是一个竞争条件,所以想知道Python中的其他问题是否会导致这个奇怪的错误.

python filesystems queue file-io cluster-computing

37
推荐指数
3
解决办法
5万
查看次数

Laravel队列进程超时错误

我在Laravel上php artisan queue:listen用来运行排队的工作.其中一个工作相当复杂,需要很长时间,因此我收到以下错误:

[Symfony\Component\Process\Exception\ProcessTimedOutException]                                                                                                                                                                              
The process ""/usr/local/Cellar/php55/5.5.14/bin/php" artisan queue:work  
--queue="QUEUE_URL" --delay=0 --memory=128 --sleep=3 --tries=0" 
exceeded the timeout of 60 seconds.
Run Code Online (Sandbox Code Playgroud)

我知道我可以运行queue:listen一个任意高的超时值,但是这不是理想的,因为我希望它的活动时间,一些过程实际上 unreseponsive.我尝试set_time_limit(60)在作业调用的函数内定期调用,但这并没有解决我的问题.

我找到了一个在线提及的线程Symfony\Component\Process\Process->setTimeout(null),但我不知道如何访问该进程对象,或者如果这甚至可以解决问题.

任何帮助将非常感激.

php queue timeout symfony laravel

37
推荐指数
2
解决办法
3万
查看次数

为什么.NET中没有通用的同步队列?

我注意到您可以调用Queue.Synchronize来获取线程安全的队列对象,但Queue <T>上没有相同的方法.有谁知道为什么?看起来有点奇怪.

.net generics queue synchronization

35
推荐指数
3
解决办法
1万
查看次数

作业队列为具有多个使用者的SQL表(PostgreSQL)

我有一个典型的生产者 - 消费者问题:

多个生产者应用程序将作业请求写入PostgreSQL数据库上的作业表.

作业请求的状态字段在创建时包含QUEUED.

多个由当生产者插入一条新记录的规则通知的消费应用:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";
Run Code Online (Sandbox Code Playgroud)

他们将尝试通过将其状态设置为RESERVED来保留新记录.当然,只有消费者才能成功.所有其他消费者不应该保留相同的记录.他们应该保留state = QUEUED的其他记录.

示例:某个生产者将以下记录添加到表jobrecord:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>
Run Code Online (Sandbox Code Playgroud)

现在,两个消费者A,B想要处理它们.他们同时开始跑步.一个应该保留id 1,另一个应该保留id 2,然后完成的第一个应该保留id 3等等.

在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可以在不同机器上运行的不同进程.它们只访问同一个数据库,因此所有同步都必须通过数据库进行.

我在PostgreSQL中阅读了很多关于并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html 选择Postgresql PostgreSQL中的解锁行并锁定

从这些主题中我了解到,以下SQL语句应该能够满足我的需求:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord …
Run Code Online (Sandbox Code Playgroud)

sql postgresql queue producer-consumer

35
推荐指数
4
解决办法
2万
查看次数

如何检查队列是否为空?

在C#中,如何检查队列是否为空?

我想遍历Queue的元素,我需要知道何时停止.我怎么能做到这一点?

c# queue is-empty

35
推荐指数
3
解决办法
6万
查看次数

在C/C++中实现工作窃取队列?

我正在寻找C/CPP中工作窃取队列的正确实现.我环顾了谷歌,但没有找到任何有用的东西.

也许有人熟悉一个好的开源实现?(我不想实施原始学术论文中的伪代码).

c++ algorithm queue multithreading work-stealing

34
推荐指数
2
解决办法
1万
查看次数

iOS SDK是否提供队列和堆栈?

我正在编写一个iPhone应用程序,我很惊讶Apple的Foundation Framework中似乎没有NSQueue或NSStack类.我看到从NSMutableArray开始自己滚动很容易,所以除非我错过了什么,否则我会这样做.我错过了什么吗?

queue stack ios

34
推荐指数
5
解决办法
3万
查看次数

与工作者"线程"相同的asyncio.Queues

我正在试图弄清楚如何移植一个线程程序来使用asyncio.我有很多代码可以围绕几个标准库同步Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Run Code Online (Sandbox Code Playgroud)

一个线程创建值(可能是用户输入),另一个线程用它们做某事.关键是这些线程在有新数据之前一直处于空闲状态,此时它们会唤醒并对其执行某些操作.

我正在尝试使用asyncio实现这种模式,但我似乎无法弄清楚如何让它"去".

我的尝试看起来或多或少都像这样(并且根本不做任何事情).

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? …
Run Code Online (Sandbox Code Playgroud)

python queue python-3.x python-asyncio

34
推荐指数
2
解决办法
1万
查看次数