Netty学习笔记(二)— Bootstrapping

  • Post author:
  • Post category:其他


在上一篇文章的

Echo应用程序案例

中,我们可以发现服务端和客服端都有一段引导代码。什么是引导呢,顾名思义,它就像一个调度员,将本来独立的各个组件引导到合适的位置,从而使整个系统能有序协调的运行。

引导

Bootstrapping

是Netty对应用程序进行配置的过程,我们需要通过它来连接客户端和将服务端绑定到指定的端口上。它有两种类型:一种是用于服务端的

ServerBootstrop

,一种是用于客户端的

Bootstrap

。通过查看Netty的api,发现ServerBootstrop和Bootstrap的关系如下:

在这里插入图片描述

它们都有共同的父类:AbastractBootstrap,因此要弄清楚Netty的引导是如何工作的,就需要从AbastractBootstrap抽象类入手。



AbastractBootstrap

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;
    ...
}

可以看出,AbastractBootstrap类提供了以上的6个属性,并为它们提供了一组设置和获取的方法,类似setter和getter方法,通过这些方法可设置或者获取这6个属性:

// setter
public B group(EventLoopGroup group)public B channelFactory(ChannelFactory<? extends C> channelFactory)public B localAddress(SocketAddress localAddress)public <T> B option(ChannelOption<T> option, T value)public <T> B attr(AttributeKey<T> key, T value)public B handler(ChannelHandler handler)// getter
public EventLoopGroup group()final ChannelFactory<? extends C> channelFactory()final SocketAddress localAddress()final Map<ChannelOption<?>, Object> options()final Map<AttributeKey<?>, Object> attrs()final ChannelHandler handler()

除channelFactory(…)外,其他设置方法均为直接调用,实际的应用程序代码中是通过channel(…)方法实现间接调用channelFactory(…)方法。

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

在channel(…)方法中,将对应的参数(如NioSocketChannel.class)传入ChannelFactory的实现类BootstrapChannelFactory的构造方法中,而该类的主要方法就是通过反射创建传入参数的实例对象,以方便后续程序使用。

private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    BootstrapChannelFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
	...
}

在应用程序中,options和atrrs两个属性并不是必须的,下面的示例中就没有对二者进行设置,实际应用中可根据需要对其进行设置,这里对这两个参数做一个简单的了解。


options

主要是用于配置netty中一些相关的参数,这些参数的key已经在ChannelOption中以静态变量的方式设置好了,可参考api直接使用。如果ChannelOption设置了一个不存在的key,就会以日志的形式提示错误信息,但是不会抛出异常。这些选项可以配置底层连接的详细信息,如通道“keep-alive(保持活跃)”或“timeout(超时)”的特性,并会自动应用到引导创建的所有通道。如果在 Channel 已经被创建后再调用options()方法设置该值将不会有任何的效果。


attrs

可以将数据和通道以一个安全的方式关联,这些属性只是作用于客户端和服务器的通道,属于用户自定义的。例如,客户端请求web服务器应用程序,为了跟踪通道属于哪个用户,应用程序可以存储用的ID作为通道的一个属性。任何对象或数据都可以使用属性被关联到一个通道。同样的,这个属性在 Channel 被创建后设置将不会有任何的效果。

// 创建一个AttributeKey以标识该属性
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
// 设置options
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// 设置attrs
bootstrap.attr(id, 123456);

通过继承一个共同的父类,两种应用程序类型之间通用的引导步骤由AbstractBootstrap处理,而特定于客户端或者服务端的引导步骤分别由Bootstrap或ServerBootstrap处理。



Bootstrap

除了上面的共有方法,客户端的启动还可能需要如下两个方法:

// 设置远程连接地址,也可以通过 下面的connect() 方法来指定
public Bootstrap remoteAddress(SocketAddress remoteAddress)// 连接到远程节点并返回一个 ChannelFuture ,其将会在连接操作完成后接收到通知,包括其他重载方法
public ChannelFuture connect()

