Jro*_*Jro 11 java producer-consumer
从我的主要开始,我开始了两个名为生产者和消费者的线程。两者都包含while(true)循环。生产者循环是 UDP 服务器,因此它不需要睡眠。我的问题出在消费者循环中。消费者循环从链接队列中删除对象并将其传递给函数以进行进一步处理。根据研究,在循环中使用线程睡眠不是一个好习惯,因为有时 O/S 不会在设置时间结束时释放。如果我在应用程序理想时删除线程睡眠,它会将 CPU 拖到 20% 到 30%。
class Producer implements Runnable {
private DatagramSocket dsocket;
FError fer = new FError();
int port =1548;
ConcurrentLinkedQueue<String> queue;
Producer(ConcurrentLinkedQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
// Create a socket to listen on the port.
dsocket = new DatagramSocket(port);
// Create a buffer to read datagrams into.
byte[] buffer = new byte[30000];
// Create a packet to receive data into the buffer
DatagramPacket packet = new DatagramPacket(buffer,
buffer.length);
while (true) {
try {
// Wait to receive a datagram
dsocket.receive(packet);
//Convert the contents to a string,
String msg = new String(buffer, 0, packet.getLength());
int ltr = msg.length();
// System.out.println("MSG =" + msg);
if(ltr>4)
{
SimpleDateFormat sdfDate = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");//dd/MM/yyyy
Date now = new Date();
String strDate = sdfDate.format(now);
//System.out.println(strDate);
queue.add(msg + "&" + strDate);
// System.out.println("MSG =" + msg);
}
// Reset the length of the packet before reusing it.
packet.setLength(buffer.length);
} catch (IOException e) {
fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
dsocket.close();
break;
}
}
} catch (SocketException e) {
fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage());
}
}
}
Run Code Online (Sandbox Code Playgroud)
class Consumer implements Runnable {
String str;
ConcurrentLinkedQueue<String> queue;
Consumer(ConcurrentLinkedQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
while ((str = queue.poll()) != null) {
call(str); // do further processing
}
} catch (IOException e) {
ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
小智 12
extend Runnable您可以更改代码以包含ScheduledExecutorService每半秒运行一次队列轮询而不是使线程休眠,而不是让 Consumer进入。这方面的一个例子是
public void schedule() {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
String str;
try {
while ((str = queue.poll()) != null) {
call(str); // do further processing
}
} catch (IOException e) {
ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
}
}, 0, 500, TimeUnit.MILLISECONDS);
}
Run Code Online (Sandbox Code Playgroud)
解决问题的正确方法是使用阻塞队列。它为您提供了几个优势:
这是一个小演示,您可以使用它:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProdConsTest {
public static void main(String[] args) throws InterruptedException {
final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
final Runnable producer = () -> {
for (int i = 0; i < 1000; i++) {
try {
System.out.println("Producing: " + i);
queue.put(i);
//Adjust production speed by modifying the sleep time
Thread.sleep(100);
} catch (InterruptedException e) {
//someone signaled us to terminate
break;
}
}
};
final Runnable consumer = () -> {
while (true) {
final Integer integer;
try {
//Uncomment to simulate slow consumer:
//Thread.sleep(1000);
integer = queue.take();
} catch (InterruptedException e) {
//someone signaled us to terminate
break;
}
System.out.println("Consumed: " + integer);
}
};
final Thread consumerThread = new Thread(consumer);
consumerThread.start();
final Thread producerThread = new Thread(producer);
producerThread.start();
producerThread.join();
consumerThread.interrupt();
consumerThread.join();
}
}
Run Code Online (Sandbox Code Playgroud)
现在取消注释sleep()消费者中的 并观察应用程序发生了什么。如果您使用的是基于计时器的解决方案(例如建议的解决方案ScheduledExecutorService或您忙于等待),那么使用快速生产者,队列将无法控制地增长并最终使您的应用程序崩溃