netty服务器返回信息关闭,netty4 服务端同步客户端返回的结果

  • Post author:
  • Post category:其他


netty是一个异步通讯框架,在有的时候咱们想使用服务端向客户端发送消息,服务端同步等待客户端返回结果真进行下一步的业务逻辑操做。那要怎么作才能同步获取客户端返回的数据呢?这里我用到了JDK中的闭锁等待 CountDownLatch,接下来看看代码如何实现:java

服务端:git

package com.example.demo.server;

import com.example.demo.cache.ChannelMap;

import com.example.demo.model.Result;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: NettyServer

* @Author: huangzf

* @Date: 2018/9/25 15:40

* @Description:

*/

@Slf4j

public class NettyServer {

private NettyServerChannelInitializer serverChannelInitializer = null;

private int port = 8000;

public void bind() throws Exception {

//配置服务端的NIO线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

serverChannelInitializer = new NettyServerChannelInitializer();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

//保持长链接

.childOption(ChannelOption.SO_KEEPALIVE,true)

.option(ChannelOption.SO_BACKLOG, 1024)

.childHandler(serverChannelInitializer);

//绑定端口,同步等待成功

ChannelFuture f = b.bind(port).sync();

//等待服务器监听端口关闭

f.channel().closeFuture().sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public Result write(Object obj, String tenantId ,String uniId) throws Exception {

// 获取锁

Lock lock = ChannelMap.getChannelLock(tenantId);

try {

Channel channel = ChannelMap.getChannel(tenantId);

if(channel != null){

lock.lock();

if(channel.isOpen()){

// 设置同步

CountDownLatch latch = new CountDownLatch(1);

NettyServerHandler nettyServerHandler = (NettyServerHandler) channel.pipeline().get(“handler”);

nettyServerHandler.resetSync(latch,1);

nettyServerHandler.setUnidId(uniId);

channel.writeAndFlush(obj );

//同步返回结果

if (latch.await(60,TimeUnit.SECONDS)){

// printerServerHandler.setTimeout(0);

return nettyServerHandler.getResult();

}

//若是超时,将超时标志设置为1

//printerServerHandler.setTimeout(1);

log.error(“请求超时60s”);

return new Result(2,”请求超时”,null);

}else{

return new Result(0,”客户端已关闭!”,null);

}

}

}catch (Exception e){

e.printStackTrace();

return new Result(0,”服务出错!”,null);

}finally {

if (lock != null){

lock.unlock();

}

}

return new Result(0,”客户端没有链接!”,null);

}

public static void main(String[] args) throws Exception {

new NettyServer().bind();

}

}

代码中write方法是业务代码调用服务端向客户端发送信息的统一入口,这里用了Lock是为了防止并发操做影响数据返回的问题,这里每一个客户端通道分配一个锁。latch.await(60,TimeUnit.SECONDS) 是为了阻塞程序,等待客户端返回结果,若是60s内没有返回结果则释放锁并返回请求超时。bootstrap

服务端NettyServerChannelInitializer 的实现服务器

package com.example.demo.server;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**

* @ClassName: NettyServerChannelInitializer

* @Author: huangzf

* @Date: 2018/9/25 15:43

* @Description:

*/

public class NettyServerChannelInitializer extends ChannelInitializer {

private NettyServerHandler handler ;

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast(“decoder”, new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers

.weakCachingConcurrentResolver(this.getClass().getClassLoader())));

pipeline.addLast(“encoder”, new ObjectEncoder());

pipeline.addLast(new IdleStateHandler(40,0,0,TimeUnit.SECONDS));

//服务器的逻辑

handler = new NettyServerHandler();

pipeline.addLast(“handler”, handler);

}

}

这里使用了对象进行数据传输,避免了客户端从新解析组装对象的麻烦并发

package com.example.demo.server;

import com.example.demo.cache.ChannelMap;

import com.example.demo.model.Result;

import com.example.demo.model.Tenant;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: NettyServerHandler

* @Author: huangzf

* @Date: 2018/9/25 15:44

* @Description:

*/

@Slf4j

public class NettyServerHandler extends SimpleChannelInboundHandler {

private CountDownLatch latch;

/**

* 消息的惟一ID

*/

private String unidId = “”;

/**

* 同步标志

*/

private int rec;

/**

* 客户端返回的结果

*/

private Result result;

/**

* 心跳丢失次数

*/

private int counter = 0;

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println(“Client say : ” + msg.toString());

if(msg instanceof Tenant){

ChannelMap.setChannel(((Tenant) msg).getTenantId(),ctx.channel());

ChannelMap.setChannelLock(((Tenant) msg).getTenantId(),new ReentrantLock());

}

counter = 0;

if(rec == 1 && msg instanceof Result){

Result re = (Result) msg;

//校验返回的信息是不是同一个信息

if (unidId.equals(re.getUniId())){

latch.countDown();//消息返回完毕,释放同步锁,具体业务须要判断指令是否匹配

rec = 0;

result = re;

}

}

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info(“RemoteAddress : ” + ctx.channel().remoteAddress().toString()+ ” active !”);

super.channelActive(ctx);

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent event = (IdleStateEvent) evt;

if (event.state().equals(IdleState.READER_IDLE)){

// 空闲40s以后触发 (心跳包丢失)

if (counter >= 3) {

// 连续丢失3个心跳包 (断开链接)

ctx.channel().close().sync();

log.error(“已与”+ctx.channel().remoteAddress()+”断开链接”);

System.out.println(“已与”+ctx.channel().remoteAddress()+”断开链接”);

} else {

counter++;

log.debug(ctx.channel().remoteAddress() + “丢失了第 ” + counter + ” 个心跳包”);

System.out.println(“丢失了第 ” + counter + ” 个心跳包”);

}

}

}

}

public void resetSync(CountDownLatch latch, int rec) {

this.latch = latch;

this.rec = rec;

}

public void setUnidId(String s){

this.unidId = s;

}

public Result getResult() {

return result;

}

}

在channelRead0方法中 若是读取到的信息是Tenant (客户端刚链接上发送的消息)就为该客户端关联一个惟一标志和分配一个锁Lock(用于并发操做)框架

若是读取到的信息是Result(客户端响服务端的消息)就判断其是不是同一个消息(服务端发送的消息中带有该消息的惟一id,客户端返回时也要带上该id),若是是就latch.countDown() 释放同步锁,这样就能够使得服务端同步获得客户端返回的消息了。异步

详情与客户端代码请移步码云:socket