ben*_*n c 2 java sockets concurrency multithreading producer-consumer
这个问题出现在运行一个采用生产者/消费者模式编写的套接字服务器时:程序崩溃,日志中的错误为 cpu time limit exceeded。
我还发现当时 CPU 使用率超过 90%。
这是服务器的代码,它可能出了什么问题,我该如何优化它?
我使用这种 queue 方式,是为了避免为每个请求都创建一个线程(thread),从而产生过多的线程。
在主方法中(主线程)
// Hand-off buffer shared by both worker threads; it carries accepted sockets.
final ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();
// Producer side: the task that fills the queue, wrapped in its own thread.
final Runnable acceptTask = new RequestProducer(queue);
final Thread producer = new Thread(acceptTask);
// Consumer side: the task that drains the queue, wrapped in its own thread.
final Runnable handleTask = new RequestConsumer(queue);
final Thread consumer = new Thread(handleTask);
// Launch the producer first, then the consumer.
producer.start();
consumer.start();
Run Code Online (Sandbox Code Playgroud)
请求生产者线程
// Queue shared with the consumer; the instance is created by the caller and passed in.
// NOTE(review): this snippet omits the enclosing class header, and the field
// declaration below has no terminating ';' as written.
ConcurrentLinkedQueue<Socket> queue
/**
 * Stores the queue handed over by the caller.
 *
 * @param queue queue onto which run() offers every accepted socket
 */
public RequestProducer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
/**
 * Accept loop: opens a ServerSocket on port 19029 and offers every accepted
 * connection to the shared queue. The while (true) loop has no exit condition,
 * and the ServerSocket is never closed in this method.
 */
