Netty 中的心跳机制详解

  • Post author:
  • Post category:其他



https://blog.csdn.net/youanyyou/article/details/117757273


我们知道在


TCP


长连接或者


WebSocket


长连接中一般我们都会使用心跳机制





即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。



那么心跳机制可以用来做什么呢?


我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。


如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据


(


可能是空包,也可能是特殊数据


)


,对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。



1.


如何实现心跳机制


一般实现心跳机制由两种方式:


  • TCP


    协议自带的心跳机制来实现;

  • 在应用层来实现。


但是


TCP


协议自带的心跳机制系统默认是设置的是


2


小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与


TCP


协议绑定的,那如果我们要是使用


UDP


协议岂不是用不了?所以一般我们都不用。



而一般我们自己实现呢大致的策略是这样的:


  1. Client


    启动一个定时器,不断发送心跳;

  2. Server


    收到心跳后,做出回应;

  3. Server


    启动一个定时器,判断


    Client


    是否存在,这里做判断有两种方法:时间差和简单标识。



时间差:


  1. 收到一个心跳包之后记录当前时间;

  2. 判断定时器到达时间,计算多久没收到心跳时间


    =


    当前时间





    上次收到心跳时间。如果改时间大于设定值则认为超时。



简单标识:


  1. 收到心跳后设置连接标识为


    true;

  2. 判断定时器到达时间,如果未收到心跳则设置连接标识为


    false;


今天我们来看一下


Netty


的心跳机制的实现,在


Netty


中提供了


IdleStateHandler


类来进行心跳的处理,它可以对一个


Channel








/


写设置定时器


,





Channel


在一定事件间隔内没有数据交互时


(


即处于


idle


状态


),


就会触发指定的事件。



该类可以对三种类型的超时做心跳机制检测:


  1. public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {


  2. this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);

  3. }

  • readerIdleTimeSeconds


    :设置读超时时间;

  • writerIdleTimeSeconds


    :设置写超时时间;

  • allIdleTimeSeconds


    :同时为读或写设置超时时间;


下面我们还是通过一个例子来讲解


IdleStateHandler


的使用。



服务端:


  1. public class HeartBeatServer {


  2. private int port;

  3. public HeartBeatServer(int port) {


  4. this.port = port;

  5. }

  6. public void start(){


  7. EventLoopGroup bossGroup = new NioEventLoopGroup();

  8. EventLoopGroup workGroup = new NioEventLoopGroup();

  9. ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)

  10. .channel(NioServerSocketChannel.class)

  11. .childHandler(new HeartBeatServerChannelInitializer());

  12. try {


  13. ChannelFuture future = server.bind(port).sync();

  14. future.channel().closeFuture().sync();

  15. } catch (InterruptedException e) {


  16. e.printStackTrace();

  17. }finally {


  18. bossGroup.shutdownGracefully();

  19. workGroup.shutdownGracefully();

  20. }

  21. }

  22. public static void main(String[] args) {


  23. HeartBeatServer server = new HeartBeatServer(7788);

  24. server.start();

  25. }

  26. }



服务端Initializer:


  1. public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {


  2. @Override

  3. protected void initChannel(SocketChannel socketChannel) throws Exception {


  4. ChannelPipeline pipeline = socketChannel.pipeline();

  5. pipeline.addLast(“handler”,new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

  6. pipeline.addLast(“decoder”, new StringDecoder());

  7. pipeline.addLast(“encoder”, new StringEncoder());

  8. pipeline.addLast(new HeartBeatServerHandler());

  9. }

  10. }



在这里IdleStateH

andler


也是


handler


的一种,所以加入


addLast


。我们分别设置


4


个参数:读超时时间为


3s


,写超时和读写超时为


0


,然后加入时间控制单元。另外,关注公众号


Java


技术栈,在后台回复:面试,可以获取我整理的


Java


系列面试题和答案,非常齐全。


服务端


handler




  1. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{


  2. private int loss_connect_time = 0;

  3. @Override

  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


  5. System.out.println(ctx.channel().remoteAddress() + “Server :” + msg.toString());

  6. }

  7. @Override

  8. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {


  9. if(evt instanceof IdleStateEvent){


  10. //

    服务端对应着读事件,当为


    READER_IDLE


    时触发


  11. IdleStateEvent event = (IdleStateEvent)evt;

  12. if(event.state() == IdleState.READER_IDLE){


  13. loss_connect_time++;

  14. System.out.println(”

    接收消息超时






    );

  15. if(loss_connect_time > 2){


  16. System.out.println(”

    关闭不活动的链接






    );

  17. ctx.channel().close();

  18. }

  19. }else{


  20. super.userEventTriggered(ctx,evt);

  21. }

  22. }

  23. }

  24. @Override

  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {


  26. ctx.close();

  27. }

  28. }


我们看到在


handler


中调用了


userEventTriggered


方法,


IdleStateEvent





state()


方法一个有三个值:


READER_IDLE





WRITER_IDLE





ALL_IDLE


。正好对应读事件写事件和读写事件。


再来写一下客户端:


  1. public class HeartBeatsClient {


  2. private  int port;

  3. private  String address;

  4. public HeartBeatsClient(int port, String address) {


  5. this.port = port;

  6. this.address = address;

  7. }

  8. public void start(){


  9. EventLoopGroup group = new NioEventLoopGroup();

  10. Bootstrap bootstrap = new Bootstrap();

  11. bootstrap.group(group)

  12. .channel(NioSocketChannel.class)

  13. .handler(new HeartBeatsClientChannelInitializer());

  14. try {


  15. ChannelFuture future = bootstrap.connect(address,port).sync();

  16. future.channel().closeFuture().sync();

  17. } catch (Exception e) {


  18. e.printStackTrace();

  19. }finally {


  20. group.shutdownGracefully();

  21. }

  22. }

  23. public static void main(String[] args) {


  24. HeartBeatsClient client = new HeartBeatsClient(7788,”127.0.0.1″);

  25. client.start();

  26. }

  27. }


