use*_*301 5 java sockets nonblocking
我有两个线程正在处理用于非阻塞套接字的 Java NIO。这是线程正在做的事情:
线程 1:调用选择器的 select() 方法的循环。如果有任何密钥可用,则相应地处理它们。
线程 2:偶尔会通过调用 register() 向选择器注册一个 SocketChannel。
问题是,除非 select() 的超时时间非常小(比如大约 100 毫秒),否则对 register() 的调用将无限期阻塞。即使通道被配置为非阻塞,并且 javadocs 声明 Selector 对象是线程安全的(但它的选择键不是,我知道)。
所以有人对这个问题有什么想法吗?如果我将所有内容都放在一个线程中,该应用程序就可以完美运行。没有问题发生,但我真的很想有单独的线程。任何帮助表示赞赏。我在下面发布了我的示例代码:
将 select(1000) 更改为 select(100) 它将起作用。将其保留为 select() 或 select(1000) ,它不会。
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UDPSocket
{
private DatagramChannel clientChannel;
private String dstHost;
private int dstPort;
private static Selector recvSelector;
private static volatile boolean initialized;
private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();
public static void init()
{
initialized = true;
try
{
recvSelector = Selector.open();
}
catch (IOException e)
{
System.err.println(e);
}
Thread t = new Thread(new Runnable()
{
@Override
public void run()
{
while(initialized)
{
readData();
Thread.yield();
}
}
});
t.start();
}
public static void shutdown()
{
initialized = false;
}
private static void readData()
{
try
{
int numKeys = recvSelector.select(1000);
if (numKeys > 0)
{
Iterator i = recvSelector.selectedKeys().iterator();
while(i.hasNext())
{
SelectionKey key = i.next();
i.remove();
if (key.isValid() && key.isReadable())
{
DatagramChannel channel = (DatagramChannel) key.channel();
// allocate every time we receive so that it's a copy that won't get erased
final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
channel.receive(buffer);
buffer.flip();
final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();
// let user handle event on a dedicated thread
eventQueue.execute(new Runnable()
{
@Override
public void run()
{
subscriber.onData(buffer);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
}
}
catch (IOException e)
{
System.err.println(e);
}
}
public UDPSocket(String dstHost, int dstPort)
{
try
{
this.dstHost = dstHost;
this.dstPort = dstPort;
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(false);
}
catch (IOException e)
{
System.err.println(e);
}
}
public void addListener(SocketSubscriber subscriber)
{
try
{
DatagramChannel serverChannel = DatagramChannel.open();
serverChannel.configureBlocking(false);
DatagramSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress(dstPort));
SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
key.attach(subscriber);
}
catch (IOException e)
{
System.err.println(e);
}
}
public void send(ByteBuffer buffer)
{
try
{
clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
}
catch (IOException e)
{
System.err.println(e);
}
}
public void close()
{
try
{
clientChannel.close();
}
catch (IOException e)
{
System.err.println(e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
import java.nio.ByteBuffer;
public interface SocketSubscriber
{
public void onData(ByteBuffer data);
}
Run Code Online (Sandbox Code Playgroud)
用法示例:
public class Test implements SocketSubscriber
{
public static void main(String[] args) throws Exception
{
UDPSocket.init();
UDPSocket test = new UDPSocket("localhost", 1234);
test.addListener(new Test());
UDPSocket test2 = new UDPSocket("localhost", 4321);
test2.addListener(new Test());
System.out.println("Listening...");
ByteBuffer buffer = ByteBuffer.allocate(500);
test.send(buffer);
buffer.rewind();
test2.send(buffer);
System.out.println("Data sent...");
Thread.sleep(5000);
UDPSocket.shutdown();
}
@Override
public void onData(ByteBuffer data)
{
System.out.println("Received " + data.limit() + " bytes of data.");
}
}
Run Code Online (Sandbox Code Playgroud)
选择器有多个已记录的内部同步级别,您正在遇到所有这些级别。wakeup()在调用之前调用选择器确保register().循环select()正常工作,如果有零个选定的键,这将发生在wakeup().
| 归档时间: |
|
| 查看次数: |
6710 次 |
| 最近记录: |