NIO模型

  • Post author:
  • Post category:其他


NIO的介绍

NIO(New IO)同步非阻塞IO模型,采用了事件驱动的思想来实现一个复用器,来解决大并发的问题

NIO中,将读、写、可连接、可接受等操作在BIO中是阻塞处理,将操作设置为非阻塞,并将这些操作视为事件,当关注某个事件时,将其注册到复用器Selector(本质上使用底层操作系统提供的IO复用器:select、poll,epoll)。由系统来监听事件是否准备就绪, 当socket有读或者可写操作时,系统会通知相应的用户程序来处理,将流读取到缓冲区或者写入到系统中

一个复用器上是可以注册多个用户的连接,有效的用户请求才会来通知用户程序,用户程序才进行介入处理,

复用器(Selector)也叫做多路选择器,作用是检查一个或者多个NIO的channel(Channel),可以实现单线程管理多个channel,也可以管理多个网络请求

channel:通道,用户IO操作的连接,对原有的IO补充,不能直接访问数据需要和缓冲区BUffer结合使用,通道主要有

SocketChannel:主要是用户连接服务端,一般在客户端实现

ServerSocketChannel:监听新进来的TCP连接,对于每一个新用户连接创建一个SocketChannel实例,一般在服务端实现

Buffer:缓冲区

IO流中的数据需要经过缓冲区交给channel

NIO编码

服务端编程

1、实例ServerSocketChannel实例【ServerSocketChannel.

open

()】

2、服务端绑定端口【bind】

3、将serversocketchannel设置为非阻塞【serverSocketChannel.configureBlocking(

false

)】

4、实例话selector实例【Selector.

open

()】

5、将serversocketchannel注册到selector上,并关注可接受事件【serverSocketChannel.register(selector,SelectionKey.


OP_ACCEPT


)】

6、selector阻塞等待至少一个事件发生【selector.select()】

7、有感兴趣事件发生,遍历感兴趣事件集合

8、如果是可接受事件、通过serversocketchannel调用accept获取socketchannel

9、将socketchannel设置为非阻塞

10、将socketchannel注册到selector复用上,并关注可读、可写事件

11、循环等待感兴趣事件集合,即跳转至第6步

12、如果当前是可读事件、通过socketchannel调用read来进行读数据操作

13、关闭资源

/**
 * desc:NIO服务端编程
 */