客户端


Initializer




  1. public class HeartBeatsClientChannelInitializer extends  ChannelInitializer<SocketChannel> {


  2. protected void initChannel(SocketChannel socketChannel) throws Exception {


  3. ChannelPipeline pipeline = socketChannel.pipeline();

  4. pipeline.addLast(“handler”, new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));

  5. pipeline.addLast(“decoder”, new StringDecoder());

  6. pipeline.addLast(“encoder”, new StringEncoder());

  7. pipeline.addLast(new HeartBeatClientHandler());

  8. }

  9. }


这里我们设置了


IdleStateHandler


的写超时为


3


秒,客户端执行的动作为写消息到服务端,服务端执行读动作。



Spring Boot 学习笔记



分享给你看下。



客户端handler:


  1. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {


  2. private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(“Heartbeat”,

  3. CharsetUtil.UTF_8));

  4. private static final int TRY_TIMES = 3;

  5. private int currentTime = 0;

  6. @Override

  7. public void channelActive(ChannelHandlerContext ctx) throws Exception {


  8. System.out.println(”

    激活时间是:






    +new Date());

  9. System.out.println(”

    链接已经激活






    );

  10. ctx.fireChannelActive();

  11. }

  12. @Override

  13. public void channelInactive(ChannelHandlerContext ctx) throws Exception {


  14. System.out.println(”

    停止时间是:






    +new Date());

  15. System.out.println(”

    关闭链接






    );

  16. }

  17. @Override

  18. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {


  19. System.out.println(”

    当前轮询时间:






    +new Date());

  20. if (evt instanceof IdleStateEvent) {


  21. IdleStateEvent event = (IdleStateEvent) evt;

  22. if (event.state() == IdleState.WRITER_IDLE) {


  23. if(currentTime <= TRY_TIMES){


  24. System.out.println(“currentTime:”+currentTime);

  25. currentTime++;

  26. ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());

  27. }

  28. }

  29. }

  30. }

  31. @Override

  32. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


  33. String message = (String) msg;

  34. System.out.println(message);

  35. if (message.equals(“Heartbeat”)) {


  36. ctx.write(“has read message from server”);

  37. ctx.flush();

  38. }

  39. ReferenceCountUtil.release(msg);

  40. }

  41. @Override

  42. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {


  43. ctx.close();

  44. }

  45. }



启动服务端和客户端我们看到输出为:



我们再来屡一下思路:


  1. 首先客户端激活


    channel


    ,因为客户端中并没有发送消息所以会触发客户端的


    IdleStateHandler


    ,它设置的写超时时间为


    3s




  2. 然后触发客户端的事件机制进入


    userEventTriggered


    方法,在触发器中计数并向客户端发送消息;

  3. 服务端接收消息;

  4. 客户端触发器继续轮询发送消息,直到计数器满不再向服务端发送消息;

  5. 服务端在


    IdleStateHandler


    设置的读消息超时时间


    5s


    内未收到消息,触发了服务端中


    handler





    userEventTriggered


    方法,于是关闭客户端的链接。


大体我们的简单心跳机制就是这样的思路,通过事件触发机制以及计数器的方式来实现,上面我们的案例中最后客户端没有发送消息的时候我们是强制断开了客户端的链接,那么既然可以关闭,我们是不是也可是重新链接客户端呢?因为万一客户端本身并不想关闭而是由于别的原因导致他无法与服务端通信。下面我们来说一下重连机制。


当我们的服务端在未读到客户端消息超时而关闭客户端的时候我们一般在客户端的


finally


块中方的是关闭客户端的代码,这时我们可以做一下修改的,


finally


是一定会被执行新的,所以我们可以在


finally


块中重新调用一下启动客户端的代码,这样就又重新启动了客户端了,



上客户端代码:



  1. /**


  2. *





    Client


    为测试


    netty


    重连机制

  3. * Server


    端代码都一样,所以不做修改

  4. *


    只用在


    client


    端中做一下判断即可

  5. */

  6. public class HeartBeatsClient2 {


  7. private  int port;

  8. private  String address;

  9. ChannelFuture future;

  10. public HeartBeatsClient2(int port, String address) {


  11. this.port = port;

  12. this.address = address;

  13. }

  14. public void start(){


  15. EventLoopGroup group = new NioEventLoopGroup();

  16. Bootstrap bootstrap = new Bootstrap();

  17. bootstrap.group(group)

  18. .channel(NioSocketChannel.class)

  19. .handler(new HeartBeatsClientChannelInitializer());

  20. try {


  21. future = bootstrap.connect(address,port).sync();

  22. future.channel().closeFuture().sync();

  23. } catch (Exception e) {


  24. e.printStackTrace();

  25. }finally {


  26. //group.shutdownGracefully();

  27. if (null != future) {


  28. if (future.channel() != null && future.channel().isOpen()) {


  29. future.channel().close();

  30. }

  31. }

  32. System.out.println(”

    准备重连






    );

  33. start();

  34. System.out.println(”

    重连成功






    );

  35. }

  36. }

  37. public static void main(String[] args) {


  38. HeartBeatsClient2 client = new HeartBeatsClient2(7788,”127.0.0.1″);

  39. client.start();

  40. }

  41. }



其余部分的代码与上面的实例并无异同



,只需改造客户端即可,我们再运行服务端和客户端会看到客户端虽然被关闭了,但是立马又被重启: