概述
注: NIO2(AIO) 即异步IO
NIO2 简介
AIO和NIO2其实是一回事,如同孙行者、者行孙其实都是孙猴子,只是名称不同本质都一样
那么如何理解这个概念呢?举个例子
假设一个妹子有许多的舔狗(SpareTire),如果妹子想要完成某件事最简单、高效的方法是什么?
答案是,舔狗那么多,交给他们去办就ok了。那么狗子办事期间,妹子会一直等待狗子把事情做好吗?不行,这期间当然可以继续将其他任务派发给其他狗子。当狗子办事期间,如果有需要妹子处理的事情,通知处理一下即可。
当然狗子一般都是处理一些重活累活,比如数据拷贝、I/O啊,接收新连接啥的(太惨了)。妹子则专注于核心业务的处理。
在这个例子中,妹子相当于核心业务线程,主要用来处理业务逻辑,而狗子们则是(内核+I/O线程)的抽象。
P.S.
- 如果你了解NIO2,建议你直接阅读NIO2模型解读章节,不需要再阅读NIO2 DEMO章节(时间宝贵)
- 你可以直接越过所有章节去看总结,也可以简单阅读附录直接上手调试代码
NIO2 DEMO
NIO2中有个核心点,就是内核负责主要负责通知程序有什么事件,而连接的接收以及数据的拷贝还是需要程序提供线程来做这些事情,你可以理解为妹子(核心业务线程)需要提供舔狗池(线程池)给内核来做这些事情
talk is cheap, show me your hair
如果你想要学习一下NIO2,可以点击 https://github.com/anxpp/Java-IO/tree/master/src/com/anxpp/io/calculator/aio
该源码的注释为GBK编码,如果你看到注释为乱码,最好将其改为GBK编码
这是一个Demo,值得注意的是虽然该例子中并没有显式的创建线程池,这是因为如果你在open()服务端的时候,如果没有显示指定,系统将会默认分配给ServerSocketChannel一个线程池,用于事件的处理,我们可以打开JConsole验证一下.
channel = AsynchronousServerSocketChannel.open();
如图所示thread-0到thread-4就是系统默认分配的线程池,用来处理I/O事件。(天赐舔狗)
想象一下,如果我们在处理I/O事件的时候将所有线程都阻塞住了,那么整个系统的I/O都将陷入阻塞, 如下图所示。
在有新的I/O事件到来的时候,内核会选择一个线程来处理这些I/O事件,如果处理I/O的线程陷入阻塞,那么来自客户端的请求将会一直被阻塞住,无法返回。
因此处理I/O事件的线程最好
只处理I/O事件
(接收新连接、将数据从内核拷贝到线程中)
你可以理解为,舔狗最好只做舔狗该做的事情,即重活累活,至于核心业务或者会发生阻塞的情况的事件最好提交给妹子(
业务逻辑处理线程池
)来处理。
Tomcat NIO2 模型
关键类 org.apache.tomcat.util.net.Nio2Endpoint
既然是讲解NIO2的处理模型,那么我们有必要了解以下关键角色
-
Nio2Acceptor
Acceptor并不与特定的线程绑定,而是当由新连接到来从线程池中选择一个线程来执行Acceptor的代码,这一个过程是由底层帮我们完成的,Acceptor的主要任务是接收新连接,并为该连接注册读写的处理对象 -
LimitLatch
限制连接数,在异步I/O情况限制连接数的主要方式就是锁阻塞用于I/O事件的线程池中的线程 -
I/O处理器
处理I/O的类,与Nio2Acceptor运行在同一个线程池中
ServerSocket的启动
异步ServerSocket启动的流程较为枯燥,如果你不想看代码,以下为其启动的流程
- 创建线程池,将其包装为AsynchronousChannelGroup
- 打开ServerSocket
- 绑定端口, 并设置最大连接数
@Override public void bind() throws Exception { // 创建线程池 if (getExecutor() == null) { createExecutor(); } if (getExecutor() instanceof ExecutorService) { //创建用于I/O的线程池(需要用AsynchronousChannelGroup包装,才能提供给AsynchronousServerSocketChanne用) threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor()); } // AsynchronousChannelGroup needs exclusive access to its executor service if (!internalExecutor) { log.warn(sm.getString("endpoint.nio2.exclusiveExecutor")); } //创建ServerSocketChannel serverSock = AsynchronousServerSocketChannel.open(threadGroup); socketProperties.setProperties(serverSock); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); //绑定端口并设置backlog的参数 //backlog可以理解为当前最大待执行accept操作的连接数 serverSock.bind(addr, getAcceptCount()); // Initialize SSL if needed initialiseSsl(); }
如下图,就是和当前异步ServerSocketChannel绑定的线程池,801表示该连接器所监听的端口(附录中有开启NIO2的教程)
Nio2Acceptor
Nio2Acceptor主要功能接收新连接,并限制最大连接数,因为采用的是异步I/O,所以Acceptor并不会于特定的线程绑定,而是当新任务需要执行的时候,从线程池中选一个执行任务。如下图所示当有
客户端新连接到达时,程序会从线程池选择一个线程来执行Nio2Acceptor的completed方法并传入客户端Socket开始执行新连接处理的业务逻辑
AcceptHandler的注册
在异步I/O中我们需要向ServerSocketChannel注册处理Accept事件的处理器以便完成连接事件的处理 如以下代码所示,当tomcat启动的时候,会开启一个线程调用Nio2SocketAcceptor的run方法,将Nio2SocketAcceptor注册为ServerSocketChannel的accept事件处理器
protected class Nio2Acceptor extends Acceptor implements CompletionHandler { ... @Override public void run() { // The initial accept will be called in a separate utility thread if (!isPaused()) { // 连接数限制,如果达到最大连接数,则调用此方法的线程会陷入等待 try { countUpOrAwaitConnection(); } catch (InterruptedException e) { // Ignore } if (!isPaused()) { //将自己注册为accept事件的处理器(注意此类实现的接口) serverSock.accept(null, this); } else { state = AcceptorState.PAUSED; } } else { state = AcceptorState.PAUSED; } } ...}
新连接的处理
当有新连接到达时,底层会从线程池选择一个线程来执行completed方法并传入客户端socket,此时该方法主要的流程如下
- 检查是容器否仍在运行如果仍在运行则继续流程
- 检查是否需要限制连接数,如果需要限制连接数,则从线程池中选择一个线程来执行Acceptor的run方法(此方法可能会发生阻塞)
- 以上操作均已完成则调用setSocketOptions方法执行后续I/O事件处理,至此新连接的接收完成
@Override public void completed(AsynchronousSocketChannel socket, Void attachment) { // Successful accept, reset the error delay errorDelay = 0; // Continue processing the socket on the current thread // Configure the socket if (isRunning() && !isPaused()) { //检查限制的最大连接数,如果没有设置(即-1)则不进行连接数限制 if (getMaxConnections() == -1) { serverSock.accept(null, this); } else { //由于有新连接的到达,因此需要从线程池选一个线程执行增加连接数的操作,此操作可能会发生阻塞 getExecutor().execute(this); } //执行后续的I/O事件处理 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { if (isRunning()) { state = AcceptorState.PAUSED; } destroySocket(socket); } }
限制最大连接数的实现
由于Acceptor并不与特定的线程绑定, 因此如果需要限制最大连接数,需要使用锁将空闲的线程阻塞住,这也时为什么需要在accept新连接的时候需要向线程池提交增加新连接数的任务,如下所示(也就是调用Nio2SocketAcceptor的run方法)
public void completed(AsynchronousSocketChannel socket, Void attachment) { ... getExecutor().execute(this); ...}
该参数主要用于设置当前ServerSocket所允许的最大未accept的连接数,也就是说如果超过了未accept得连接数backlog所设置的值,那么新来的连接都将会被丢弃掉。
I/O 事件的处理
既然是异步I/O,那么必然要在客户端Socket注册读写的CompletionHandler, 因此setSocketOptions必然会导致这一步骤的发生,那么此步骤发生在什么时候呢?
经过Debug跟踪发现setSocketOptions将会导致Nio2SocketWrapper的创建,而实际I/O流程就发生在新建Nio2SocketWrapper对象时所创建的readCompletionHandler中, 以下是其代码
ReadCompletionHandler
用于监听读事件,在读取到数据之后会调用processSocket方法开始数据的解析工作
public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) { super(channel, endpoint); nioChannels = endpoint.getNioChannels(); socketBufferHandler = channel.getBufHandler(); this.readCompletionHandler = new CompletionHandler() { @Override public void completed(Integer nBytes, ByteBuffer attachment) { if (log.isDebugEnabled()) { log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]"); } readNotify = false; //加锁,其他线程可能会对标志位进行修改 synchronized (readCompletionHandler) { //nBytes表示读取到的字节数,如果小于0 //抛出EOF异常,没数据读,那咋办吗,只好抛异常了 if (nBytes.intValue() < 0) { failed(new EOFException(), attachment); } else { if (readInterest && !Nio2Endpoint.isInline()) { readNotify = true; } else { // Release here since there will be no // notify/dispatch to do the release. readPending.release(); } readInterest = false; } } if (readNotify) { //处理读事件 getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false); } } //省略代码,后面太长了 ...
debug验证一下,如下图所示,attachment即我们所读到的数据
注意 在debug的时候IDEA可能发出切换线程的请求(读数据和之前的操作是不在一个线程上的,如下所示
总结
总结如下
- accept事件和 I/O事件共用一个线程池,不会和特定线程绑定
- Acceptor(Nio2Acceptor) 用于接收新连接,并注册I/O事件的处理对象
- LimitLatch通过阻塞住线程池中的线程来实现连接数限制功能
- I/O Handler 即在Nio2SocketWrapper注册的读写处理器,有I/O事件到达时,程序会选择一个线程来执行这些处理器的代码
-
总体流程如下
新连接到达->选择一个线程执行Nio2Acceptor的代码->向线程池中提交增加连接数的任务->注册读写处理事件->I/O事件到达,选择一个线程处理I/O事件
思想迁移
不要使用默认线程池
在异步ServerSocketChannel创建的时候,tomcat会自己创建一个线程池,而不是使用默认提供的线程池,由于线程池在我们掌握之中,由此才实现了连接数限制的功能
不要阻塞I/O线程
I/O线程就要有I/O线程的亚子,不要在I/O线程执行会发生长时间阻塞的操作
附录 如何调试tomcat
后端程序猿都晓得,SpringBoot中内嵌了tomcat(当然还有jetty,取决于你如何选择), 因此我们可以新建一个SpringBoot应用来专门调试学习Tomcat的源码。
以下为tomcat调试的过程
-
第一步
, 打开IDEA -
第二步
, 新建SpringBoot工程 -
第三步
, 在项目侧边栏Ctrl+F 查找Tomcat的jar包
-
第四步
, /mute all,带上耳机,打上断点
package org.apache.tomcat.util.net;public class NioEndpoint extends AbstractJsseEndpoint {...public class Poller implements Runnable { public void run() { //此方法的代码位于692行 }}...}
如果你要测试Tomcat的
NIO2
的处理方式,则需要以下配置 将以下代码添加到你的代码中。(由于SpringBoot中内嵌的tomcat默认I/O方式为NIO所以我们需要通过配置增加NIO2的连接器)
import org.apache.catalina.connector.Connector;import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ConnectorConf { //注意你的SpringBoot版本,此项目的版本是2.2.0,旧的版本1.5使用不同的类进行配置 @Bean public TomcatServletWebServerFactory servletContainer() { TomcatServletWebServerFactory tomcatServletWebServerFactory = new TomcatServletWebServerFactory(); tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector()); return tomcatServletWebServerFactory; } private Connector getConnector() { // 关键点哦 Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol"); //将连接器的端口设置为801,这样访问801端口的就是NIO2的模式了 connector.setPort(801); return connector; }}
去org.apache.tomcat.util.net.Nio2Endpoint打上断点就完事了
如何调试多线程
在多线程的情况下,可能会出现进入不了断点的情况,此时只需在断点上右键选择Thread即可, 当其他线程到达断点时IDEA法发出通知,如下图所示
转载自https://juejin.im/post/5db147436fb9a02032779f33#heading-10