https://blog.csdn.net/youanyyou/article/details/117757273
我们知道在
TCP
长连接或者
WebSocket
长连接中一般我们都会使用心跳机制
–
即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。
那么心跳机制可以用来做什么呢?
我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。
如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据
(
可能是空包,也可能是特殊数据
)
,对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。
1.
如何实现心跳机制
一般实现心跳机制由两种方式:
-
TCP
协议自带的心跳机制来实现;
-
在应用层来实现。
但是
TCP
协议自带的心跳机制系统默认是设置的是
2
小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与
TCP
协议绑定的,那如果我们要是使用
UDP
协议岂不是用不了?所以一般我们都不用。
而一般我们自己实现呢大致的策略是这样的:
-
Client
启动一个定时器,不断发送心跳;
-
Server
收到心跳后,做出回应;
-
Server
启动一个定时器,判断
Client
是否存在,这里做判断有两种方法:时间差和简单标识。
时间差:
-
收到一个心跳包之后记录当前时间;
-
判断定时器到达时间,计算多久没收到心跳时间
=
当前时间
–
上次收到心跳时间。如果改时间大于设定值则认为超时。
简单标识:
-
收到心跳后设置连接标识为
true;
-
判断定时器到达时间,如果未收到心跳则设置连接标识为
false;
今天我们来看一下
Netty
的心跳机制的实现,在
Netty
中提供了
IdleStateHandler
类来进行心跳的处理,它可以对一个
Channel
的
读
/
写设置定时器
,
当
Channel
在一定事件间隔内没有数据交互时
(
即处于
idle
状态
),
就会触发指定的事件。
该类可以对三种类型的超时做心跳机制检测:
-
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
-
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
-
}
-
readerIdleTimeSeconds
:设置读超时时间;
-
writerIdleTimeSeconds
:设置写超时时间;
-
allIdleTimeSeconds
:同时为读或写设置超时时间;
下面我们还是通过一个例子来讲解
IdleStateHandler
的使用。
-
public class HeartBeatServer {
-
private int port;
-
public HeartBeatServer(int port) {
-
this.port = port;
-
}
-
public void start(){
-
EventLoopGroup bossGroup = new NioEventLoopGroup();
-
EventLoopGroup workGroup = new NioEventLoopGroup();
-
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
-
.channel(NioServerSocketChannel.class)
-
.childHandler(new HeartBeatServerChannelInitializer());
-
try {
-
ChannelFuture future = server.bind(port).sync();
-
future.channel().closeFuture().sync();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}finally {
-
bossGroup.shutdownGracefully();
-
workGroup.shutdownGracefully();
-
}
-
}
-
public static void main(String[] args) {
-
HeartBeatServer server = new HeartBeatServer(7788);
-
server.start();
-
}
-
}
-
public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
@Override
-
protected void initChannel(SocketChannel socketChannel) throws Exception {
-
ChannelPipeline pipeline = socketChannel.pipeline();
-
pipeline.addLast(“handler”,new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
-
pipeline.addLast(“decoder”, new StringDecoder());
-
pipeline.addLast(“encoder”, new StringEncoder());
-
pipeline.addLast(new HeartBeatServerHandler());
-
}
-
}
在这里IdleStateH
andler
也是
handler
的一种,所以加入
addLast
。我们分别设置
4
个参数:读超时时间为
3s
,写超时和读写超时为
0
,然后加入时间控制单元。另外,关注公众号
Java
技术栈,在后台回复:面试,可以获取我整理的
Java
系列面试题和答案,非常齐全。
服务端
handler
:
-
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
-
private int loss_connect_time = 0;
-
@Override
-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
System.out.println(ctx.channel().remoteAddress() + “Server :” + msg.toString());
-
}
-
@Override
-
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-
if(evt instanceof IdleStateEvent){
-
//
服务端对应着读事件,当为
READER_IDLE
时触发
-
IdleStateEvent event = (IdleStateEvent)evt;
-
if(event.state() == IdleState.READER_IDLE){
-
loss_connect_time++;
-
System.out.println(”
接收消息超时
”
);
-
if(loss_connect_time > 2){
-
System.out.println(”
关闭不活动的链接
”
);
-
ctx.channel().close();
-
}
-
}else{
-
super.userEventTriggered(ctx,evt);
-
}
-
}
-
}
-
@Override
-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
ctx.close();
-
}
-
}
我们看到在
handler
中调用了
userEventTriggered
方法,
IdleStateEvent
的
state()
方法一个有三个值:
READER_IDLE
,
WRITER_IDLE
,
ALL_IDLE
。正好对应读事件写事件和读写事件。
再来写一下客户端:
-
public class HeartBeatsClient {
-
private int port;
-
private String address;
-
public HeartBeatsClient(int port, String address) {
-
this.port = port;
-
this.address = address;
-
}
-
public void start(){
-
EventLoopGroup group = new NioEventLoopGroup();
-
Bootstrap bootstrap = new Bootstrap();
-
bootstrap.group(group)
-
.channel(NioSocketChannel.class)
-
.handler(new HeartBeatsClientChannelInitializer());
-
try {
-
ChannelFuture future = bootstrap.connect(address,port).sync();
-
future.channel().closeFuture().sync();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}finally {
-
group.shutdownGracefully();
-
}
-
}
-
public static void main(String[] args) {
-
HeartBeatsClient client = new HeartBeatsClient(7788,”127.0.0.1″);
-
client.start();
-
}
-
}
客户端
Initializer
:
-
public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {
-
protected void initChannel(SocketChannel socketChannel) throws Exception {
-
ChannelPipeline pipeline = socketChannel.pipeline();
-
pipeline.addLast(“handler”, new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
-
pipeline.addLast(“decoder”, new StringDecoder());
-
pipeline.addLast(“encoder”, new StringEncoder());
-
pipeline.addLast(new HeartBeatClientHandler());
-
}
-
}
这里我们设置了
IdleStateHandler
的写超时为
3
秒,客户端执行的动作为写消息到服务端,服务端执行读动作。
Spring Boot 学习笔记
分享给你看下。
-
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
-
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(“Heartbeat”,
-
CharsetUtil.UTF_8));
-
private static final int TRY_TIMES = 3;
-
private int currentTime = 0;
-
@Override
-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
System.out.println(”
激活时间是:
”
+new Date());
-
System.out.println(”
链接已经激活
”
);
-
ctx.fireChannelActive();
-
}
-
@Override
-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-
System.out.println(”
停止时间是:
”
+new Date());
-
System.out.println(”
关闭链接
”
);
-
}
-
@Override
-
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-
System.out.println(”
当前轮询时间:
”
+new Date());
-
if (evt instanceof IdleStateEvent) {
-
IdleStateEvent event = (IdleStateEvent) evt;
-
if (event.state() == IdleState.WRITER_IDLE) {
-
if(currentTime <= TRY_TIMES){
-
System.out.println(“currentTime:”+currentTime);
-
currentTime++;
-
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
-
}
-
}
-
}
-
}
-
@Override
-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
String message = (String) msg;
-
System.out.println(message);
-
if (message.equals(“Heartbeat”)) {
-
ctx.write(“has read message from server”);
-
ctx.flush();
-
}
-
ReferenceCountUtil.release(msg);
-
}
-
@Override
-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
ctx.close();
-
}
-
}
我们再来屡一下思路:
-
首先客户端激活
channel
,因为客户端中并没有发送消息所以会触发客户端的
IdleStateHandler
,它设置的写超时时间为
3s
;
-
然后触发客户端的事件机制进入
userEventTriggered
方法,在触发器中计数并向客户端发送消息;
-
服务端接收消息;
-
客户端触发器继续轮询发送消息,直到计数器满不再向服务端发送消息;
-
服务端在
IdleStateHandler
设置的读消息超时时间
5s
内未收到消息,触发了服务端中
handler
的
userEventTriggered
方法,于是关闭客户端的链接。
大体我们的简单心跳机制就是这样的思路,通过事件触发机制以及计数器的方式来实现,上面我们的案例中最后客户端没有发送消息的时候我们是强制断开了客户端的链接,那么既然可以关闭,我们是不是也可是重新链接客户端呢?因为万一客户端本身并不想关闭而是由于别的原因导致他无法与服务端通信。下面我们来说一下重连机制。
当我们的服务端在未读到客户端消息超时而关闭客户端的时候我们一般在客户端的
finally
块中方的是关闭客户端的代码,这时我们可以做一下修改的,
finally
是一定会被执行新的,所以我们可以在
finally
块中重新调用一下启动客户端的代码,这样就又重新启动了客户端了,
上客户端代码:
-
/**
-
*
本
Client
为测试
netty
重连机制
-
* Server
端代码都一样,所以不做修改
-
*
只用在
client
端中做一下判断即可
-
*/
-
public class HeartBeatsClient2 {
-
private int port;
-
private String address;
-
ChannelFuture future;
-
public HeartBeatsClient2(int port, String address) {
-
this.port = port;
-
this.address = address;
-
}
-
public void start(){
-
EventLoopGroup group = new NioEventLoopGroup();
-
Bootstrap bootstrap = new Bootstrap();
-
bootstrap.group(group)
-
.channel(NioSocketChannel.class)
-
.handler(new HeartBeatsClientChannelInitializer());
-
try {
-
future = bootstrap.connect(address,port).sync();
-
future.channel().closeFuture().sync();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}finally {
-
//group.shutdownGracefully();
-
if (null != future) {
-
if (future.channel() != null && future.channel().isOpen()) {
-
future.channel().close();
-
}
-
}
-
System.out.println(”
准备重连
”
);
-
start();
-
System.out.println(”
重连成功
”
);
-
}
-
}
-
public static void main(String[] args) {
-
HeartBeatsClient2 client = new HeartBeatsClient2(7788,”127.0.0.1″);
-
client.start();
-
}
-
}
其余部分的代码与上面的实例并无异同
,只需改造客户端即可,我们再运行服务端和客户端会看到客户端虽然被关闭了,但是立马又被重启: