Java NIO 系列文章
Java NIO的三大核心组件:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。其中通道和缓冲区,二者的联系也比较密切:数据总是从通道读到缓冲区内,或者从缓冲区写入到通道中。
至此,前面两个组件已经介绍完毕,下面迎来了最后一个非常重要的角色——选择器(Selector)。
选择器以及注册
选择器是什么?
简单的说:选择器的使命是完成IO多路复用。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO就绪状态。
选择器和通道的关系是什么?
选择器和通道的关系是监控与被监控的关系。选择器监控通道
选择器提供了独特的API方法,能够选出(select)所监控的通道拥有哪些已经准备好的,处于就绪状态的IO事件。
一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。通过选择器,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销
选择器和通道如何建立关系?
通道和选择器的关系是通过register(注册)的方式完成的。通过调用通道的Channel.register(Selector sel,int ops)可以将指定通道的一个或多个IO事件类型注册到选择器中
可供选择器监控的通道IO事件类型,包括以下四种:
可读:SelectionKey.OP_READ
可写:SelectionKey.OP_WRITE
连接:SelectionKey.OP_CONNECT
接收:SelectionKey.OP_A
事件类型的定义在SelectionKey类中, 如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现
//监控通道的多种事件,用“按位或”运算符来实现
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;
复制代码
需要注意:
一个通道,并不一定要支持所有的四种IO事件。例如服务器监听通道ServerSocketChannel,仅仅支持Accept(接收到新连接)IO事件;而SocketChannel传输通道,则不支持Accept(接收到新连接)IO事件
注册到选择器的通道,必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常。这意味着,FileChannel文件通道不能与选择器一起使用,因为FileChannel文件通道只有阻塞模式,不能切换到非阻塞模式;而Socket套接字相关的所有通道都可以。
什么是IO事件?
这里的IO事件不是对通道的IO操作,而是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件
比方说,某个SocketChannel通道,完成了和对端的握手连接,则处于“连接就绪”(OP_CONNECT)状态。
再比方说,某个ServerSocketChannel服务器通道,监听到一个新连接的到来,则处于“接收就绪”(OP_ACCEPT)状态。
还比方说,一个有数据可读的SocketChannel通道,处于“读就绪”(OP_READ)状态;一个等待写入数据的,处于“写就绪”(OP_WRITE)状态。
所有通道都可以注册到选择器中吗?
判断一个通道能否被选择器监控或选择,有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道)。
因此FileChannel文件通道就不能被选择器监控。
SelectionKey选择键
通道和选择器的监控关系注册成功后,就可以选择就绪事件
如何选择就绪事件
调用选择器Selector的select()方法来完成。
一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey选择键的集合中
SelectionKey选择键是什么呢?
简单的说,SelectionKey选择键就是那些被选择器选中的IO事件。
至此 我们可以通过下图来理解 选择器与通道的关系
选择器的使用流程
使用选择器主要有以下三步:
获取选择器实例;
将通道注册到选择器中;
轮询感兴趣的IO就绪事件(选择键集合)。
下面通过一个例子来了解Selector的使用:
Discard服务器的功能很简单:仅仅读取客户端通道的输入数据,读取完成后直接关闭客户端通道;并且读取到的数据直接抛弃掉(Discard
服务端
public class NioDiscardServer{
public static void main(String[] args) throws IOException{
startServer();
}
public static void startServer() throws IOException{
// 1 获取选择器
Selector selector = Selector.open();
// 2 获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3 通道设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4 绑定连接,在指定的ip:port 上等待连接
serverSocketChannel.bind(new InetSocketAddress(“127.0.0.1”, 8088));
System.out.println(“服务器已经启动”);
// 5 将通道注册到 selector 上,并监听 可接收 事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6 selector 轮询,当有 IO事件 就绪时,执行while内部操作
while (selector.select() > 0) {
// 7 获取被选中的IO事件集合,并遍历
Iterator selectedKeys = selector.selectedKeys().iterator();
// 8 获取单个IO事件,并处理
while (selectedKeys.hasNext()) {
SelectionKey selectionKey = selectedKeys.next();
// 9 判断IO事件的具体类型
if (selectionKey.isAcceptable()) {
// 10 如果当前IO事件 类型为 “连接就绪” , 就获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 11 将客户端连接设置为 非阻塞
socketChannel.configureBlocking(false);
// 12 将该客户端连接 的 “可读就绪”事件 注册到 selector上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 13 如果当前IO事件 类型为 “可读就绪” , 就获取客户端连接
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//14 通过Buffer 读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (socketChannel.read(byteBuffer) != -1) {
// 切换到读模式
byteBuffer.flip();
System.out.println(new String(byteBuffer.array()));
// 切换到写模式(清空缓冲区)
byteBuffer.clear();
}
socketChannel.close();
}
}
// 15 移除 选择键
selectedKeys.remove();
}
// 16 关闭 连接
serverSocketChannel.close();
}
}
复制代码
客户端
public class NioDiscardClient{
public static void main(String[] args) throws IOException{
startClient();
}
public static void startClient() throws IOException{
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(“127.0.0.1”, 8088));
socketChannel.configureBlocking(false);
while (!socketChannel.finishConnect()) {
// 非阻塞情况下, connect会立即返回, 所以需要 不断的去尝试连接,直到连接成功(自旋)
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(“hello world”.getBytes());
byteBuffer.flip();
// 从byteBuffer 读取数据 到 写入到socketChannel 中
socketChannel.write(byteBuffer);
socketChannel.shutdownOutput();
socketChannel.close();
}
}
复制代码