public synchronized static int get() {
while(cheia()==false){
try{
wait();
}
catch(InterruptedException e){
}
}
if (fila[inicio] != 0) {
int retornaValor = fila[inicio];
fila[inicio] = 0;
inicio++;
if (inicio == size) {
inicio = 0;
}
notifyAll();
return retornaValor;
}
notifyAll();
return 0;
}
Run Code Online (Sandbox Code Playgroud)
为什么wait()和notifyAll()在这段代码中没有运行?
IDE 说: wait() 方法(或 notifyAll)不是静态的?
你能帮助我吗?
我得到了一个可以归结为生产者-消费者模式的应用程序。几个线程正在做一些工作并更新单个数据集,以便更多线程可以使用该数据并使用它做自己的工作。目前,它并不是非常复杂,所有消费线程都在等待数据集,直到其中一个生产者调用一个pulseall。
现在希望有一个消费者线程在任何一组更改时从两个不同的数据集消费。团队希望将重构保持在最低限度,而我有限的线程经验让我在寻找干净的解决方案时遇到了一些问题。
快速而肮脏的解决方案是在单独的对象上进行等待和脉冲,并让消费者线程在继续之前检查其数据集中的更改。似乎没有一种方法可以让一个线程等待两个对象,而不用更强大的线程工具(线程池、任务等)替换通用线程,除非我没有找到正确的东西。
我想在 Kafka 中试验 ACL。所以我在我的 Zookeeper 中为示例主题测试创建了一些 ACL,它允许对User:Bob主体的生产者权限和对User:Alice主体的消费者权限。
现在,当我使用 kafka-console-producer 或消费者时,我如何提及此生产者或消费者的委托人?
我尝试在producer/consumer.properties文件中设置一个像principal=User:Bob这样的新配置,但它说这是一个无法识别的配置。
有人可以帮我解决这个问题吗?或者我理解错了校长的想法吗?这与基于 Kerberos 的身份验证有关吗?
提前致谢,沙比尔
authentication authorization kerberos producer-consumer apache-kafka
例如,假设我有一个包含 4 个分区的主题。我向这个主题发送了 4k 条消息。每个分区获得 1k 条消息。由于外部因素,其中 3 个消费者分别处理了所有 1k 条消息。但是,第 4 个分区只能通过 200 条消息,还剩下 800 条消息需要处理。有没有一种机制可以让我“重新平衡”主题中的数据,说给分区 1-3 200 的分区 4s 数据,让所有带有 200 条消息的分区成为一个进程?
我不是在寻找一种向消费者组添加额外节点并让 kafka 平衡分区的方法。
添加了重新分配分区的输出:
当前分区副本分配
{
"version": 1,
"partitions": [
{
"topic": "MyTopic",
"partition": 0,
"replicas": [
0
],
"log_\ndirs": [
"any"
]
},
{
"topic": "MyTopic",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "MyTopic",
"partition": 4,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "MyTopic",
"partition": …
Run Code Online (Sandbox Code Playgroud) 我正在使用BlockingCollection
生产者消费者模式,我有一个例外,我想写一个专利 - 只有谷歌的两个结果!预期是"CompleteAdding可能不会与添加到集合中同时使用",并且当我TryAdd
在BlockingCollection上时会发生以下情况:
public void EnqueueTask(T item)
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
_workerQueue.Add(item);
}
}
Run Code Online (Sandbox Code Playgroud)
在CompleteAdding
处理Consumer-Producer包装类时调用它:
public void Dispose()
{
if (!_IsActive)
return;
_IsActive = false;
_cancellationTokenSource.Cancel();
_workerQueue.CompleteAdding();
// Wait for the consumer's thread to finish.
for (int i = 0; i < _workers.Length; ++i)
{
Task t1 = Task.Factory.StartNew(() =>
{
try
{
if (!_workers[i].Join(4000))
LogWriter.Trace("Failed to join thread", "ThreadFailureOnDispose");
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
}
});
}
// Release any …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用进程为生产者使用者编写简单的代码.制片人是一个过程.对于消费者,我从Pool获取流程.
from multiprocessing import Manager, Process, Pool
from time import sleep
def writer(queue):
for i in range(10):
queue.put(i)
print 'put 1 size now ',queue.qsize()
sleep(1)
def reader(queue):
print 'in reader'
for i in range(10):
queue.get(1)
print 'got 1 size now ', queue.qsize()
if __name__ == '__main__':
q = Manager().Queue()
p = Process(target=writer, args=(q,))
p.start()
pool = Pool()
c = pool.apply_async(reader,q)
Run Code Online (Sandbox Code Playgroud)
但是我收到了错误,
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in …
Run Code Online (Sandbox Code Playgroud) 我试图解决多线程问题,并且我很难了解其行为.
问题是: 有2个线程同时消耗偶数和奇数.我必须介绍它们之间的线程通信,以便在自然顺序中具有"消耗".
这是我的代码
public class EvenOddDemo {
public static void main(String[] args) {
Number n = new Number();
EvenThread et = new EvenThread(n);
OddThread ot = new OddThread(n);
et.start();
ot.start();
}
}
class EvenThread extends Thread {
private Number number;
public EvenThread(Number number) {
this.number = number;
}
@Override
public void run() {
for(int i=0; i<5; i++) {
System.out.println(number.getEven());
}
}
}
class OddThread extends Thread {
private Number number;
public …
Run Code Online (Sandbox Code Playgroud) 我正在尝试多生产者 - 生产者 - 消费者问题的多个消费者使用案例.我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列.
以下是我的代码.
制片人
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue inputQueue;
private static volatile int i = 0;
private volatile boolean isRunning = true;
public Producer(BlockingQueue q){
this.inputQueue=q;
}
public synchronized void run() {
//produce messages
for(i=0; i<10; i++)
{
try {
inputQueue.put(new Integer(i));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced "+i);
}
finish();
}
public void finish() {
//you can also clear here if you wanted
isRunning = false;
} …
Run Code Online (Sandbox Code Playgroud) 我需要使用Java和Spring框架使用生产者 - 消费者模式来解决问题.问题是我是Spring框架中的新手(我不知道)所以有人能指导我如何使用Spring做到这一点吗?一些书籍或网站,我可以逐步找到一个关于用Spring实现生产者 - 消费者模式的例子吗?
我正在尝试使用一个场景,其中生产者在缓冲区(state.value)中生成一个值,并且多个消费者正在读取缓冲区并在数组中更新它.下面是代码.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
pthread_mutex_t mutex;
pthread_cond_t prod, cons;
static int count = 0;
struct shared_state{
int done;
int value;
int value_available;
int *array;
int j;
}state;
void * producer(void *arg){
int a[] = {12,11,10,9,8,7};
int size = sizeof(a)/sizeof(a[0]);
int i = 0;
while(i<size){
pthread_mutex_lock(&mutex);
while(state.value_available)
pthread_cond_wait(&prod, &mutex);
state.value = a[i++];
printf("In producer: %d\n",state.value);
state.value_available = 1;
pthread_cond_signal(&cons);
pthread_mutex_unlock(&mutex);
}
state.done = 1;
count++;
printf("Producer count: %d\n",count);
pthread_exit(NULL);
}
void * consumer(void *arg){
while(!(state.done)){
pthread_mutex_lock(&mutex);
while(!state.value_available)
pthread_cond_wait(&cons,&mutex); …
Run Code Online (Sandbox Code Playgroud)