我正在寻找一种单生产者、单消费者 FIFO 实现,它的执行速度比正常的锁定-写入-解锁-信号/waitForSignal-lock-read-unlock 东西更快。我正在寻找用 C 或 C++ 编写的大多数 POSIX 操作系统(特定于 x86 很好)支持的东西。
我不想传递比指针大的任何东西。
我不一定喜欢无锁的想法,但我确实想要一些快速而正确的东西。我读到的一篇关于这个主题的论文提到了一种看起来很有趣的双队列方法,但从那时起我就找不到太多关于它的信息。
从我到目前为止所做的研究来看,0mq(据说它的 inproc:// 方案使用无锁结构)看起来是最有吸引力的选择。话虽如此,我想确保在走上这条路之前我没有错过任何东西。
另一种选择可能涉及使用 POSIX 消息队列,但这对于线程 <--> 线程通信来说似乎相当慢;这是真的?
任何单消费者单生产者无锁队列在 C 中的实现?似乎相关,但接受的答案实际上并没有像“过早优化是不好的”那样枚举现有库。
我列出了需要按顺序处理的工作项。有时列表将为空,有时将包含一千个项目。一次只能按顺序处理一个。目前,我正在执行以下操作,这些操作对我来说看起来很愚蠢,因为我正在使用Consumer任务中的Thread.Sleep来等待100ms,然后再检查列表是否为空。这是执行此操作的标准方法,还是我完全错了。
public class WorkItem
{
}
public class WorkerClass
{
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken ct = new CancellationToken();
List<WorkItem> listOfWorkItems = new List<WorkItem>();
public void start()
{
Task producerTask = new Task(() => producerMethod(ct), ct);
Task consumerTask = new Task(() => consumerMethod(ct), ct);
producerTask.Start();
consumerTask.Start();
}
public void producerMethod(CancellationToken _ct)
{
while (!_ct.IsCancellationRequested)
{
//Sleep random amount of time
Random r = new Random();
Thread.Sleep(r.Next(100, 1000));
WorkItem w = new WorkItem();
listOfWorkItems.Add(w);
}
}
public void consumerMethod(CancellationToken _ct) …
Run Code Online (Sandbox Code Playgroud) 我正在实现一个concurrent_blocking_queue
具有最小功能:
//a thin wrapper over std::queue
template<typename T>
class concurrent_blocking_queue
{
std::queue<T> m_internal_queue;
//...
public:
void add(T const & item);
T& remove();
bool empty();
};
Run Code Online (Sandbox Code Playgroud)
我打算将它用于生产者 - 消费者问题(我猜,它是使用这种数据结构的地方吗?).但我坚持一个问题是:
生产者完成后如何优雅地通知消费者?生产者完成后如何通知队列?通过调用特定的成员函数,比方说done()
?从队列中抛出异常(即从remove
函数中)是一个好主意吗?
我遇到了很多例子,但都有无限循环,好像生产者会永远生产物品.没有讨论停止条件的问题,甚至没有讨论维基文章.
我正在尝试在Java ME中实现一个简单的阻塞队列.在JavaME API中,Java SE的并发实用程序不可用,因此我必须像以前一样使用wait-notify.
这是我的临时实施.我正在使用notify
而不是notifyAll
因为在我的项目中有多个生产者,但只有一个消费者.我故意使用一个对象进行wait-notify来提高可读性,尽管它浪费了一个引用:
import java.util.Vector;
public class BlockingQueue {
private Vector queue = new Vector();
private Object queueLock = new Object();
public void put(Object o){
synchronized(queueLock){
queue.addElement(o);
queueLock.notify();
}
}
public Object take(){
Object ret = null;
synchronized (queueLock) {
while (queue.isEmpty()){
try {
queueLock.wait();
} catch (InterruptedException e) {}
}
ret = queue.elementAt(0);
queue.removeElementAt(0);
}
return ret;
}
}
Run Code Online (Sandbox Code Playgroud)
我的主要问题是关于put
方法.我可以把queue.addElement
线放出synchronized
块吗?如果是这样,性能会改善吗
此外,同样适用于take
:我可以采取两种操作queue
出来的synchronized block …
所以我模拟了我的生产者消费者问题,我有下面的代码.我的问题是:如果消费者处于不变的状态,消费者将如何停止(真实).
在下面的代码中,我添加了
if (queue.peek()==null)
Thread.currentThread().interrupt();
Run Code Online (Sandbox Code Playgroud)
这个例子很好用.但在我的真实世界设计中,这不起作用(有时生产者需要更长的时间来"放置"数据,因此消费者抛出的异常是不正确的.一般来说,我知道我可以放一个'毒'数据比如Object是XYZ,我可以在消费者中检查它.但这种毒药使代码看起来真的很糟糕.想知道是否有人采用不同的方法.
public class ConsumerThread implements Runnable
{
private BlockingQueue<Integer> queue;
private String name;
private boolean isFirstTimeConsuming = true;
public ConsumerThread(String name, BlockingQueue<Integer> queue)
{
this.queue=queue;
this.name=name;
}
@Override
public void run()
{
try
{
while (true)
{
if (isFirstTimeConsuming)
{
System.out.println(name+" is initilizing...");
Thread.sleep(4000);
isFirstTimeConsuming=false;
}
try{
if (queue.peek()==null)
Thread.currentThread().interrupt();
Integer data = queue.take();
System.out.println(name+" consumed ------->"+data);
Thread.sleep(70);
}catch(InterruptedException ie)
{
System.out.println("InterruptedException!!!!");
break;
}
}
System.out.println("Comsumer " + this.name + " finished its job; terminating."); …
Run Code Online (Sandbox Code Playgroud) 我有N个工作人员共享要计算的元素队列.在每次迭代中,每个工作程序从队列中删除一个元素,并且可以生成更多要计算的元素,这些元素将被放在同一个队列中.基本上,每个生产者也是消费者.当队列中没有元素并且所有工作者已经完成计算当前元素时,计算结束(因此不能再生成要计算的元素).我想避免调度员/协调员,所以工人应该协调.允许工人查明暂停条件是否有效的最佳模式是什么,因此代表其他人停止计算?
例如,如果所有线程都只执行此循环,那么当所有元素都被计算出来时,它将导致所有线程被永久阻塞:
while (true) {
element = queue.poll();
newElements[] = compute(element);
if (newElements.length > 0) {
queue.addAll(newElements);
}
}
Run Code Online (Sandbox Code Playgroud) 如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些"正交关注点"外化到生产者和消费者类型类中是否有意义?
我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是会对Scala视角和示例感兴趣,因为我看不到很多.
即时通讯使用模块kafka-node https://github.com/SOHU-Co/kafka-node
每当我重新启动消费者时,他们都会收到所有旧消息,即使用循环系统(负载均衡)
你有什么想法我怎么能向服务器宣布我消费了一条消息,当我重新启动消费者时他不再发送给我了?
我的代码或配置服务器有些错误?
任何的想法 ?
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid);
var argv = require('optimist').argv;
var topic = argv.topic || 'test_12345';
var producer = new HighLevelProducer(client);
var time = process.hrtime();
var message, diff,i=0;
producer.on('ready', function () {
setInterval(function(){
var date = new Date();
var dateString = date.getFullYear() + "-" +((date.getMonth()+1)<10 ? '0'+(date.getMonth()+1) : (date.getMonth()+1)) + "-" +(date.getDate()<10 ? '0'+date.getDate() : date.getDate()) + " …
Run Code Online (Sandbox Code Playgroud) 下面的代码中提供了ActiveMQ实现。有时,系统停止工作并变得非常缓慢。当我使用JavaMelody检查线程转储时-我看到太多线程长时间处于Runnable状态,并且没有终止。
ActiveMQ版本 -activemq-all-5.3.0.jar
请参考以下代码:
经纪人:
public class ActiveMQ extends HttpServlet {
private static final long serialVersionUID = -1234568008764323456;
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName());
public Listener listener;
private String msgBrokerUrl = "tcp://localhost:61602";
public BrokerService broker = null;
public TransportConnector connector = null;
@Override
public void init() throws ServletException {
try {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
connector = broker.addConnector(msgBrokerUrl);
broker.setUseShutdownHook(true);
System.out.println("BROKER LOADED");
broker.start();
broker.deleteAllMessages();
listener = new Listener();
} catch (Exception e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
}
听众: …
因为NServiceBus似乎不支持在消息队列中添加优先级机制,所以我想自己实现这一点。
public void Handle(DoAnAction message)
{
_messageManager.Insert(message);
}
Run Code Online (Sandbox Code Playgroud)
public void Run()
{
DoAnAction message;
while (true)
{
if (_messageManager.TryDequeue(out message))
{
doALongCall(message);
}
else
{
Thread.sleep(200);
}
}
}
Run Code Online (Sandbox Code Playgroud)
首先这是一个好主意吗?我不喜欢这样丢失消息的想法。
更新:用例:我们有许多客户可以发送消息DoAnAction。该操作的处理需要一段时间。问题是当1个客户端决定发送200个DoAnAction时,所有其他客户端都必须等待2-3个小时,以便可以处理所有这些消息(FIFO)。相反,我想根据客户端的顺序处理这些消息。
因此,即使客户端A仍有200条消息要处理,当客户端B发送消息时,下一个也会排队。如果客户端B发送3条消息,则队列将如下所示:BA,BA,BA,A,A,A,...
java ×4
concurrency ×3
c# ×2
c++ ×2
queue ×2
.net ×1
apache-kafka ×1
java-me ×1
jms ×1
node.js ×1
nservicebus ×1
performance ×1
scala ×1
task ×1
typeclass ×1