新连接处理逻辑
- 检测新连接
- 创建NioSocketChannel
- 分配线程和注册Selector
- 向Selector注册读事件
检测新连接
- bossGroup线程不断轮询,一有连接接入会调用processSelectedKey 方法
- 获取通道上就绪的事件,关心连接事件连接
- 并把缓冲区的写事件全部释放出去
class NioEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
//拿到channel注册的eventloop
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
//如果没有拿到eventloop,我们就忽视掉,我们只试图确定通道是否注册到eventloop和eventloop是否有权限去关掉通道
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
//获取通道上就绪的事件
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//获取关心的事件
int ops = k.interestOps();
//对应位删除connectkey
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//完成新连接的接入
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
//先处理写事件,要去写一些缓冲区或者释放内存(之前缓存的未写事件)
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 强制清除缓冲区
ch.unsafe().forceFlush();
}
//处理读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
//真正连接到channel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
分配线程和注册selector
- 调用 pipeline.fireChannelRead(readBuf.get(i)); 分配进nioeventloop线程
- 使用chooser.next 寻找下一个eventloop并注册进行
class Test {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//添加childhandler
child.pipeline().addLast(childHandler);
//设置通道参数
setChannelOptions(child, childOptions, logger);
//设置Attributes,客户端channel绑定一些属性
setAttributes(child, childAttrs);
try {
//选择eventloop并注册selector
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//底层unsafe的注册
promise.channel().unsafe().register(this, promise);
return promise;
}
}
NioSocketChannel读事件的注册
class Test {
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//调用底层注册
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
//如果是第一次注册
if (firstRegistration) {
//调用所有的handleractive
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//绑定注册读事件感兴趣
//从此这个通道可以传数据
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
版权声明:本文为weixin_43869261原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。