线程间通信有问题,并通过在整个地方使用"虚拟消息"来"解决"它.这是一个坏主意吗?有哪些可能的解决方案
示例问题我有.
主线程启动一个线程进行处理并将记录插入数据库.主线程读取一个可能很大的文件,并将一个记录(对象)放在一个阻塞队列中.处理线程从队列中读取并确实有效.
如何告诉"处理线程"停止?队列可以是空的但是工作没有完成,主线程现在也没有处理线程完成工作并且不能中断它.
所以处理线程呢
while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
MyObject object= queue.poll(100, TimeUnit.MILLISECONDS);
if (object != null) {
String data = object.getData();
if (data.equals("END")) {
break;
}
// do work
}
}
// clean-up
synchronized queue) {
queue.notifyAll();
}
return;
Run Code Online (Sandbox Code Playgroud)
和主线程
// ...start processing thread...
while(reader.hasNext(){
// ...read whole file and put data in queue...
}
MyObject dummy = new MyObject();
dummy.setData("END");
queue.put(dummy);
//Note: empty queue here means work is done
while (queue.size() > 0) {
synchronized …
Run Code Online (Sandbox Code Playgroud) 我有生产者和消费者联系BlockingQueue
.
消费者从队列中等待记录并处理它:
Record r = mQueue.take();
process(r);
Run Code Online (Sandbox Code Playgroud)
我需要从其他线程暂停这个过程一段时间.怎么实现呢?
现在我认为实现它,但它似乎是不好的解决方案:
private Object mLock = new Object();
private boolean mLocked = false;
public void lock() {
mLocked = true;
}
public void unlock() {
mLocked = false;
mLock.notify();
}
public void run() {
....
Record r = mQueue.take();
if (mLocked) {
mLock.wait();
}
process(r);
}
Run Code Online (Sandbox Code Playgroud) 使用BlockingQueue消耗生成的数据时,等待数据出现的最有效方法是什么?
场景:
步骤1)数据列表将是添加时间戳的数据存储.这些时间戳需要按最接近当前时间优先级排序.此列表可能为空.线程将时间戳插入其中.生产
步骤2)我想在另一个线程中使用此处的数据,该线程将从数据中获取时间戳并检查它们是否在当前时间之后.消费者然后生产
步骤3)如果它们在当前时间之后,则将它们发送到另一个线程以供消费和处理.在此处理时间戳数据后,从步骤1数据存储中删除.消费然后编辑原始列表.
在下面的代码中,数据字段引用步骤1中的数据存储.结果是在当前时间之后已发送的时间戳列表.步骤2.然后将结果消耗步骤3.
private BlockingQueue<LocalTime> data;
private final LinkedBlockingQueue<Result> results = new LinkedBlockingQueue<Result>();
@Override
public void run() {
while (!data.isEmpty()) {
for (LocalTime dataTime : data) {
if (new LocalTime().isAfter(dataTime)) {
results.put(result);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
问题 等待数据列表中可能可能为空的数据的最有效方法是什么?专注于:
while (!data.isEmpty())
Run Code Online (Sandbox Code Playgroud)
以前的问题.
我正在寻找一种单生产者、单消费者 FIFO 实现,它的执行速度比正常的锁定-写入-解锁-信号/waitForSignal-lock-read-unlock 东西更快。我正在寻找用 C 或 C++ 编写的大多数 POSIX 操作系统(特定于 x86 很好)支持的东西。
我不想传递比指针大的任何东西。
我不一定喜欢无锁的想法,但我确实想要一些快速而正确的东西。我读到的一篇关于这个主题的论文提到了一种看起来很有趣的双队列方法,但从那时起我就找不到太多关于它的信息。
从我到目前为止所做的研究来看,0mq(据说它的 inproc:// 方案使用无锁结构)看起来是最有吸引力的选择。话虽如此,我想确保在走上这条路之前我没有错过任何东西。
另一种选择可能涉及使用 POSIX 消息队列,但这对于线程 <--> 线程通信来说似乎相当慢;这是真的?
任何单消费者单生产者无锁队列在 C 中的实现?似乎相关,但接受的答案实际上并没有像“过早优化是不好的”那样枚举现有库。
关于 Ruby 中的条件变量的资源并不多,但大多数都是错误的。就像ruby-doc一样,教程在这里或在这里发布- 他们都可能遇到死锁。
sleep
我们可以通过按给定顺序启动线程并可能在中间放置一些线程来强制同步来解决问题。但这只是推迟了真正的问题。
我将代码重写为经典的生产者-消费者问题:
require 'thread'
queue = []
mutex = Mutex.new
resource = ConditionVariable.new
threads = []
threads << Thread.new do
5.times do |i|
mutex.synchronize do
resource.wait(mutex)
value = queue.pop
print "consumed #{value}\n"
end
end
end
threads << Thread.new do
5.times do |i|
mutex.synchronize do
queue << i
print "#{i} produced\n"
resource.signal
end
sleep(1) #simulate expense
end
end
threads.each(&:join)
Run Code Online (Sandbox Code Playgroud)
有时你会得到这个(但并非总是如此):
0 produced
1 produced
consumed 0
2 produced …
Run Code Online (Sandbox Code Playgroud) 我想用Java中的多线程等待和通知方法编写程序。
该程序有一个堆栈(最大长度 = 5)。生产者永远生成数字并将其放入堆栈中,消费者从堆栈中选取它。
当堆栈已满时,生产者必须等待,当堆栈为空时,消费者必须等待。
问题是它只运行一次,我的意思是一旦它产生 5 个数字,它就会停止,但我将 run 方法放在 while(true) 块中以不间断运行,但它没有。
这是我到目前为止所尝试的。
生产者类别:
package trail;
import java.util.Random;
import java.util.Stack;
public class Thread1 implements Runnable {
int result;
Random rand = new Random();
Stack<Integer> A = new Stack<>();
public Thread1(Stack<Integer> A) {
this.A = A;
}
public synchronized void produce()
{
while (A.size() >= 5) {
System.out.println("List is Full");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
result = rand.nextInt(10);
System.out.println(result + " produced ");
A.push(result);
System.out.println(A); …
Run Code Online (Sandbox Code Playgroud) 我有一个带有一系列节点的XML文件.每个节点代表一个我需要解析并添加到排序列表中的元素(顺序必须与文件中找到的节点相同).
目前我正在使用顺序解决方案:
struct Graphic
{
bool parse()
{
// parsing...
return parse_outcome;
}
};
vector<unique_ptr<Graphic>> graphics;
void producer()
{
for (size_t i = 0; i < N_GRAPHICS; i++)
{
auto g = new Graphic();
if (g->parse())
graphics.emplace_back(g);
else
delete g;
}
}
Run Code Online (Sandbox Code Playgroud)
因此,只有图形(实际上是派生的类的实例Graphic
,Line,Rectangle等,这就是为什么new
)才能正确解析,它才会被添加到我的数据结构中.
由于我只关心将图形添加到列表中的顺序,我虽然是异步调用解析方法,因此生产者的任务是从文件中读取每个节点并将此图形添加到数据结构中,同时对消费者有每当一个新的图形已准备好被解析解析每个图形的任务.
现在我有几个消费者线程(在main中创建),我的代码如下所示:
queue<pair<Graphic*, size_t>> q;
mutex m;
atomic<size_t> n_elements;
void producer()
{
for (size_t i = 0; i < N_GRAPHICS; i++)
{
auto g = new Graphic();
graphics.emplace_back(g);
q.emplace(make_pair(g, …
Run Code Online (Sandbox Code Playgroud) 我找到了这个代码,它实现了生产者-消费者问题。我在这里发布一段代码。
在给定的代码中,让我们考虑这样一个场景:生产者通过调用生成一个值void add(int num)
,它获取互斥体上的锁mu
,并且 buffer.size()==size_
这使得生产者由于条件变量 而进入等待队列cond
。
与此同时,发生上下文切换,消费者调用函数int remove()
来消费 value ,它尝试获取 mutex 上的锁mu
,但是生产者之前已经获取了锁,因此它失败并且永远不会消费该值,从而导致僵局。
我这里哪里出错了?因为代码在我运行时似乎工作正常,所以调试它对我没有帮助。
谢谢
void add(int num) {
while (true) {
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker, [this](){return buffer_.size() < size_;});
buffer_.push_back(num);
locker.unlock();
cond.notify_all();
return;
}
}
int remove() {
while (true)
{
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker, [this](){return buffer_.size() > 0;});
int back = buffer_.back();
buffer_.pop_back();
locker.unlock();
cond.notify_all();
return back;
}
}
Run Code Online (Sandbox Code Playgroud) 我一直在阅读有关 AMQP 消息传递确认的原理。(https://www.rabbitmq.com/confirms.html)。文章确实很有帮助,写得很好,但有关消费者认知的一件事确实令人困惑,以下是引用:
使用自动确认模式时需要考虑的另一件事是消费者过载。
消费者超载?消息队列由代理处理并保存在 RAM 中(如果我理解正确的话)。这是关于什么过载?消费者是否有某种第二队列?该文章的另一部分更令人困惑:
因此,消费者可能会因交付速度而不知所措,可能会在内存中积累积压并耗尽堆或让操作系统终止其进程。
什么积压?这一切是如何协同工作的?消费者完成哪部分工作(当然除了消费消息和处理消息)?我认为代理正在保持队列活动并转发消息,但现在我正在阅读一些神秘的积压和消费者过载。这真的很令人困惑,有人可以解释一下或者至少指出我的好来源吗?
我试图在 Active MQ(版本 5.15.0)中创建消费者级别超时。考虑消费者选择了一条消息但无法确认,因此在这种情况下,我希望消费者超时,以便其他消费者可以选择收听经纪人的消息。
我的生产者代码在其中设置了两个消费者侦听器:
public class JmsMessageListenerAckExample {
public static void main(String[] args) throws URISyntaxException, Exception {
Connection connection = null;
try {
// Producer
ConnectionFactory factory = createActiveMQConnectionFactory();
connection = factory.createConnection();
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
// Consumer
MessageConsumer consumer1 = session.createConsumer(queue);
consumer1.setMessageListener(
new AckMessageListener(false, "consumer1"));
Thread.sleep(1000);
System.out.println("Creating new message listener to acknowledge"); …
Run Code Online (Sandbox Code Playgroud) java ×5
c++ ×3
deadlock ×2
mutex ×2
performance ×2
amqp ×1
c++11 ×1
concurrency ×1
java-7 ×1
jms ×1
notify ×1
rabbitmq ×1
ruby ×1
unique-lock ×1
wait ×1