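Netty Source Code Analysis (5): How Netty detects a new connection, registers it with a NioEventLoop thread in the worker group, and finally updates the selector's interest set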

New connection handling logic

  1. Detect the new connection
  2. Create the NioSocketChannel
  3. Assign a thread and register with the Selector
  4. Register the read event with the Selector
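
To make the four steps concrete before reading the Netty code, here is a minimal sketch that walks through them once with plain JDK NIO on the loopback interface. It is not Netty's implementation: the class name AcceptFlowSketch, the method acceptOnce, and the "boss"/"worker" selector names are hypothetical, and registering with interest ops 0 before adding OP_READ is an assumption about the order Netty follows.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; plain JDK NIO, no Netty types involved.
class AcceptFlowSketch {

    /** Runs the four steps once on loopback and returns the child's interest ops. */
    static int acceptOnce() {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(boss, SelectionKey.OP_ACCEPT);

            // A blocking client connect stands in for a remote peer.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Step 1: the "boss" selector reports OP_ACCEPT on the server channel.
                if (boss.select(5000) == 0) {
                    throw new IOException("no connection detected within 5 seconds");
                }
                boss.selectedKeys().clear();

                // Step 2: accept the connection; this yields the JDK SocketChannel.
                try (SocketChannel child = server.accept()) {
                    child.configureBlocking(false);
                    // Step 3: register with the "worker" selector, with no interest ops yet.
                    SelectionKey key = child.register(worker, 0);
                    // Step 4: add OP_READ to the interest set.
                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                    return key.interestOps();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptOnce() == SelectionKey.OP_READ);
    }
}
```

Netty splits the same work across the boss event loop (steps 1 and 2) and a worker event loop (steps 3 and 4); the sketch runs everything on one thread only to keep it short.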

Detecting the new connection

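  1. The bossGroup thread polls its selector in a loop; whenever a key becomes ready (for example, when a client connects), it calls processSelectedKey.
  2. processSelectedKey reads the ready ops from the key. On the server channel, the op that signals a new connection is OP_ACCEPT; OP_CONNECT only applies to client-side channels finishing an outbound connect.
  3. OP_WRITE is handled before reads: forceFlush writes out any queued outbound buffers so their memory can be released.
  4. For OP_ACCEPT, unsafe.read() ends up in doReadMessages, which accepts the JDK SocketChannel and wraps it in a NioSocketChannel.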
class NioEventLoop {
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                // Get the event loop this channel is registered with
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            if (eventLoop == this) {
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            // Get the ops that are ready on this channel
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
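                // Get the ops this key is currently interested in
                int ops = k.interestOps();
                // Clear the OP_CONNECT bit so the selector stops reporting it
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // Finish the outbound connect (client-side channels only; server-side accepts arrive as OP_ACCEPT below)
                unsafe.finishConnect();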
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Force a flush of the queued outbound buffers
                ch.unsafe().forceFlush();
            }
            // Handle reads; on a server channel, OP_ACCEPT means a new connection is waiting to be accepted
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

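    // Note: in Netty this method belongs to NioServerSocketChannel, not NioEventLoop; it is shown here for continuity.
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the pending connection and get the underlying JDK SocketChannel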
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
}
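
The bit operations in processSelectedKey are easy to misread, so the following sketch isolates them using only the JDK's SelectionKey constants. The class ReadyOpsSketch and its two helper methods are hypothetical names introduced for illustration; the expressions inside them are copied from the listing above.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper class that isolates the bit tests used in processSelectedKey.
class ReadyOpsSketch {

    /** Returns the interest set with OP_CONNECT cleared, as in `ops &= ~SelectionKey.OP_CONNECT`. */
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    /** Mirrors the read/accept check: true for OP_READ, OP_ACCEPT, or a readyOps value of 0. */
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        int interest = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
        // Clearing OP_CONNECT leaves only OP_READ in the interest set.
        System.out.println(clearConnect(interest) == SelectionKey.OP_READ);
        // A ready OP_ACCEPT takes the same branch as OP_READ and ends in unsafe.read().
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));
        // A key that is only writable does not trigger a read.
        System.out.println(shouldRead(SelectionKey.OP_WRITE));
    }
}
```

Because OP_ACCEPT and OP_READ share one branch, the server channel and the child channels go through the same unsafe.read() entry point; what differs is the Unsafe implementation behind it.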

Assigning a thread and registering with the selector

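  1. NioMessageUnsafe.read() calls pipeline.fireChannelRead(readBuf.get(i)) for each accepted channel; the event reaches ServerBootstrapAcceptor.channelRead, which hands the child channel to the worker group.
  2. childGroup.register(child) uses chooser.next() to pick the next NioEventLoop and registers the channel with it.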
class Test {
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

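        // Add the childHandler configured on the ServerBootstrap
        child.pipeline().addLast(childHandler);
        // Apply the child channel options
        setChannelOptions(child, childOptions, logger);
        // Apply the child attributes, binding custom properties to the client channel
        setAttributes(child, childAttrs);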

        try {
            // Pick an event loop and register the child with its selector
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // Delegate to the channel's Unsafe for the actual registration
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}
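
The chooser.next() step is not shown in the listing above, so here is a minimal sketch of a round-robin chooser, which is the strategy Netty's default chooser follows as far as I understand it. The class RoundRobinChooserSketch and its nested Chooser type are hypothetical; Netty's own chooser classes differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection; not Netty's actual chooser class.
class RoundRobinChooserSketch {

    static final class Chooser<T> {
        private final List<T> loops;
        private final AtomicInteger idx = new AtomicInteger();

        Chooser(List<T> loops) {
            if (loops.isEmpty()) {
                throw new IllegalArgumentException("loops must not be empty");
            }
            this.loops = new ArrayList<>(loops);
        }

        /** Returns the next element in rotation; safe to call from several threads. */
        T next() {
            // floorMod keeps the index non-negative even after the counter overflows.
            return loops.get(Math.floorMod(idx.getAndIncrement(), loops.size()));
        }
    }

    public static void main(String[] args) {
        Chooser<String> chooser = new Chooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            // Each accepted channel would be registered with the loop returned here.
            System.out.println(chooser.next());
        }
    }
}
```

Rotating through the loops spreads accepted channels evenly across the worker threads, and each channel then stays on the loop it was registered with.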

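Registering the read event for NioSocketChannel

  1. register0 calls doRegister() to register the channel with the event loop's selector, then fires channelRegistered and, on the first registration of an active channel, channelActive.
  2. When channelActive reaches the head of the pipeline, readIfIsAutoRead() calls channel.read() if autoRead is enabled.
  3. That read request ends in doBeginRead(), which adds readInterestOp (OP_READ for a NioSocketChannel) to the selection key's interest set.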

class Test {
    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // Register with the underlying JDK selector
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                // Only on the first registration
                if (firstRegistration) {
                    // Fire channelActive through the pipeline so every handler's channelActive runs
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // Add the read op to the interest set;
            // from now on the selector reports incoming data on this channel
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
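
The branching in register0 (first registration versus re-registration) can be modelled with a small state machine. The sketch below is a toy model, not Netty code: ToyChannel, its fields, and the event strings are hypothetical, and it assumes that deregistering resets the interest set because the old selection key is discarded.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Toy model of the register0 / doBeginRead flow; all names here are hypothetical.
class RegisterFlowSketch {

    static final class ToyChannel {
        private boolean neverRegistered = true;
        private final boolean active;
        private final boolean autoRead;
        int interestOps = 0;
        final List<String> events = new ArrayList<>();

        ToyChannel(boolean active, boolean autoRead) {
            this.active = active;
            this.autoRead = autoRead;
        }

        void register() {
            boolean firstRegistration = neverRegistered;
            neverRegistered = false;
            events.add("channelRegistered");
            if (active) {
                if (firstRegistration) {
                    // channelActive fires once; with autoRead on, it leads to a read request.
                    events.add("channelActive");
                    if (autoRead) {
                        beginRead();
                    }
                } else if (autoRead) {
                    // Re-registration: no second channelActive, but reading must resume.
                    beginRead();
                }
            }
        }

        /** Assumption: the old selection key is discarded, so the interest set starts over. */
        void deregister() {
            interestOps = 0;
        }

        private void beginRead() {
            // Same check as doBeginRead: only touch the interest set if OP_READ is missing.
            if ((interestOps & SelectionKey.OP_READ) == 0) {
                interestOps |= SelectionKey.OP_READ;
                events.add("beginRead");
            }
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel(true, true);
        ch.register();
        ch.deregister();
        ch.register();
        System.out.println(ch.events);
    }
}
```

In the model, channelActive appears only once across both registrations, while the read interest is restored each time, which is the behaviour the comment about issue 4805 in register0 describes.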

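Copyright notice: This is an original article by weixin_43869261, licensed under the CC 4.0 BY-SA agreement. When reposting, please include a link to the original source and this notice.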