netty 源码分析(一)- 启动

  • Post author:
  • Post category:其他




0 预备工作

前往github下载源码,版本为4.1,地址:

https://github.com/netty/netty



1 官方的启动example代码

服务端的启动代码为EchoServer,具体路径为io.netty.example.echo.EchoServer。

本文分析的也就是这个类,具体代码如下:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL. 不是本文重点,不进行源码分析
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        // 1. 创建bossGroup和workerGroup时入参为何不同
        // 2. 创建NioEventLoopGroup的具体源码实现
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 3. 创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // 4. 启动服务器,本文重点
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

上文代码的注释中列出了启动服务器的几个关键点:

  1. 创建bossGroup和workerGroup时入参为何不同
  2. 创建NioEventLoopGroup的具体源码实现
  3. 创建ServerBootstrap
  4. 启动服务器,本文重点

接下来对这几个关键点分别进行分析。



2 创建bossGroup和workerGroup时入参为何不同

关于bossGroup和workerGroup,可以看这张图,不了解的需要先去了解netty的线程模型,可见:

https://blog.csdn.net/qq_35751014/article/details/104443715


在这里插入图片描述

bossGroup和workerGroup是netty的两个核心对象,基于了单reactor多线程这一线程模型。bossGroup用于接收tcp请求,它会将请求交给workerGroup。workerGroup会获取到真正的链接,然后和链接进行通信。

既然是基于单reactor多线程模型,那么一般情况下,bossGroup只有一个线程,而workerGroup是由多线程组成。

因此EventLoopGroup bossGroup = new NioEventLoopGroup(1); bossGroup由一个线程组成。

而EventLoopGroup workerGroup = new NioEventLoopGroup(); 默认含有cpu*2的线程数(I/O密集型常用线程数)。

//默认线程数 = cpu*2
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));



3 创建NioEventLoopGroup的具体源码实现

进入构造方法,一直继续this(…)、super(…)的跟进,最后会来到真正的实现部分:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        checkPositive(nThreads, "nThreads");

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            	//1. 初始化EventLoop数组children
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                   // 关闭代码略
                   ...
                }
            }
        }

		//2. 生成选择器chooser
        chooser = chooserFactory.newChooser(children);

		//3. 创建关闭时的监听器
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
		//4. 生成可读集合childrenSet
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

创建NioEventLoopGroup主要有四步:

  1. 初始化EventLoop数组children
  2. 生成选择器chooser
  3. 创建关闭时的监听器
  4. 生成可读集合childrenSet

下面只对1和2两点进行展开详解



3.1 初始化EventLoop数组children

这里调用了NioEventLoop的构造方法,不详细进行展开了:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }



3.2 生成选择器chooser

选择器有两种:PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser

	/**
	* 选择器使用的是round-robin(轮询)算法。
	* 另外,当长度为2的n次方时,使用&进行位运算的效率会比直接%的效率要高。
	* 这是netty对性能的压榨。
	*/
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
PowerOfTwoEventExecutorChooserpublic EventExecutor next() {
      return executors[idx.getAndIncrement() & executors.length - 1];
  }
 
GenericEventExecutorChooserpublic EventExecutor next() {
       return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
   }



4 创建ServerBootstrap

		ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
               ChannelPipeline p = ch.pipeline();
               if (sslCtx != null) {
                    p.addLast(sslCtx.newHandler(ch.alloc()));
               }
               p.addLast(new EchoServerHandler());
            }
        });

这一块就不详细展开了,每个属性的具体含义,会用netty的也都了解。



5 启动服务器,本文重点

     // 4. 启动服务器,本文重点
     ChannelFuture f = b.bind(PORT).sync();

bind()方法中的内容决定了我们的服务器是如何启动起来的。而sync()方法只是同步等待创建结果的而已。因此应该着重关注bind()方法:

    private ChannelFuture doBind(final SocketAddress localAddress) {
    	//1. 初始化一个channel并将其注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            //2. doBind0()
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
			...
        } 
    }   



5.1 初始化一个channel并将其注册

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
        	//1. 生产channel
            channel = channelFactory.newChannel();
            //2. 初始化channel
            init(channel);
        } catch (Throwable t) {
        	异常场景,代码略
            ...
        }
		//3. 注册channel
        ChannelFuture regFuture = config().group().register(channel);
        
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }



5.1.1 channel = channelFactory.newChannel();

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

跟进方法内部发现是通过工厂模式 反射创建clazz对象实例,clazz实际上是在我们第二步中channel方法进行指定的。newChannel()方法的作用就是就是创建出我们的NioServerSocketChannel实例对象。

在这里插入图片描述

在这里插入图片描述



5.1.2 init(channel);

