che*_*rit 0 java multithreading java.util.concurrent mqtt
我正在尝试处理从MQ基础结构获得的一些消息.我有两个阻塞队列,sharedQueue和pubQueue.在sharedqueue得到与我从MQ基础设施below.It得到会把消息给邮件填满sharedQueue.
client.setCallback(new CallBack("inst",sharedQueue));
messagemanipulator线程将从中读取sharedQueue,处理它并将响应放入以pubQueue供稍后发布.
新的MessageManipulatorThread(sharedQueue,pubQueue).run();
发布者线程将从pubQueueMQ基础结构中获取消息并将其发布到MQ基础结构.
new PublisherThread(pubQueue).run();
以下是完整代码:
public class ArrayBlockingQueueExample {
private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);
public static void main(String[] args) throws MqttException, Exception {
new ArrayBlockingQueueExample().startThreads();
}
public void startThreads() throws MqttException, Exception{
MqttClient client = new MQTTClientFactory().getInstance();
client.setCallback(new CallBack("inst", sharedQueue));
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
}
public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){
this.sharedQueue = sharedQueue;
this.pubQueue = pubQueue;
}
public void run() {
while (true) {
try {
String msg = sharedQueue.take();
System.out.println(Thread.currentThread().getName() + "manipulator runnning => "+msg);
pubQueue.put(msg);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PublisherThread implements Runnable {
private BlockingQueue<String> sharedQueue;
public PublisherThread(BlockingQueue<String> sharedQueue){
this.sharedQueue = sharedQueue;
}
public void run() {
while (true) {
System.out.println("Running pub");
try {
System.out.println("pub=>"+sharedQueue.take() );
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
问题new PublisherThread(pubQueue).run();永远不会奏效.我猜这是一个线程同步问题.pubQueue应该等到它有任何数据填满了MessageManipulatorThread,但它看起来不像那样.PublisherThread等待pubQueue它是免费的,但它永远不会变得自由!,还有什么我应该做的吗?任何帮助深表感谢.
你用的是Runnable.run()代替Thread.start(),所以这个:
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
Run Code Online (Sandbox Code Playgroud)
不行.那是因为run()实际上在当前线程中运行runnable的方法而不是创建一个新线程并单独执行它.
相反,这样做:
new Thread(new MessageManipulatorThread(sharedQueue,pubQueue), "MessageManipulatorThread").start();
new Thread(new PublisherThread(pubQueue), "PublisherThread").start();
Run Code Online (Sandbox Code Playgroud)
编辑:
fge在问题中发表以下评论:
你为什么不
ExecutorService用手而不是用手工作呢?
为了阐明他的意思,他的意思是使用一个ExecutorService来处理消息,pubQueue而不是创建一个线程来拉取消息并手动处理它们.该代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
new Thread(new MessageManipulatorThread(sharedQueue, executor), "MessageManipulatorThread").start();
Run Code Online (Sandbox Code Playgroud)
然后MessageManipulatorThread课程将改为:
public class MessageManipulatorThread implements Runnable {
private BlockingQueue<String> sharedQueue;
private ExecutorService executor;
public MessageManipulatorThread(BlockingQueue<String> sharedQueue, ExecutorService executor){
this.sharedQueue = sharedQueue;
this.executor = executor;
}
public void run() {
while (true) {
try {
String msg = sharedQueue.take();
System.out.println(Thread.currentThread().getName() + "manipulator runnning => "+msg);
executor.execute(new PublisherThread(msg));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后PublisherThread,您将进行更改,以便仅使用进程传递给它的单个消息.这是您尝试做的另一种方法.
这种方法也允许一些灵活性.使用其他方法,PublisherThread一次只能处理一条消息(同步).使用该ExecutorService接口允许您更改实现,这可以允许它一次处理多个消息(异步),只需更改以下内容:
ExecutorService executor = Executors.newSingleThreadExecutor();
Run Code Online (Sandbox Code Playgroud)
对此:
ExecutorService executor = Executors.newFixedThreadPool(10);
Run Code Online (Sandbox Code Playgroud)
该语句允许执行程序启动最多10个线程,这意味着一次最多可处理10条消息.有关Executors创建ExecutorService实现的更多方法,请参阅该类.
| 归档时间: |
|
| 查看次数: |
434 次 |
| 最近记录: |