下面看看客户端的引导代码(部分):

EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.remoteAddress(host,port);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new EchoClientHandler());
        }
    });
    // 尝试连接远程地址
    ChannelFuture future = bootstrap.connect().sync();
    future.channel().closeFuture().sync();
}finally {
    workerGroup.shutdownGracefully().sync();
}

完成了参数的设置后,客户端就可以通过调用Bootstrap#connect()方法连接远程地址,基本过程描述如下:

  • 初始化创建一个NioEventLoopGroup对象,用于处理 I/O 读写事件和业务逻辑;

  • 通过Bootstrap配置 EventLoopGroup、Channel 类型,连接参数、handler等内容;

  • 连接远程地址,和服务端进行交互。

前两个步骤比较好理解,就是对应用程序的设置,第三个步骤虽然只有一行代码,但其中涉及了关键的连接过程,下面通过源码对其具体的实现进行简单的分析。

首先,查看bootstrap的connect()方法:

public ChannelFuture connect() {
    validate();
    SocketAddress remoteAddress = this.remoteAddress;
    if (remoteAddress == null) {
        throw new IllegalStateException("remoteAddress not set");
    }
    return doConnect(remoteAddress, localAddress());
}

执行该方法,首先会调用validate()方法,确认bootstrap中的参数是否设置完成,然后调用doConnect()方法。

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise = channel.newPromise();
    if (regFuture.isDone()) {
        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
    }
    ...
    // 省略多余的代码
}

进入doConnect()后,首先执行从父类AbstractBootstrap继承过来initAndRegister()方法,如下面的代码所示:

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = group().register(channel);
    ...
  	//省略部分代码
    return regFuture;
}

在newChannel()方法中,通过类对象NioSocketChannel.newInstance()方法会调用该类的默认构造方法创建一个该类的实例对象。

public NioSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

通过newSocket(…)方法,会返回一个新的SocketChannel对象,接下来通过一系列的调用链最终来到了抽象类AbstractNioChannel中,调用其构造方法:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        throw new ChannelException("Failed to enter non-blocking mode.", e);
        // 省略部分代码
        ...
    }
}

在这里,将前面初始化的SocketChannel对象赋值给AbstractNioChannel的ch属性,将readInterestOp设置为SelectionKey.OP_READ,同时设置通道为非阻塞状态。到此实现了netty的channel和nio的channel的关联。继续向下执行Bootstrap自身重写的init()方法对channel进行设置,channel中的ChannelPipeline对象中添加new EchoClientHandler()处理器,并将new ChannelInitializer()对象从中删除,此时channel初始化完成。

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
	...
}

继续向下,执行group().register(channel)方法来注册channel,继续跟踪的话, 会发现其调用链如下:

MultithreadEventLoopGroup.register(…) -> SingleThreadEventLoop.register(…) -> AbstractUnsafe(AbstractChannel内部类).register(…),到此完成channel的注册,即将channel和eventloop关联起来。

在这里插入图片描述

重新回到Bootstrap的deConnect()方法,继续向下执行,会调用doConnect0(…)方法:

private static void doConnect0(...) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

启动与channel关联的EventLoop中绑定的线程,执行channel.connect(…),建立和远程地址的连接,这样就可以和服务端进行交互了。



ServerBootstrap

查看源码,会发现ServerBootstrap和AbstractBootstrap相比,多了一些变量及其相关的方法:

// 成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
// 成员方法
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value)public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
public ServerBootstrap childHandler(ChannelHandler childHandler)public EventLoopGroup childGroup()

上面这些变量和方法特别用于服务器应用程序的操作。具体来说,ServerChannel 的实现负责创建子 Channel,这些子 Channel 代表了已被接受的连接。因此,负责引导 ServerChannel 的 ServerBootstrap 提供了这些方法,以简化将设置应用到已被接受的子 Channel 的ChannelConfig 的任务。下图展示了 ServerBootstrap 在 bind() 方法被调用时创建了一个 ServerChannel,并且该ServerChannel 管理了多个子Channel。

