我需要制作一个有界队列的字典。以下是我到目前为止所拥有的。
from collections import defaultdict
from collections import deque
d = defaultdict(deque)
...
q = d[name]
Run Code Online (Sandbox Code Playgroud)
创建有界队列:q = deque(maxlen = 10)可以,但我不确定当队列位于字典中时如何执行此操作。谁能帮我制作有界队列的地图?
循环队列显然更好,因为它可以帮助我们利用弹出元素留下的空白空间。它还节省了每次弹出后可能用于横向移动元素的时间。
但是是否存在队列比使用循环队列更受青睐的用例?
队列的定义 = 我们将采用线性数组实现。遵循 FIFO,无覆盖
循环队列的定义=环形缓冲区实现。遵循先进先出。没有覆盖。
出于测试目的,我将作业排入队列。我在另一边打开了一个命令行,我在其中执行:
php工匠队列:工作
上面的命令一直在终端上监听和运行,直到我手动杀死它。
因为我正在从队列中单独测试 Jobs。让命令执行一个或多个条目并退出而不是作为守护进程执行并且必须以某种方式杀死它会非常好......
还要考虑到,在不重新启动守护程序化命令的情况下,不可能编辑代码并使当前执行 queue:work 获取更新。
我怎样才能把它变成一个 LIFO-> 后进先出队列?有什么简单的方法可以做到吗?这是一个 FIFO-> 先进先出队列。
using namespace std;
int main(){
queue<string> q;
cout << "Pushing one two three four\n";
q.push("one");
q.push("two");
q.push("three");
q.push("four");
cout << "Now, retrieve those values in FIFO order.\n";
while(!q.empty()) {
cout << "Popping ";
cout << q.front() << "\n";
q.pop();
}
cout << endl;
return 0;
}
Run Code Online (Sandbox Code Playgroud) 我有一个控制器函数,它调度一个job. 处理此job问题后,最后它会job再次分派相同的内容(使用不同的参数)。总共有5个相同的职位。
Queue driver: database
问题是:我记录了从create()到 的持续时间handle()。由控制器调度的第一个作业花费了 1700 毫秒,而由作业本身调度的其他作业仅花费了 40 毫秒。
Queue driver: sync
当我改用sync队列驱动程序时,所有工作都以闪电般的速度进行。
发现:
create()第一个队列作业从到花费了很长时间handle()。在此之前,队列是空的。可能是队列驱动程序的问题。
请问为什么以及如何解决?谢谢!!
更新:
TestJob在处理作业时添加了一个调度本身。这意味着队列总是有一个TestJob正在处理或等待处理。
重复我原来的工作,它们从完成created()到完成只用了不到 70 毫秒的时间handle()。
结论:
我很确定这是队列驱动程序问题。看起来工作人员在队列为空时就睡着了。请问有人知道修复方法吗?
我一整天都被这个问题困扰,并且无法找到与我想要完成的任务相关的任何解决方案。
我试图将队列传递给子进程中生成的线程。队列在入口文件中创建并作为参数传递给每个子流程。
我正在制作一个模块化程序来a)运行神经网络b)在需要时自动更新网络模型c)将事件/图像从神经网络记录到服务器。我以前的程序只崇拜一个运行多个线程的 CPU 核心,并且速度变得相当慢,因此我决定需要对程序的某些部分进行子处理,以便它们可以在自己的内存空间中运行,以充分发挥其潜力。
子流程:
总共 4 个子流程。
在开发过程中,我需要在每个进程之间进行通信,以便它们都位于同一页面上,并包含来自服务器的事件等。所以据我所知,队列将是最好的选择。
(澄清:来自“多处理”模块的“队列”,而不是“队列”模块)
~~不过~~
每个子进程都会产生自己的线程。例如,第一个子进程将产生多个线程:每个队列一个线程,用于侦听来自不同服务器的事件并将它们传递到程序的不同区域;一个线程监听从神经网络之一接收图像的队列;一个线程监听从网络摄像头接收实时图像的队列;一个线程监听从另一个神经网络接收输出的队列。
我可以毫无问题地将队列传递给子流程,并且可以有效地使用它们。但是,当我尝试将它们传递给每个子进程中的线程时,我收到上述错误。
我对多重处理相当陌生;然而,除了共享内存空间和 GIL 之外,其背后的方法看起来与线程相对相同。
这是来自 Main.py;程序入口。
from lib.client import Client, Image
from multiprocessing import Queue, Process
class Main():
def __init__(self, server):
self.KILLQ = Queue()
self.CAMERAQ = Queue()
self.CLIENT = Client((server, 2005), self.KILLQ, self.CAMERAQ)
self.CLIENT_PROCESS = Process(target=self.CLIENT.do, daemon=True)
self.CLIENT_PROCESS.start()
if __name__ == '__main__':
m = Main('127.0.0.1')
while True:
m.KILLQ.put("Hello world")
Run Code Online (Sandbox Code Playgroud)
这是来自 client.py (在名为 lib 的文件夹中)
class Client():
def __init__(self, connection, killq, cameraq):
self.TCP_IP …Run Code Online (Sandbox Code Playgroud) 我的 Node js 应用程序上有一个 async.queue 实现,但queue.drain 函数最近完全停止触发。
我怀疑该问题与我内部任务函数中的等待语句有关,但我也可以使用异步文档上的示例重现该问题
const async = require('async')
var q = async.queue(function(task, callback) {
console.log('hello ' + task.name);
callback();
}, 1);
q.drain(function() {
console.log('all items have been processed');
});
q.push({name: 'bar'});
q.push({name: 'foo'}, function(err) {
console.log('finished processing foo');
});
Run Code Online (Sandbox Code Playgroud)
这将在我的控制台上输出以下内容,但不会输出排出语句。那么我缺少什么吗?
你好酒吧
你好富
处理完成 foo
在我的项目的一个关键部分中,它基本上允许控制器异步接收对象,放入 a 中,Queue由线程一次从队列中顺序处理一个,然后服务响应,较旧的处理过的对象保留在队列中,直到更新的对象项目插入。
回到过去(几个月前),我Queue解决这个特定业务特定问题的实现是使用 Guava 的EvictingQueue,它现在标记为@Beta,因此应用程序的这一部分可能会在未来的 Guava 版本中中断。
private final Queue<SomeRandomBusinessObject> items = Queues.synchronizedQueue(EvictingQueue.create(queueSize));
Run Code Online (Sandbox Code Playgroud)
是否有任何线程安全和固定大小的替代方案来EvictingQueue实现这一目标?
我使用这个答案中的代码,但是asyncio.exceptions.CancelledError当队列为空时得到。在实际项目中,我将任务添加到消费者的队列中,这就是我使用while True语句的原因
我压缩该代码以使调试更容易:
import asyncio
import traceback
async def consumer(queue: asyncio.Queue):
try:
while True:
number = await queue.get() # here is exception
queue.task_done()
print(f'consumed {number}')
except BaseException:
traceback.print_exc()
async def main():
queue = asyncio.Queue()
for i in range(3):
await queue.put(i)
consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
和错误:
consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
number …Run Code Online (Sandbox Code Playgroud) 在我下面的简单 asyncio 代码中,应用程序有一个任务self.add_item_loop_task不断地向asyncio.Queuenamed 中添加一个整数self.queue,而第二个任务则self.get_item_loop_task不断地等待将某些内容添加到队列中并将print其退出。
但是,这个应用程序0在我运行时只打印一次,然后卡在那里。我相信循环self.get_item_loop_task没有继续。为什么会这样?
import asyncio
class App:
def __init__(self):
self.queue = asyncio.Queue()
async def start(self):
self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
await asyncio.wait(
[
self.add_item_loop_task,
self.get_item_loop_task,
]
)
async def stop(self):
self.add_item_loop_task.cancel()
self.get_item_loop_task.cancel()
async def add_item_loop(self):
i = 0
while True:
await self.queue.put(i)
i += 1
await asyncio.sleep(1)
async def get_item_loop(self):
while True:
item = await self.queue.get()
print(item)
app = App()
try:
asyncio.run(app.start())
except …Run Code Online (Sandbox Code Playgroud) queue ×10
python ×4
laravel ×2
async.js ×1
asynchronous ×1
c++ ×1
coroutine ×1
dictionary ×1
guava ×1
java ×1
javascript ×1
jobs ×1
lifo ×1
node.js ×1
performance ×1
python-3.x ×1
spring-boot ×1
stack ×1
testing ×1