我们正在使用Tibco EMS队列发送消息。我们是出版商。每当我们发布消息时,似乎有人正在使用它。当我们问另一个正在订阅的团队时,他们说他们没有消费。
是否可以知道谁连接到EMS队列并使用消息?
我想用单个生产者和多个消费者的模式实现多文件下载.
我有: - 找到要在循环中下载的新链接的代码 - 当找到新链接时 - 它调用下载功能 - 下载功能接受源文件路径和目标文件路径并下载文件.
我想做什么 - 我想同时下载X个文件(我不知道文件的总数) - 在任何时候我都应该能够同时下载X文件 - 只要1个X文件完成下载 - 调用函数应该能够立即添加新的下载 - 然后立即下载
举例非常感谢
我有一个相当香草的网络服务(旧学校asmx).其中一种方法启动了一些与返回给客户端的结果无关的异步处理.希望下面的小片段有意义:
[System.Web.Services.WebMethod]
public List<Foo> SampleWebMethod(string id)
{
// sample db query
var foo = db.Query<Foo>("WHERE id=@0",id);
// kick of async stuff here - for example firing off emails
// dont wait to send result
DoAsyncStuffHere();
return foo;
}
Run Code Online (Sandbox Code Playgroud)
我对DoAsyncStuffHere方法的初始实现使用了ThreadPool.QueueUserWorkItem.所以,它看起来像:
public void DoAsyncStuffHere()
{
ThreadPool.QueueUserWorkItem(delegate
{
// DO WORK HERE
});
}
Run Code Online (Sandbox Code Playgroud)
这种方法在低负载条件下工作正常.但是,我需要能够处理相当高负载的东西.因此,生产者/消费者模式似乎是最好的方式.
我感到困惑的是如何将队列所做的所有工作限制在Web服务的所有实例中的单个线程上.我如何才能最好地设置单个队列以供任何Web服务实例访问?
我用一个阻塞队列编写了一个简单的消费者 - 生产者问题,该队列有多个生产者和多个消费者接受并将整数放在队列中.但是,当我尝试测试它时,结果并不理想,例如队列的大小不正确.我不认为消费者和生产者规模正在同步.此外,我对生产者和消费者都进行了2秒钟的睡眠,但在测试时,每两秒就打印出2个生产者和2个消费者的结果.有谁知道我做错了什么?也许我开始错误的线程?我评论了另一种方式,但结果仍然是错误的.
结果:
run:
Producing 425 Thread-0 size left 0
Consuming 890 Thread-3 size left 0
Consuming 425 Thread-2 size left 0
Producing 890 Thread-1 size left 0
Consuming 192 Thread-2 size left 0
Consuming 155 Thread-3 size left 0
Producing 155 Thread-1 size left 0
Producing 192 Thread-0 size left 0
Consuming 141 Thread-2 size left 1
Producing 141 Thread-0 size left 0
Producing 919 Thread-1 size left 0
Consuming 919 Thread-3 size left 0
Producing 361 Thread-1 …Run Code Online (Sandbox Code Playgroud) 下面的代码几乎与http://docs.python.org/2/library/queue.html上的python官方Queue示例相同.
from Queue import Queue
from threading import Thread
from time import time
import sys
num_worker_threads = int(sys.argv[1])
source = xrange(10000)
def do_work(item):
for i in xrange(100000):
pass
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for item in source:
q.put(item)
start = time()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
q.join()
end = time()
print(end - start)
Run Code Online (Sandbox Code Playgroud)
这些是Xeon 12核处理器的结果:
$ ./speed.py 1
12.0873839855
$ ./speed.py 2
15.9101941586
$ ./speed.py …Run Code Online (Sandbox Code Playgroud) 我最近开始使用.NET 4.5中的TPL Dataflow库,并且块的整个概念对我来说是新的.我正在我的应用程序中实现生产者 - 消费者队列,我需要防止重复的消息被放入队列,因此需要检查消息是否已经排队.我正在使用一种BufferBlock<Message>类型(Message是一种自定义类型).BufferBlock具有Count属性但在此问题中没有帮助,因为需要唯一标识消息.
有没有办法检查是否BufferBlock包含一个项目或检查所有项目并检查它们?是否有可能转换BufferBlock为允许迭代项目的东西?我正在按照我在MSDN上看到的一个例子,它没有检查项目是否在队列中,但我认为检查队列的内容是一个非常需要的操作.任何帮助表示赞赏.
.net c# producer-consumer task-parallel-library tpl-dataflow
我正在使用gcc为单处理器32位微控制器编写代码.
我需要从链表中消耗带时间戳的对象.代码的另一部分可以是异步的(可能在ISR中)将它们添加到列表中.
关键部分通过关闭中断和使用该barrier()功能来实现.
我很困惑gcc优化可以通过缓存指向列表项的指针(下一个要删除的最新项目,列表头或空闲列表)来破坏我的代码.我不希望while循环中的任何内容从前一个循环中缓存.内存屏障是否会保护我免受编译器决定在函数启动时加载一次指针而永远不会重新加载它?所有这些列表指针都可以在生产者代码的关键部分中修改(未示出).我试图理解是否pqueue_first应该是一个易失性指针,例如.
据推测,如果没有循环(这是添加到列表的情况),如果函数中的所有代码都在关键部分,我可以吗?
请不要只指向一些关于易失性或关键部分的通用文章,因为我已经阅读了很多部分,但是我很难看到如何将它应用于这个特定的代码.我知道volatile确保编译器每次引用时都会重新加载变量.但我不了解优化的可能范围及其与内存障碍的相互作用.
typedef struct {
EV_EventQueueEntry_t *pqueue_alloc; // allocation (never changes)
EV_EventQueueEntry_t *pqueue_head; // head of active queue (ISR can change it)
EV_EventQueueEntry_t *pqueue_free; // head of free list (ISR can change it)
EV_EventQueueEntry_t *pqueue_first; // soonest item in queue (ISR can change it)
EV_EventQueueEntry_t *pqueue_first_prev; // back pointer from soonest item (ISR can change it)
EV_UInt_t max_event_count;
} EV_EventQueue_t;
void RunLoop(EV_EventQueue_t *pev)
{
while(not timeout)
{
// Enter critical section
disable_interrupts(); …Run Code Online (Sandbox Code Playgroud) c volatile producer-consumer critical-section memory-barriers
我有一个Producer和一个Consumer线程(threading.Thread),它共享一个queue类型Queue.
制片人run:
while self.running:
product = produced() ### I/O operations
queue.put(product)
Run Code Online (Sandbox Code Playgroud)
消费者run:
while self.running or not queue.empty():
product = queue.get()
time.sleep(several_seconds) ###
consume(product)
Run Code Online (Sandbox Code Playgroud)
现在我需要终止主线程中的两个线程,并且queue在终止之前必须为空(全部已消耗)的要求.
目前我正在使用如下代码来终止这两个线程:
主线程stop:
producer.running = False
producer.join()
consumer.running = False
consumer.join()
Run Code Online (Sandbox Code Playgroud)
但我想如果有更多的消费者,这是不安全的.
另外,我不确定是否sleep会向制作人发布时间表以便它可以生产更多产品.事实上,我发现生产者一直"挨饿",但我不确定这是否是根本原因.
有没有一个体面的方法来处理这个案子?
python multithreading producer-consumer python-multithreading
考虑下一段代码.
#include <iostream>
#include <vector>
#include <map>
using namespace std;
map<pthread_t,vector<int>> map_vec;
vector<pair<pthread_t ,int>> how_much_and_where;
pthread_cond_t CV = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* writer(void* args)
{
while(*some condition*)
{
int howMuchPush = (rand() % 5) + 1;
for (int i = 0; i < howMuchPush; ++i)
{
// WRITE
map_vec[pthread_self()].push_back(rand() % 10);
}
how_much_and_where.push_back(make_pair(pthread_self(), howMuchPush));
// Wake up the reader - there's something to read.
pthread_cond_signal(&CV);
}
cout << "writer thread: " << pthread_self() << endl;
return nullptr;
} …Run Code Online (Sandbox Code Playgroud) 我正在使用一个名为react-firebase-js的东西来处理firebase auth,但我对反应和提供者 - 消费者想法的理解是有限的.
我开始在顶层构建一个非常大的JSX东西,并且没有警告.但是当我尝试将其分解为组件时,我会在标题和其他一些内容中显示警告.
这没有警告......
// in App.js component
render() {
return (
<header className="App-header">
<img src={logo} className="App-logo" alt="logo" />
<FirebaseAuthConsumer>
{({ isSignedIn, user, providerId }) => {
if (isSignedIn) {
return (
// ui for signed in user
);
} else {
if (this.state.confirmationResult) {
return (
// ui to get a phone number sign in
);
} else {
return (
// ui to verify sms code that was sent
);
}
}
}}
</FirebaseAuthConsumer> …Run Code Online (Sandbox Code Playgroud) javascript producer-consumer firebase reactjs firebase-authentication