public void run() {
try {
// bind the listening socket to port 19029
ServerSocket serverSocket = new ServerSocket(19029);
while (true) {
try {
// block until a client connects
Socket socket = serverSocket.accept();
// hand the connection over through the shared queue
queue.offer(socket);
// NOTE(review): both catch bodies below are empty, so a failed accept() is
// dropped silently and the loop simply tries again.
} catch (ConnectException ce) {//handle exception
} catch (SocketException e) {//handle exception
}
}
// NOTE(review): the '}' at the end of the next line is inside the line comment,
// so it is not code; the brace on the line after it closes this catch block and
// run() itself is left unclosed within this snippet.
} catch (IOException ex) {//handle exception}
}
Run Code Online (Sandbox Code Playgroud)
请求消费者线程
// Queue shared with the producer; the instance is created by the caller and passed in.
// NOTE(review): this snippet omits the enclosing class header, and the field
// declaration below has no terminating ';' as written.
ConcurrentLinkedQueue<Socket> queue
/**
 * Stores the queue handed over by the caller.
 *
 * @param queue queue from which run() polls sockets
 */
public RequestConsumer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
/**
 * Consumer loop: repeatedly polls the shared queue; for every socket obtained it
 * reads the payload with DataStreamUtil.parseAsciiSockStream, closes the socket
 * and passes the resulting text to excecuteDbInsert.
 *
 * NOTE(review): the whole loop is inside one try block, so the first
 * IOException or ParseException leaves the loop and ends run().
 */
public void run() {
try {
Socket socket = null;
while (true) {
// Take the head of the queue. ConcurrentLinkedQueue.poll() does not block; it
// returns null when the queue is empty, so with nothing queued this loop
// re-polls immediately and spins without pausing.
socket = queue.poll();
if (null != socket) {
// read the request payload from the connection
String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
// close the connection; this line is skipped if the read above throws
socket.close();
// persist the parsed payload
excecuteDbInsert(in);
}
}
// NOTE(review): the '}' at the end of the next line is inside the line comment,
// so it is not code; the brace on the line after it closes this catch block and
// run() itself is left unclosed within this snippet.
} catch (IOException | ParseException ex) {//handle exceptions}
}
Run Code Online (Sandbox Code Playgroud)
数据流解析器
/**
 * Reads a single chunk from the stream and returns it as text, one char per byte.
 *
 * <p>Only one {@code read} call is made, so at most {@code BYTE_STREAM_MAX} bytes
 * (and possibly fewer than the peer sent) are returned. The stream is always
 * closed before returning, including when the read fails.
 *
 * @param in stream to read from; {@code null} yields an empty string
 * @return the bytes read, each mapped to the char with the same unsigned value;
 *         empty if the stream was {@code null} or already at end of stream
 * @throws IOException if reading or closing the stream fails
 */
public static String parseAsciiSockStream(InputStream in) throws IOException {
    StringBuilder builder = new StringBuilder();
    if (null == in) {
        return builder.toString();
    }
    // try-with-resources closes the stream even when read() throws
    try (InputStream stream = in) {
        byte[] b = new byte[BYTE_STREAM_MAX];
        // read() returns -1 at end of stream; the loop below then does nothing
        int length = stream.read(b);
        for (int i = 0; i < length; i++) {
            // mask to the unsigned value so bytes >= 0x80 are not sign-extended
            // into chars in the U+FF80..U+FFFF range
            builder.append((char) (b[i] & 0xFF));
        }
    }
    return builder.toString();
}
Run Code Online (Sandbox Code Playgroud)
超出 CPU 时间限制的原因是:消费者中的 while(true) 循环在队列为空时不停地空转(忙等待)。下面是解决该问题的示例。
您可以简单地在消费者的 while 循环中加入 Thread.sleep(1),
或者使用等待/通知(wait/notify)模式来限制 CPU 消耗。
请求生产者线程
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Accepts TCP connections on port 19029 and hands every accepted socket to the
 * consumer through the shared queue, waking a waiting consumer with
 * {@code notify()} on the queue's monitor.
 */
public class RequestProducer implements Runnable {
    // Queue shared with the consumer; it also serves as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue onto which accepted sockets are offered
     */
    public RequestProducer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    /**
     * Runs the accept loop until the thread is interrupted, the server socket is
     * closed, or the socket cannot be opened.
     */
    @Override
    public void run() {
        // try-with-resources releases the listening port when the loop ends
        try (ServerSocket serverSocket = new ServerSocket(19029)) {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // block until a client connects
                    Socket socket = serverSocket.accept();
                    // enqueue and notify under the same monitor so the state
                    // change and the wake-up are seen together by the consumer
                    synchronized (queue) {
                        queue.offer(socket);
                        System.out.println("notifying");
                        queue.notify();
                    }
                } catch (SocketException e) {
                    // ConnectException is a SocketException, so one handler covers both
                    if (serverSocket.isClosed()) {
                        // accept() can never succeed again; retrying would spin forever
                        break;
                    }
                    System.err.println("Failed to accept connection: " + e);
                }
            }
        } catch (IOException ex) {
            System.err.println("Request producer stopped: " + ex);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
请求消费者线程
import java.io.IOException;
import java.net.Socket;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Takes sockets from the shared queue one at a time, reads their payload and
 * prints it. Sleeps on the queue's monitor while the queue is empty instead of
 * polling in a tight loop.
 */
public class RequestConsumer implements Runnable {
    // Queue shared with the producer; it also serves as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue from which sockets are taken
     */
    public RequestConsumer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    /**
     * Processes queued sockets until the thread is interrupted. A failure on one
     * connection is reported and does not stop the loop.
     */
    @Override
    public void run() {
        try {
            while (true) {
                handle(awaitSocket());
            }
        } catch (InterruptedException e) {
            // restore the flag so the owner of this thread can observe the interrupt
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until a socket is available and removes it from the queue.
     *
     * <p>The queue is re-checked before every wait, so a notification sent while
     * this thread was busy is not lost, sockets that piled up are drained without
     * waiting, and a spurious wake-up simply re-enters the wait.
     */
    private Socket awaitSocket() throws InterruptedException {
        synchronized (queue) {
            Socket socket;
            while ((socket = queue.poll()) == null) {
                System.out.println("Waiting for new socket");
                queue.wait();
            }
            System.out.println("Acquired new socket");
            return socket;
        }
    }

    /** Reads the payload of one connection, prints it and closes the socket. */
    private void handle(Socket socket) {
        // try-with-resources closes the socket exactly once, on success or failure
        try (Socket connection = socket) {
            String in = DataStreamUtil.parseAsciiSockStream(connection.getInputStream());
            // this demo prints the payload; a database insert would go here instead
            System.out.println(in);
        } catch (IOException ex) {
            System.err.println("Failed to process connection: " + ex);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
数据流解析器
import java.io.IOException;
import java.io.InputStream;
/**
 * Helper for turning the bytes of a socket stream into text.
 */
public class DataStreamUtil {
    // Upper bound on the bytes taken from one stream. The original snippet used
    // this name without declaring it; 8192 is a placeholder value.
    // TODO confirm the intended limit.
    private static final int BYTE_STREAM_MAX = 8192;

    /**
     * Reads a single chunk from the stream and returns it as text, one char per byte.
     *
     * <p>Only one {@code read} call is made, so at most {@code BYTE_STREAM_MAX}
     * bytes (and possibly fewer than the peer sent) are returned. The stream is
     * always closed before returning, including when the read fails.
     *
     * @param in stream to read from; {@code null} yields an empty string
     * @return the bytes read, each mapped to the char with the same unsigned
     *         value; empty if the stream was {@code null} or at end of stream
     * @throws IOException if reading or closing the stream fails
     */
    public static String parseAsciiSockStream(InputStream in) throws IOException {
        StringBuilder builder = new StringBuilder();
        if (null == in) {
            return builder.toString();
        }
        // try-with-resources closes the stream even when read() throws
        try (InputStream stream = in) {
            byte[] b = new byte[BYTE_STREAM_MAX];
            System.out.println("Waiting for input");
            // read() returns -1 at end of stream; the loop below then does nothing
            int length = stream.read(b);
            System.out.println("Got input");
            for (int i = 0; i < length; i++) {
                // mask to the unsigned value so bytes >= 0x80 are not
                // sign-extended into chars in the U+FF80..U+FFFF range
                builder.append((char) (b[i] & 0xFF));
            }
        }
        return builder.toString();
    }
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
969 次 |
最近记录: |