在Kafka中,我想只使用单个代理,单个主题和一个具有一个生产者和多个消费者的分区(每个消费者从代理获得自己的数据副本).鉴于此,我不希望使用Zookeeper的开销; 我不能只使用经纪人吗?为什么动物园管理员必须?
partitioning producer-consumer broker apache-kafka apache-zookeeper
我有一个单线程生成器,它创建一些任务对象,然后将其添加到ArrayBlockingQueue
(具有固定大小).
我也开始了一个多线程的消费者.这是一个固定的线程池(Executors.newFixedThreadPool(threadCount);
).然后我向这个threadPool提交了一些ConsumerWorker入口,每个ConsumerWorker都对上面提到的ArrayBlockingQueue实例进行了引用.
每个这样的工作人员都会take()
在队列中执行并处理任务.
我的问题是,当没有更多的工作要做时,让工人知道的最佳方法是什么.换句话说,如何告诉Workers,生产者已经完成了对队列的添加,从这一点开始,每个工作人员在看到Queue为空时应该停止.
我现在得到的是一个设置,我的Producer初始化了一个回调,当他完成它的工作(向队列中添加东西)时会触发回调.我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表.当Producer Callback告诉我生产者已完成时,我可以告诉每个工人.此时,他们应该只是继续检查队列是否为空,当它变为空时它们应该停止,从而允许我优雅地关闭ExecutorService线程池.就是这样的
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement …
Run Code Online (Sandbox Code Playgroud) 我想创建一些Producer/Consumer
线程应用程序.但我不确定在两者之间实现队列的最佳方法是什么.
所以我有两个想法(两者都可能是完全错误的).我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式.这主要是我在这些例子中实现的队列,我很关心.我正在扩展一个内部类的Queue类,并且是线程安全的.以下是两个示例,每个示例包含4个类.
主类 -
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Run Code Online (Sandbox Code Playgroud)
消费者类 -
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
Run Code Online (Sandbox Code Playgroud)
制片人类 -
public class …
Run Code Online (Sandbox Code Playgroud) 我有一个典型的生产者 - 消费者问题:
多个生产者应用程序将作业请求写入PostgreSQL数据库上的作业表.
作业请求的状态字段在创建时包含QUEUED.
有多个由当生产者插入一条新记录的规则通知的消费应用:
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
Run Code Online (Sandbox Code Playgroud)
他们将尝试通过将其状态设置为RESERVED来保留新记录.当然,只有消费者才能成功.所有其他消费者不应该保留相同的记录.他们应该保留state = QUEUED的其他记录.
示例:某个生产者将以下记录添加到表jobrecord:
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
Run Code Online (Sandbox Code Playgroud)
现在,两个消费者A,B想要处理它们.他们同时开始跑步.一个应该保留id 1,另一个应该保留id 2,然后完成的第一个应该保留id 3等等.
在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可以在不同机器上运行的不同进程.它们只访问同一个数据库,因此所有同步都必须通过数据库进行.
我在PostgreSQL中阅读了很多关于并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html 选择Postgresql PostgreSQL中的解锁行并锁定
从这些主题中我了解到,以下SQL语句应该能够满足我的需求:
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord …
Run Code Online (Sandbox Code Playgroud) 有没有人知道一种pythonic方法迭代a的元素Queue.Queue
而不从队列中删除它们.我有一个生产者/消费者类型的程序,其中要处理的项目通过使用a传递Queue.Queue
,我希望能够打印剩余的项目.有任何想法吗?
当添加或关闭新的消费者/ brorker时,Kafka会触发重新平衡操作.Kafka Rebalancing是一种阻止操作吗?在重新平衡操作正在进行时,Kafka消费者是否受阻?
message-queue producer-consumer apache-kafka kafka-consumer-api
我在$ work处有一个应用程序,我必须在两个以不同频率调度的实时线程之间移动.(实际的调度是我无法控制的.)应用程序是硬实时的(其中一个线程必须驱动硬件接口),因此线程之间的数据传输应该是无锁的,并且无需等待尽可能.
重要的是要注意,只需要传输一个数据块:因为两个线程以不同的速率运行,所以有时会在较慢的线程的两个唤醒之间完成两个较快的线程迭代; 在这种情况下,可以覆盖写缓冲区中的数据,以便较慢的线程只获取最新的数据.
换句话说,双缓冲解决方案代替队列就足够了.这两个缓冲区在初始化期间分配,读取器和写入线程可以调用类的方法来获取指向这些缓冲区之一的指针.
C++代码:
#include <mutex>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}
~ProducerConsumerDoubleBuffer() { }
// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = true;
m_write_idx = 1 - m_read_idx;
return &m_buf[m_write_idx];
}
// The …
Run Code Online (Sandbox Code Playgroud) c++ concurrency real-time producer-consumer double-buffering
我最近遇到了生产者/消费者模式c#实现.它非常简单,(至少对我来说)非常优雅.
它似乎是在2006年左右设计的,所以我想知道这种实施是否
安全
- 仍然适用
代码如下(原始代码参考http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)
using System;
using System.Collections;
using System.Threading;
public class Test
{
static ProducerConsumer queue;
static void Main()
{
queue = new ProducerConsumer();
new Thread(new ThreadStart(ConsumerJob)).Start();
Random rng = new Random(0);
for (int i=0; i < 10; i++)
{
Console.WriteLine ("Producing {0}", i);
queue.Produce(i);
Thread.Sleep(rng.Next(1000));
}
}
static void ConsumerJob()
{
// Make sure we get a different random seed from the
// first thread
Random rng = new Random(1);
// We happen to …
Run Code Online (Sandbox Code Playgroud) 如果Queue中没有项目,ConcurrentQueue中的TryDequeue将返回false.
如果队列是空的,我需要我的队列将等待,直到新项目被添加到队列中并且它将新队列出列,并且该过程将继续这样.
我应该在C#4.0中使用monitor.enter,wait,pulse或任何更好的选项
ThreadPoolExecutor的JavaDoc 不清楚是否可以将任务直接添加到BlockingQueue
执行程序的后台.文档称调用executor.getQueue()
"主要用于调试和监视".
我正ThreadPoolExecutor
用自己的方式构建一个BlockingQueue
.我保留对队列的引用,以便我可以直接向其添加任务.返回相同的队列,getQueue()
因此我假设admonition getQueue()
适用于通过我的方式获取的对后备队列的引用.
代码的一般模式是:
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
Run Code Online (Sandbox Code Playgroud)
queue.offer()
VS executor.execute()
据我了解,典型的用途是通过添加任务executor.execute()
.上面示例中的方法具有阻塞队列的优点,但execute()
如果队列已满并立即失败并拒绝我的任务.我也喜欢提交作业与阻塞队列交互; 对我来说,这感觉更"纯粹"的生产者 …
java concurrency producer-consumer executorservice blockingqueue
java ×3
queue ×3
apache-kafka ×2
c# ×2
concurrency ×2
broker ×1
c++ ×1
monitor ×1
partitioning ×1
postgresql ×1
python ×1
real-time ×1
sql ×1