【JavaNIO】Selector

  • Post author:
  • Post category:java


学习内容主要基于B站UP主,青空の霞光的视频,加上自己的理解。



多路复用网络通信

NIO框架的三大核心:Buffer、

Channel



Selector



传统阻塞IO网络通信

使用socket家里TCP连接进行网络通信

public class Server { 
    public static void main(String[] args) {
        try(ServerSocket server = new ServerSocket(8080);) {       
            System.out.println("等待客户端连接");
            //通过此socket获取客户端连接
            //当没有客户端连接,线程会阻塞,直到有客户端连接为止
            Socket socket = server.accept(); 
            System.out.println("客户端已连接,IP为:" + socket.getInetAddress().getHostAddress());
            System.out.println("读取客户端数据:");
            //通过socket获取输入流
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());

            socket.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Client {
    public static void main(String[] args) {
        try(Socket socket = new Socket("localhost", 8080)) {
            System.out.println("已连接到服务器");
            //通过socket获取输出流
            OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream());
            writer.write("haha\n");
            writer.flush();

            System.out.println("数据已发送");
        } catch (Exception e) {
            System.out.println("服务器连接失败");
            e.printStackTrace();
        }
    }
}

也可以通过

Channel

来进行通信

public class Server {

    public static void main(String[] args) {
        try(ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            //绑定到8080端口
            serverChannel.bind(new InetSocketAddress(8080));
            //阻塞等待新的连接
            SocketChannel socket = serverChannel.accept();
            //由于通道,两端信息明确,可获取远端地址和本地地址
            System.out.println("客户端已连接,IP地址为:" + socket.getRemoteAddress());

            //使用缓冲区进行数据接收
            ByteBuffer buffer = ByteBuffer.allocate(128);
            socket.read(buffer); //SocketChannel同时实现了读写通道接口,可以直接双向操作
            buffer.flip();
            System.out.println("接收客户端数据:" + new String(buffer.array(), 0, buffer.remaining()));

            //向通道中写入数据
            socket.write(ByteBuffer.wrap("已收到".getBytes()));

            //关闭
            socket.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

对于传统的Socket进行网络通信,如果有多个客户端,服务器需要同时处理,则需要为每个客户端单独创建一个线程来进行处理。

在这里插入图片描述

随着客户端的增加,如果要保持持续通信,就不能摧毁这些线程(很多时候只是保持连接,阻塞等待读写操作,IO频率很低,白白占用一条线程)。但是线程不能无限制的进行创建,这样,可以利用JavaNIO提供的多路复用编程模型。

在这里插入图片描述

服务器不是单纯通过

accept()

方法来创建连接的机制,而是根据客户的状态,

Selector

进行不断轮询,只有客户端在对应的状态时(开始读写操作),才会创建线程或进行处理,而不是创建之后一直保持连接,即使没有任何读写操作。



选择器Selector

选择器是当具体有某一个状态(比如读、写、请求)已经就绪时,才会进行处理,而不是让程序主动的进行等待。


Selector

的实现方案,比如很多个用户连接到服务器:


常见的IO多路复用模型:


  • select

    :当这些连接出现具体的某个状态时,只知道已经就绪,但不知道是哪个连接已经就绪,每次调用都进行线性遍历所有连接,时间复杂度为

    O(n)

    ,并存在最大连接数限制

  • poll

    :同上,但底层采用链表,故没有最大连接数限制

  • epoll

    :采用事件通知方式,当某个连接就绪,能直接进行精准通知,时间复杂度

    O(1)

    ,Java在Linux环境下采用这种模式实现

    • epoll是根据每个fd上的

      callback

      函数实现的,只要就绪就会直接回调

      callback

      函数,实现精准通知,只有Linux支持这种方式


如何让网络实现多路复用?

需要监听的事件:


  • SelectionKey.OP_CONNECT

    :连接就绪,表示客户端与服务器的连接已经建立成功

  • SelectionKey.OP_ACCEPT

    :接收连接事件,表示服务器监听到了客户连接,可以接收此连接

  • SelectionKey.OP_READ

    :读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了

  • SelectionKey.OP_WRITE

    :写就绪事件,表示已经可以向通道写数据了


Server端

public static void main(String[] args) {
    try(ServerSocketChannel serverChannel = ServerSocketChannel.open();
        Selector selector = Selector.open()) { //开启一个新的selector

        serverChannel.bind(new InetSocketAddress(8080));

        //要使用选择器进行操作,必须使用非阻塞方式,阻塞方式会直接卡在accept()
        serverChannel.configureBlocking(false);

        //将选择器注册到serverSocketChannel中,后边是选择需要监听的事件,只有发生对应事件时才会进行选择,多个事件用 | 连接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        //无限循环等待新的用户网络操作
        while (true) {
            //每次先择都可能会选出多个已经就绪的网络操作,没有操作时回暂时阻塞
            int count = selector.select();
            System.out.println("监听到" + count + "个事件");
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //根据不同的事件类型,执行不同的操作
                if (key.isAcceptable()) { //当前serverSocketChannel已经做好准备处理Accept
                    SocketChannel channel = serverChannel.accept();
                    System.out.println("客户端已经建立,IP为: " + channel.getRemoteAddress());
                    //连接建立后,需要将连接也注册选择器. 当这个连接有内容读时进行处理
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                    //在建立时完成了注册
                } else if (key.isReadable()) { //当前连接有可读的数据并可以写,就开始处理
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(128);
                    channel.read(buffer);
                    buffer.flip();
                    System.out.println("接收到客户端的数据:" + new String(buffer.array(), 0, buffer.remaining()));

                    //直接向通道中写入数据
                    channel.write(ByteBuffer.wrap("已收到".getBytes()));

                }

                //处理完成后,移出迭代器,不然下次还有
                iterator.remove();
            }
        }

    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}


Client

public static void main(String[] args) {
    //创建一个新的SocketChannel,通过通道进行通信
    try(SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080))) {
        Scanner scanner = new Scanner(System.in);
        System.out.println("已连接到服务器端");
        while (true) { //一直发消息
            String text = scanner.nextLine();
            //直接向通道写数据
            channel.write(ByteBuffer.wrap(text.getBytes()));
            System.out.println("已发送");

            //接收服务器消息
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);
            buffer.flip();
            System.out.println("收到服务器返回:" + new String(buffer.array(), 0, buffer.remaining()));
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

使用多路复用,可以只用一条线程,就能同时处理多个请求

在这里插入图片描述

在这里插入图片描述



实现Reactor模式

Reactor模式可以对服务端进行优化。

首先抽象出两个组件,

Reactor

线程和

Handler

处理器


  • Reacotr

    线程:负责响应IO事件,并分发到

    Handler

    处理器。新的事件包含连接就绪、读就绪、写就绪等

  • Handler

    处理器:执行非阻塞操作

上面的例子就是单线程

Reactor

的朴素模型(面向过程的写法)

在这里插入图片描述

标准的写法:

客户端连接到

Reactor

,并通过

Selector

走到

Acceptor

或是

Handler



Acceptor

住院号负责客户端连接的建立,

Handler

负责读写操作

 * @description 具体实现操作
 */
public class Handler implements Runnable{

    private final SocketChannel channel;

    public Handler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);
            buffer.flip();
            System.out.println("接收到客户端数据:" + new String(buffer.array(), 0, buffer.remaining()));
            channel.write(ByteBuffer.wrap("已收到".getBytes()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
 * @description 用于处理接收请求,连接操作
 */
public class Acceptor implements Runnable{

    private final ServerSocketChannel serverChannel;

    private final Selector selector;

    public Acceptor(ServerSocketChannel serverChannel, Selector selector) {
        this.serverChannel = serverChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            SocketChannel channel = serverChannel.accept();
            System.out.println("客户端已连接,IP地址为:" + channel.getRemoteAddress());
            channel.configureBlocking(false);
            //注册时,创建好对应的Handler,这样在Reactor中分发的时候就可以直接调用Handler
            channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

在注册时,放进去了一个附加对象

new Handler(channel)

,这个附加对象会在选择器选择到此通道上时,通过

attachment()

方法进行获取,可以简化代码

public class Reactor implements Closeable, Runnable {

    private final ServerSocketChannel serverChannel;
    private final Selector selector;

    public Reactor() throws IOException {
        serverChannel = ServerSocketChannel.open();
        selector = Selector.open();
    }
    
    @Override
    public void run() {
        try {
            serverChannel.bind(new InetSocketAddress(8080));
            serverChannel.configureBlocking(false);

            //注册时,将Acceptor作为附加对象存放,当选择器选择后也可获取到
            serverChannel.register(selector, SelectionKey.OP_ACCEPT,
                    new Acceptor(serverChannel, selector));
            while (true) {
                int count = selector.select();
                System.out.println("监听到" + count + "个事件");
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    //通过dispatch()进行分发
                    this.dispatch(iterator.next());
                    iterator.remove(); //分发后移出
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 通过此方法进行分发
     * @param key
     */
    private void dispatch(SelectionKey key) {
        //获取attachment传入的对象,ServerSocketChannel和对应的客户端Channel都添加了
        Object att = key.attachment();
        if (att instanceof Runnable) {
            ((Runnable) att).run();
        } //这样就实现了对应的时候调用对应的Handler或Acceptor
    }

    @Override
    public void close() throws IOException {
        serverChannel.close();
        selector.close();
    }
}

启动服务器

public class Server {
    public static void main(String[] args) {
        //创建Reactor对象,启动
        try(Reactor reactor = new Reactor()) {
            reactor.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

在这里插入图片描述

这样实现了单线程

Reactor

模式。但是单线程无法应对大量的请求。


多线程

Reactor

模式:

创建了多个线程处理,可以将数据读取完成之后的操作交给线程池来执行。

在这里插入图片描述

只需要修改

Handler

即可。

在数据读出后,将数据处理交给线程池执行

public class Handler implements Runnable{
    
    //使用线程池
    private static final ExecutorService POOL = Executors.newFixedThreadPool(10);

    private final SocketChannel channel;

    public Handler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer);
            buffer.flip();
            POOL.submit(() -> {
                try {
                    System.out.println("接收到客户端数据:" + new String(buffer.array(), 0, buffer.remaining()));
                    channel.write(ByteBuffer.wrap("已收到".getBytes()));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

这样会导致一个

Reactor

同时处理来自客户端的所有请求操作,有些乏力。



Reactor

做成一主多从的模式,让主

Reactor

只负责

Accept

操作,而其他的

Reactor

进行各自的其他操作。

在这里插入图片描述

使用轮询机制

public class SubReactor implements Closeable, Runnable {

    //每个从Reactor也有一个Selector
    private final Selector selector;

    //创建一个4线程的线程池,即4个从Reactor工作
    public static final ExecutorService Pool = Executors.newFixedThreadPool(4);
    public static final SubReactor[] reactors = new SubReactor[4];
    //采用轮询机制,每接收一个新的连接,就轮询分配给四个SubReactor
    public static int selectedIndex = 0;

    //一开始就让4个从Reactor跑起来
    static {
        for (int i = 0; i < 4; i++) {
            try {
                reactors[i] = new SubReactor();
                Pool.submit(reactors[i]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //轮询获取下一个Selector(Acceptor用)
    public static Selector nextSelector() {
	    //从4个SubReactor中拿出一个,获得里面的selector
        Selector selector = reactors[selectedIndex].selector;
        selectedIndex = (selectedIndex + 1) % 4;
        return selector;
    }

    private SubReactor() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void run() {
        //启动后直接等待selector监听到对应的事件即可,其他操作逻辑和Reactor一直
        try {
            while (true) {
                int count = selector.select();
                System.out.println(Thread.currentThread().getName() + ">>监听到" + count + "个事件");
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    this.dispatch(iterator.next());
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
	//执行对应的Handler
    private void dispatch(SelectionKey key) {
        Object att = key.attachment();
        if (att instanceof Runnable) {
            ((Runnable) att).run();
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
    }
    
}

接着修改Acceptor,现在的Acceptor只执行accept

public class Acceptor implements Runnable{

    private final ServerSocketChannel serverChannel;

    public Acceptor(ServerSocketChannel serverChannel) {
        this.serverChannel = serverChannel;
    }

    @Override
    public void run() {
        try {
            //通过Accept操作得到SocketChannel
            SocketChannel channel = serverChannel.accept();
            System.out.println("客户端已连接,IP地址为:" + channel.getRemoteAddress());
            channel.configureBlocking(false);
            //选取下一个Reactor的Selector
            Selector selector = SubReactor.nextSelector();
            //注册之前唤醒一次防止卡死
            selector.wakeup();
            //注册从Reactor的Selector
            channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

之后,SocketChannel相关的操作就由

SubReactor

进行处理了,而不是一律交给

Reactor

进行操作



版权声明:本文为Wligt原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。