在这里插入图片描述

在完成了参数的设置后,服务端就可以通过调用ServerBootstrap#bind()方法监听指定的端口,基本过程描述如下:

  • 初始化创建两个NioEventLoopGroup对象,其中parentGroup用于监听端口并分发请求,childGroup用于处理 I/O 读写事件和业务逻辑;

  • 通过ServerBootstrap配置 EventLoopGroup、Channel 类型,连接参数、handler等内容;

  • 绑定端口,监听相关事件并处理。

下面还是先看看服务端的引导过程(部分):

EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup,childGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.localAddress(port);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new EchoServerHandler());
        }
    });
    ChannelFuture future = bootstrap.bind().sync();
    future.channel().closeFuture().sync();
}finally {
    bossGroup.shutdownGracefully().sync();
}

从代码上看,服务端和客户端的代码大致结构相同,但是还是存在如下的差异:

  • ServerBootstrap用于将服务端绑定到一个指定的端口,需要配置子Channel的属性,有两个EventLoopGroup;
  • Bootstrap用于客户端连接到远程主机和端口,只需要配置自身channel的属性,并且只有一个EventLoopGroup,。

前面的区别上面的分析也提到了,但是为什么两个类中的EventLoopGroup数量不一致呢?

这是因为服务器有两组不同的Channel,其中一组只包含了ServerChannel,代表服务器本身已绑定的端口正在监听的套接字通道(可以类比NIO的ServerSocketChannel),另一组则包含了服务端已接受的客户端连接(可以类比NIO的SocketChannel)。

这样做有什么意义呢?《Netty In Action》中通过下图展示了为何ServerBootstrap需要两个EventLoopGroup。



具有两个 EventLoopGroup 的服务器

我们通过现实生活中的例子来理解一下,将服务端比作是一个饭店,EventLoopGroup比作饭店的职员,客户端比作一个客人。当有大量客人来饭店消费时,因为职员并没有进行分类,所有的职员都执行接待客人的操作(客人的数量大于职员的数量),没有多余的职员来执行炒菜的操作,直至所有客人接待完毕有职员空闲下来,才会去执行炒菜的操作,这样就导致先来的客人等了很久都没能上菜,必然会影响客人的体验。于是饭店经理将职员分为服务员和厨师,当有客人(客户端)来消费时,服务员负责招待点餐,而厨师就负责后续的炒菜,这样即使在大量客人访问时,既能接待到客人,也能服务到客人。

查看ServerBootstrap的源码,我们来看看源码是怎么实现的。

bootstrap.group(group);
-->
@Override
public ServerBootstrap group(EventLoopGroup group) {
     return group(group, group);
}
-->
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

从以上代码可以看出,当我们向ServerBootstrap的group()方法中传入一个EventLoopGroup对象时,它会再次调用group(group, group),将parentGroup和childGroup均设置为group,相当于将处理I/O操作和接收连接的任务交给了同一个EventLoopGroup。

继续向下执行,直到调用bind()方法,前面的代码和Bootstrap在功能上都是类似的,即为应用程序配置相关组件。

接着进入bind()方法:

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

类似的,首先会调用validate()方法,确认bootstrap中的参数是否设置完成,然后调用doBind(…)方法。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    }
    // 省略多余的代码
    ...
}

首先还是执行initAndRegister()方法,和客户端的流程的一样,依次调用newChannel(…)、init(…)和register(…)方法,对Channel对象进行初始化、配置以及将其和eventLoop关联起来,只是这里的Channel对象变为了NioServerSocketChannel。

然后重新回到doBind(…)方法,继续向下执行doBind0(…):

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

和客户端类似,启动与channel关联的EventLoop中绑定的线程,执行channel.bind(…),监听指定的端口,这样就可以和客户端进行交互了。



版权声明:本文为Lincain原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。