public class Server {
    public static void main(String[] args) {
        ServerSocketChannel serverSocketChannel = null;
        try {
            //创建ServerSocketchannel实例
            serverSocketChannel = ServerSocketChannel.open();

            //绑定端口
            serverSocketChannel.bind(new InetSocketAddress(7777));
            System.out.println("服务端启动啦");

            //设置ServerSocketchannel为非阻塞  true:表示则塞 false:非阻塞
            serverSocketChannel.configureBlocking(false);


            //创建复用器selector实例
            Selector selector = Selector.open();

            //将serverSocketChannel注册到selector中,并关注的是可接受事件
            /**
             * register(Selector sel, int ops)
             * 第一个参数:复用器实例
             * 第二个参数:表述关注的事件
             * 存在4种事件
             * 1、可读事件 read
             * 2、可写事件  write
             * 3、可接收事件 accept
             * 4、可连接事件 connect
             *
             */
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT | SelectionKey.OP_WRITE);


            //selector阻塞等待事件发生
            while (selector.select() >= 0) {
                //获取已发生事件集合
                Iterator <SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //删除已完成事件
                    iterator.remove();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        System.out.println("有接收事件发生");
                        //有可接受事件发生
                        //SelectableChannel 所有通道的父类,ServerSocketChannel、SocketChannel
                        ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
                        //通过调用accept获取SocketChannel实例
                        SocketChannel socketChannel = serverSocketChannel1.accept();
                        System.out.println("有新用户连接啦"+socketChannel.getRemoteAddress());

                        //设置为非阻塞
                        socketChannel.configureBlocking(false);

                        //将socketChannel注册到selector复用器上,并关注读事件
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }

                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        System.out.println("有可读事件发生");
                        //有可读事件发生
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

                        //读取数据
                        //创建一个缓存实例
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        //将数据从channel写入缓存
                        int read = socketChannel.read(buffer);
                        //读写模式切换
                        buffer.flip();

                        //创建有效大小的byte数组
                        byte[] bytes = new byte[read];
                        buffer.get(bytes);

                        String msg = new String(bytes);
                        System.out.println("数据为:"+msg);

                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //关闭资源
            System.out.println("服务端关闭啦");
            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

客户端编码流程

1、创建socketchannel实例

2、将socketchannel设置非阻塞

3、创建Selector复用器实例

4、将socketchannel实例注册到Selector实例中,并关注可连接事件

5、socketchannel主动调用connect连接服务端,会立即返回结果

6、如果结果为false:当前连接还未完成,通过selector.select由系统监听事件发生,

7、进行读写事件

8、关闭资源

/**
 * desc:NIO客户端
 *
 */

public class Client {
    public static void main(String[] args) {
        SocketChannel socketChannel = null;
        try {
            //创建socketChannel实例
            socketChannel = SocketChannel.open();

            //将socketChannel设置为非阻塞
            socketChannel.configureBlocking(false);

            //创建Selector复用器实例
            Selector selector = Selector.open();

            //将socketChannel注册到Selector实例中,并关注可连接事件
            socketChannel.register(selector,SelectionKey.OP_CONNECT);

            //主动连接服务端,该方法不会阻塞,会立即返回结果,要么成功,要么未连接成功,则交由os等待连接过程
            if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
                //返回为false,表示连接还未连接成功
                System.out.println("connect还未完成!");

                //等待系统返回就绪事件
                selector.select();

                //进行完成连接操作
                socketChannel.finishConnect();
            }
            System.out.println("客户端连接服务端成功");


            //进行读写事件
            byte[] bytes = "Hello Tulun".getBytes();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);

            //往Buffer写数据
            buffer.put(bytes);

            //读写模式切换
            buffer.flip();

            //往socketchannel通道中写数据
            socketChannel.write(buffer);



        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //关闭资源
            if (socketChannel != null) {
                System.out.println("客户端关闭啦");
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

问题:

问题1:客户端为什么主动connect连接?

在BIO中connect操作是可阻塞的方法,在NIO中设置为非阻塞,交给复用器来监听事件是否完成(connect可连接事件),内核帮助监听事件是否完成,必须先触发事件,触发之后内核才能监听事件是否完成,客户端来主动连接服务端,在NIO中connect是非阻塞,当前connect操作立即放回(true:连接已完成 false:当前事件已经触发了,但连接还没完成,就需要等待内核来关注事件是否完成)

问题2:为什么客户端断开连接服务端一致循环接收到可读事件为空?

客户端断开连接,服务端会接收到-1,占用空间,服务端认为有数据接收,,就会一直认为有可读事件服务端需要处理。

判断通道接收为-1表示结束接收

问题3:为什么写操作没有注册到复用器?

写操作在NIO中也是一个事件,但是写事件需要主动发起写操作,一般写完会立即write操作不会进行阻塞,即通常写操作不需要注册

NIO重要组件

channel:通道

Java中NIO的channel类似流,

但和流有不同

-通道即可以从通道中读取数据,又可以写数据到通道,但流的读写通常是单向的

-通道中的数据需要先读到一个buffer,或者要从一个buffer写入

channel示例:


channel主要实现类

SocketChannel:通过TCP来读写网络中的数据,一般是客户端的实现

ServerSocketChannel:监听新进来的TCP连接,对每一个新连接创建一个SocketChannel,一般是服务端的实现

DatagramChannel:通过UDP读写网络中的数据通道

FileChannel:用于读取、写入等操作文件的通道

FileChanle使用示例:

        RandomAccessFile randomAccessFile = new RandomAccessFile("/Users/gongdezhe/Desktop/download/test", "rw");
        //可以进行读写操作
        FileChannel fileChannel = randomAccessFile.getChannel();

        //写数据
        byte[] bytes = "hello".getBytes();

        //将数据交给buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(40);
        byteBuffer.put(bytes); //将数据写入到buffer

        byteBuffer.flip();

        //写入通道
        fileChannel.write(byteBuffer);

        //关闭通道
        fileChannel.close();

Buffer:缓存

Java中的NIO的Buffer用与和Channel交互

Java堆NIO中的内存缓存包装成BUffer对象,提供了一系列方法,方便开始使用

基本使用,

以读数据为例:

1、从channel将数据写入buffer

2、调用buffer.flip()进行读写切换

3、从buffer中读取数据

4、对 buffer清空clear()和compact()

Buffer 实现

Buffer底层是通过特定类型(long,double…)的数组存储数据

buffer底层是借助4个指针来操作的


// Invariants: mark <= position <= limit <= capacity



private int


mark

= -1;


private int


position

= 0;


private int


limit

;


private int


capacity

;

capacity:buffer是有固定大小的,即capacity表示底层数组的大小

position:取决于读写模式

写操作:将数据写入buffer,每写入数据position会移动一位,初始值为0,最大值为capacity

读操作:会从某个特定位置读,当读写模式切换后,position会被重置为0,读取数据时,position移动表示下一个读取的位置

limit:写模式下,表示最多王buffer中写入的数据,最大写入的位置capacity,此时limit=capacity;

读模式下,limit表示最多能读取到的数据

mark:

读写过程中指针变化

Buffer 类型:

Buffer是一个抽象类,其实现的子类有:

ByteBuffer

,

CharBuffer

,

DoubleBuffer

,

FloatBuffer

,

IntBuffer

,

LongBuffer

,

ShortBuffer

buffer缓冲区的分配:

Buffer分配空间分两种,一种是在堆上开辟的空间,一种是在堆外开辟的空间(零拷贝技术)

Buffer对象的创建:

以ByteBuffer为例介绍:

ByteBuffer allocate(

int

capacity):在堆上创建指定大小的缓冲区

ByteBuffer wrap(

byte

[] array):通过byte数组创建一个缓冲区

ByteBuffer wrap(

byte

[] array,

int

offset,

int

length),截取bytebuffer部分内容创建缓冲区

ByteBuffer allocateDirect(

int

capacity) 在堆外内存创建一个指定大小的缓冲区

Buffer中的方法

向buffer中写数据

1、从channel写到buffer

channel.read(buffer);

2、通过buffer的put方法

buffer.put()

flip()读写模式切换:通过flip操作将limit置为position,然后将position置为0

从buffer中读数据:

有两种方式:

1、从buffer中读取到channel

channel.write(buffer)

2、通过buffer的get操作处理

buffer.get()

selector:复用器

作用是检查一个或者多个NIO Channel的状态是否是可读、可写..,可以实现单线程管理多个channel,也可以管理多个网络请求

Selector的使用

1、创建selector的实例

Selector selector = Selector.

open

();

2、将通道注册到复用器上


//将socketChannel设置为非阻塞


socketChannel.configureBlocking(

false

);


//将socketChannel注册到Selector实例中,并关注可连接事件


socketChannel.register(selector,SelectionKey.


OP_CONNECT


);

注册的channel是非阻塞的,

SelectableChannel抽象类中提供了configureBlocking设置当前的通道是阻塞的还是非阻塞的

3、使用复用器来监听事件是否完成


//等待系统返回就绪事件


selector.select();

select方法是会阻塞的,直至内核监听到注册的感兴趣事件发生才返回

4、遍历感兴趣事件集合

selector.selectedKeys().iterator()

通过selectedKeys返回的是SelectionKey的set集合

SelectionKey中提供的方法:

public abstract SelectableChannel channel();
//返回该SelectionKey对应通道
public abstract Selector selector();
//返回该SelectionKey注册的选择器
public abstract boolean isValid();
//判断该SelectionKey是否有效
public abstract void cancel();
//撤销该SelectionKey
public abstract int interestOps();
//返回SelectionKey的关注操作符
//设置该SelectionKey的关注键,返回更改后新的SelectionKey
public abstract SelectionKey interestOps(int ops);
public abstract int readyOps();
//返回SelectionKey的预备操作符

//这里readyOps()方法返回的是该SelectionKey的预备操作符
//判断该SelectionKey的预备操作符是否是OP_READ
public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
}
//判断该SelectionKey的预备操作符是否是OP_WRITE
public final boolean isWritable() {
    return (readyOps() & OP_WRITE) != 0;
}
//判断该SelectionKey的预备操作符是否是OP_CONNECT
public final boolean isConnectable() {
    return (readyOps() & OP_CONNECT) != 0;
}
//判断该SelectionKey的预备操作符是否是OP_ACCEPT
public final boolean isAcceptable() {
    return (readyOps() & OP_ACCEPT) != 0;
}

//设置SelectionKey的附件
public final Object attach(Object ob) {
    return attachmentUpdater.getAndSet(this, ob);
}
//返回SelectionKey的附件
public final Object attachment() {
    return attachment;
}

interestOps():对应的是该SelectionKey上注册的关注事件,通过register来完成感兴趣事件的集合

readyOps():通道上实际准备就绪的事件,

Selector抽象类中提供的方法:

Set<SelectionKey> keys()方法返回所有的注册的通道实例,一个selector上是可以注册多个通道,即返回所有注册的通道SelectionKey

Set<SelectionKey> selectedKeys():返回的是所有准备就绪的通道实例

选择过程:

select获取准备就绪的时间,返回就是已经有就是时间发生

int select():阻塞到至少有一个通道上注册的事件就绪了

int select(long timeout);和select()一样,会则塞等待事件就绪,最长阻塞时间为timeout,

int selectNow():非阻塞,调用会立即返回

返回结果为int类型表示有多少通道已经就绪,是自上次调用select后已经处于就绪的通道数量。上次未处理的通道是不计入当前数量

停止选择过程:

wakeup():通过调用wakeup()方法让调用select()处于阻塞的方法立即返回

close():关闭selector实例,使得任意一个处于select则塞的线程都会被唤醒,会将selector上所有的channel都被注销(cancel)

选择过程:

在程序执行过程中,可以调用key.cancel()方法,该方法不会立即注销channel通道等操作,而是将SelectionKey加入到已取消的集合中,在调用select()等方法时才会进行通道注销等操作

Java中Selector复用器本质是对操作系统提供的IO复用模型(select()、poll()、epoll())封装


BIO和NIO区别:

1、BIO使用过程中为了解决并发量问题必须引入多线程

2、NIO中只需要使用少量的线程,就可以进行高并发量的处理,NIO和BIO最大的不同,引入了复用器(IO复用模型:select、poll、epoll),就可以同时关注多个用户的连接的问题



版权声明:本文为xkyjwcc原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。