Netty成长之路——心跳保活

  • Post author:
  • Post category:其他



引言

在我上一篇《Netty成长之路——初始Netty》文章列举的演示Netty客户端和服务端使用效果示例中,要想保持Netty客户端与服务端长连接不中断的目的。就需要加入另一个机制——心跳保活。


1、什么是心跳保活

心跳保活,是应用层通过心跳包的超时、重连切换等方式来执行重连操作。心跳一般是指定客户端或服务端,每隔一定时间向另一端发送一段自动命令消息,以判断双方是否存活,因其按照一定间隔发送,类似于心跳,保证存活。故被称为心跳保活。


2、为什么要使用心跳保活

心跳保活处于应用层维持通讯双方长连接的机制。它是长连接保活的一种方式。


3、实现原理



要想实现心跳保活其实并不难,基本原理是客户端和服务端建立连接后,通常由客户端向服务端发送自定义消息命令,若服务端收到消息后,回复一个自定义的消息给客户端。客户端收到后,表示双方连接互通;倘若服务端未收到消息,则表示连接失败,如果失败的次数达到指定上限后,则重新发起连接。定时执行这样的流程,就会建立起服务端和客户端之间的心跳保活机制。


4、实战

结合上一篇《Netty成长之路——初始Netty》中示例代码,我们实现客户端发起自定义消息命令,服务端收到后把消息内容原样回复给客户端。这样,客户端和服务端增加和修改的代码如下。


Netty客户端

NettyClientHandler.java ,主要增加自定义消息命令。


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Netty 客户端
 * @program: nettyChat
 * @Date: 2015/07/29
 * @Author: william liu
 * @Description:
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    /** 客户端请求的心跳命令 */
    private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hb_request", CharsetUtil.UTF_8));
    /** 空闲次数 */
    private int idle_count = 1;

    /** 发送次数 */
    private int count = 1;

    /** 循环次数 */
    private int fcount = 1;

    /**
     * 建立连接时
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立连接时:" + date());
        ctx.fireChannelActive();
    }

    /**
     * 心跳请求处理,每4秒发送一次心跳请求;
     *
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("\r\n循环请求的时间:" + date() + ",次数" + fcount);

        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态就发送心跳命令
                // 设置发送次数,允许发送3次心跳包
                if (idle_count <= 3) {
                    idle_count++;
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                } else {
                    System.out.println("心跳包发送结束,不再发送心跳请求!!!");
                }
            }
        }

        fcount++;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println("客户端消息:"+s);
    }

    private String date(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }
}

NettyClientInitializer.java 设置发送心跳消息的时间。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;

/**
 * @program: netty客户端
 * @Date: 2015/5/29 17:31
 * @Author: william liu
 * @Description: 客户端-初始化配置
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //设置定时时间5秒
        pipeline.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new NettyClientHandler());
    }
}


Netty服务端

NettyServerHandler.java 主要增加收到客户端消息后的处理机制。具体是如果收到客户端消息则回复“服务端成功收到心跳信息”消息内容给客户端;若超时处理,如果5秒没有收到客户端的心跳,就触发; 如果超过两次,则直接关闭。

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Websocket netty服务端
 *
 * @program: netty
 * @Date: 2015/05/31
 * @Author: william.liu
 * @Description:
 */
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 空闲次数
     */
    private int idle_count = 1;
    /**
     * 发送次数
     */
    private int count = 1;
    /**
     * A thread-safe Set  Using ChannelGroup, you can categorize Channels into a meaningful group.
     * A closed Channel is automatically removed from the collection,
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 超时处理,如果5秒没有收到客户端的心跳,就触发; 如果超过两次,则直接关闭;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        Channel incoming = ctx.channel();
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            // 如果读通道处于空闲状态,说明没有接收到心跳命令
            if (IdleState.READER_IDLE.equals(event.state())) { 
                if (idle_count > 2) {
                    log.info("超过两次无客户端请求,关闭该channel");
                    ctx.channel().close();
                }

                log.info("已等待5秒还没收到客户端发来的消息");
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();

        // Broadcast a message to multiple Channels
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");

        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();

        // Broadcast a message to multiple Channels
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");

        // A closed Channel is automatically removed from ChannelGroup,
        // so there is no need to do "channels.remove(ctx.channel());"
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        log.info("接收到客户端消息:{}", s);
        try {
            if (!StringUtils.isEmpty(s)) {
                PushNettyData jsonMsg = GsonUtils.fromJson(s, PushNettyData.class);

            // 如果是心跳命令,服务端收到命令后回复一个相同的命令给客户端
            if ("hb_request".equals(jsonMsg.getMsgType())) {
                ctx.write("服务端成功收到心跳信息");
                ctx.flush();
            }
            }
        } finally {
            ReferenceCountUtil.release(s);
        }

        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    channel.writeAndFlush("[server]" + in.readLine() + "\r\n");
                }
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        log.info("NettyClient:" + incoming.remoteAddress() + "在线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        log.info("NettyClient:" + incoming.remoteAddress() + "掉线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        Channel incoming = ctx.channel();
        log.info("NettyClient:" + incoming.remoteAddress() + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

至此,心跳保活机制建立完成。这样我们使用netty时就不必过多考虑客户端和服务端连接断开的事情了。



版权声明:本文为lotus35原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。