@Override
    void init(Channel channel) {
    	//1. 配置一些相关属性
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

		//2. 帮助bossGroup自动生成ChannelInitializer,这个重要
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // config.handler() 就是 配置时设置的handler。
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
				//3. ch.eventLoop().execute 异步启动线程。
				//4. 用于添加ServerBootstrapAcceptor到pipeline中
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                    	// 4. 添加使用的是pipeline.addLast()方法
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

这个方法前大半部分的操作都是将我们给ServerBootStrap()注册的那些属性(options、attrs等等)赋值给我们的ServerSocketChannel,后半部分的代码很重要。

对于bossGroup,我们无需自定义ChannelInitializer。因为netty会帮助bossGroup自动生成ChannelInitializer,这个ChannelInitializer包含了我们在

.handler(new LoggingHandler(LogLevel.INFO))

时自定义的Handler。同时,还会另起一个线程来添加Handler:ServerBootstrapAcceptor。


ServerBootstrapAcceptor

的功能有两点:

  1. 帮助bossGroup监听accpect事件,获取对应的socketChannel
  2. 将socketChannel注册到workerGroup上。

接下来分别对第2、4两点进行讲解。而第3点ch.eventLoop().execute()涉及netty的任务执行机制,会在第二篇《netty 源码分析(二)- 任务执行机制 及 epoll bug解决方案》单独进行讲解。



5.1.2.1 pipeline.addLast()

pipeline.addLast() 并不是大家以为的加到了tail的后面,而是加在了tail的前面。

在这里插入图片描述

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }



5.1.3 register(channel);

//3. 注册channel
ChannelFuture regFuture = config().group().register(channel);

对于register,我是带着两个疑问来分析的:

  1. config().group()指的是workGoup还是bossGroup?
  2. 为什么channel是注册到group上?



5.1.3.1 config().group()指的是workGoup还是bossGroup?

通过debug,我们先看一下bossGroup和workerGroup的id。

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

在这里插入图片描述

接着往下debug,可以看到,在启动时,config().group()为NioEventLoopGroup@1254,也就是说,config().group()指的是bossGroup。
在这里插入图片描述



5.1.3.2 channel究竟注册到了哪儿?(重点)

我们沿着config().group().register(channel)往下调试:

在这里插入图片描述

可以看到,channel不是注册在group上,而是注册在next()返回的对象上。通过下面的代码可以看到:next()实际上返回的是选择器chooser所指向的EventLoop。

在这里插入图片描述

在这里插入图片描述

接着我们再进入register()方法:

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //取得
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

顺着register,会走到AbstractChannel的register()方法:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
			...
			
            AbstractChannel.this.eventLoop = eventLoop;
			//这里为false
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
            	//调用eventLoop.execute()来异步执行register0()
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
					...
                }
            }
        }

关于eventLoop.inEventLoop()为什么会返回false,以及eventLoop.execute()方法,会在第二篇:

netty 源码分析(二)- 任务执行机制 及 epoll bug解决方案

中进行讲解。

顺着register0(),会走到接口方法doRegister(),我们对其中的两个实现epoll和nio进行介绍。

在这里插入图片描述

AbstractEpollChannel:

    @Override
    protected void doRegister() throws Exception {
        // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
        // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
        // new EventLoop.
        epollInReadyRunnablePending = false;
        ((EpollEventLoop) eventLoop()).add(this);
    }

    void add(AbstractEpollChannel ch) throws IOException {
        assert inEventLoop();
        //1. 拿到socket的文件描述符
        int fd = ch.socket.intValue();
        //2. 使用epollCtl将fd添加到epoll中
        Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
        //3. 保存fd和channel的映射关系
        AbstractEpollChannel old = channels.put(fd, ch);

        // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
        // closed.
        assert old == null || !old.isOpen();
    }

AbstractEpollChannel的注册channel主要分为三步:

  1. 拿到socket的文件描述符
  2. 使用epollCtl将fd添加到epoll中
  3. 保存fd和channel的映射关系

AbstractNioChannel:

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
            ...
            }

    }

AbstractNioChannel的注册channel,是将channel注册到selector中,并返回selectionKey。



5.1.3.3 总结

总结一下:对于注册channel,

config().group().register(channel)

这行代码看上去像是注册到group上,

实际上底层是使用chooser.next()来选择一个eventLoop,然后将channel注册到eventLoop的selector中。

对于5.1小节的

config().group().register(channel);

因为config().group()返回的是bossGroup。

所以实际上是将一条新的channel注册到了bossGroup的eventLoop(一般只有一个)的selector中,用于监听客户端的请求。



5.2 doBind0()

在初始化并生成channel后,便可以将channel与端口进行绑定

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

实际上只需要关注一行代码:

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);


这行代码做了几件事:

  1. 使用nio的channel的bind方法,将channel绑定到端口上。
  2. 注册关闭的监听事件

bind方法,通过debug发现,最后会进入NioServerSocketChannel的doBind方法

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
        	//用的是ServerSocketChannel,也就是说,底层实际上是socket的bind方法。
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

    @Override
    protected ServerSocketChannel javaChannel() {
    	//可以看到,用的channel是java.nio的ServerSocketChannel
        return (ServerSocketChannel) super.javaChannel();
    }

至此,启动部分就讲完了。

接下来第二篇会对netty的任务执行机制 及 epoll bug解决方案进行介绍。



版权声明:本文为bintoYu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。