1.创建服务端
/**
 * 1. Netty server.
 *
 * <p>{@link #start(InetSocketAddress)} binds the listening socket and then blocks the calling
 * thread until the server channel closes. {@link #stop()} is meant to be called from a different
 * thread, which is why the shared state below is declared {@code volatile}.
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Set to true after a successful bind and back to false on shutdown. */
    public volatile boolean isRunning = false;

    // Written by the thread running start(), read by the thread calling stop().
    private volatile EventLoopGroup bossGroup = null;
    private volatile EventLoopGroup workGroup = null;
    private ServerBootstrap bootstrap = null;

    /**
     * Binds to the given address and blocks until the server channel is closed.
     * Both event loop groups are always shut down before this method returns.
     *
     * @param socketAddress address and port to listen on
     */
    public void start(InetSocketAddress socketAddress) {
        // NOTE: this message is logged before the bind below, so it does not prove the bind succeeded.
        logger.info("Netty服务端启动成功,当前线程名称:" + Thread.currentThread().getName());
        // Boss group: accepts incoming connections.
        bossGroup = new NioEventLoopGroup(5);
        // Worker group: handles I/O of accepted connections.
        workGroup = new NioEventLoopGroup(200);
        bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                // Size of the pending-connection queue.
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Enable TCP keep-alive probes on accepted connections
                // (the probe interval is decided by the OS; commonly two hours).
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Bind the port and start accepting connections.
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            logger.info("服务器启动开始监听端口: {}", socketAddress.getPort());
            isRunning = true;
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the owning executor can observe it.
            Thread.currentThread().interrupt();
            logger.error("Netty服务端线程被中断", e);
        } catch (Exception e) {
            logger.error("Netty服务端运行异常", e);
        } finally {
            logger.info("服务端执行结束");
            // Shut down the boss group.
            bossGroup.shutdownGracefully();
            // Shut down the worker group.
            workGroup.shutdownGracefully();
            isRunning = false;
        }
    }

    /**
     * Requests a graceful shutdown of both event loop groups (if they exist and are not already
     * shut down) and marks the server as not running.
     */
    public void stop() {
        if (bossGroup != null && !bossGroup.isShutdown()) {
            bossGroup.shutdownGracefully();
        }
        if (workGroup != null && !workGroup.isShutdown()) {
            workGroup.shutdownGracefully();
        }
        isRunning = false;
        logger.info("关闭服务器");
    }
}
/**
 * 2. Server-side inbound handler.
 *
 * <p>Registers each connected channel in {@link NettyServerCacheTemplate}, logs inbound messages,
 * and removes the channel from the cache on error or when the handler is removed.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    // Looked up through SpringUtils because this handler is created with `new`, not by Spring.
    private final NettyServerCacheTemplate nettyServerCacheTemplate = SpringUtils.getBean(NettyServerCacheTemplate.class);

    /**
     * Called when a client connection becomes active; stores the channel in the cache.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端["+ ctx.channel().id().asLongText() +"]建立链接成功");
        // Remember the channel so it can be addressed later by its id.
        nettyServerCacheTemplate.saveChannel(ctx.channel());
    }

    /**
     * Called for every inbound message; logs it together with the handling thread and channel id.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String name = Thread.currentThread().getName();
        logger.info("服务器收到消息: {}", msg.toString());
        System.out.println("当前线程名称:" + name + ":当前channel的id:" + ctx.channel().id().asLongText());
    }

    /**
     * Called when an exception reaches this handler: logs it with its stack trace, removes the
     * channel from the cache and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log through the logger (with the cause) instead of printing to stderr.
        logger.error("发生异常", cause);
        // Drop the cached channel and close the connection.
        nettyServerCacheTemplate.deleteChannel(ctx.channel());
        ctx.close();
    }

    /**
     * Called when this handler is removed from the pipeline; removes the channel from the cache.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        logger.info("链接已关闭" +ctx.channel().id().asLongText());
        // Drop the cached channel.
        nettyServerCacheTemplate.deleteChannel(ctx.channel());
    }
}
/**
 * 3. Pipeline setup for every channel accepted by the server.
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the UTF-8 string decoder and encoder, followed by the business handler.
     *
     * @param socketChannel the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Codecs first, then the handler that consumes decoded strings.
        socketChannel.pipeline()
                .addLast("decoder", new StringDecoder(CharsetUtil.UTF_8))
                .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new NettyServerHandler());
    }
}
/**
 * 4. Registry of the channels currently connected to the server.
 */
@Component
public class NettyServerCacheTemplate {
    /** Connected channels keyed by {@code channel.id().asLongText()}. */
    public static Map<String,Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * Stores a channel under its long-text id.
     *
     * @param channel channel to register
     */
    public void saveChannel(Channel channel) {
        channelMap.put(channel.id().asLongText(), channel);
    }

    /**
     * Looks up a registered channel.
     *
     * @param name long-text channel id used as the map key
     * @return the channel, or {@code null} when {@code name} is null or unknown
     */
    public Object getChannel(String name) {
        // ConcurrentHashMap rejects null keys, so guard instead of letting get(null) throw.
        if (name == null) {
            return null;
        }
        return channelMap.get(name);
    }

    /**
     * Removes a channel from the registry.
     *
     * @param channel channel to unregister
     */
    public void deleteChannel(Channel channel) {
        channelMap.remove(channel.id().asLongText());
    }

    /**
     * @return number of registered channels
     */
    public Integer getSize() {
        return channelMap.size();
    }

    /**
     * @return a snapshot list of the registered channel ids
     */
    public List<String> getOnline() {
        return new ArrayList<>(channelMap.keySet());
    }

    /**
     * Sends a message to the given channel.
     *
     * @param msg     message to send
     * @param channel target channel; may be {@code null}
     * @return {@code "success"} when the write was handed to the channel (the write itself is
     *         asynchronous), {@code "不在线"} when the channel is null or inactive,
     *         {@code "error"} when sending threw an exception
     */
    public static String sendMsg(String msg, Channel channel) {
        try {
            // A missing channel is reported as offline rather than via a caught NullPointerException.
            if (channel == null || !channel.isActive()) {
                return "不在线";
            }
            channel.writeAndFlush(msg);
            return "success";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}
2.创建客户端
/**
 * 1. Netty client.
 *
 * <p>{@link #start()} connects to 127.0.0.1:8091 and blocks the calling thread until the
 * connection is closed.
 **/
public class NettyClient {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Connects to the server, tags the channel with a sample attribute and waits for the
     * connection to close. The event loop group is always shut down before returning.
     */
    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                // Disable Nagle's algorithm; suits small messages that must be sent immediately.
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class)
                .handler(new NettyClientInitializer());
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8091).sync();
            logger.info("客户端成功....");
            // Attach a sample attribute to the channel.
            future.channel().attr(AttributeKey.valueOf("key")).set("sssss");
            // Wait until the connection is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the code that owns this thread.
            Thread.currentThread().interrupt();
            logger.error("客户端线程被中断", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * 2. Client-side inbound handler.
 *
 * <p>Registers the channel in {@link NettyClientCacheTemplate} once connected, logs inbound
 * messages, and removes the channel from the cache on error or when the handler is removed.
 **/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    // Looked up through SpringUtils because this handler is created with `new`, not by Spring.
    private final NettyClientCacheTemplate nettyClientCacheTemplate = SpringUtils.getBean(NettyClientCacheTemplate.class);

    /**
     * Called when the connection to the server becomes active; stores the channel in the cache.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端与服务端链接成功");
        // Keep the channel so it can be addressed later by its id.
        nettyClientCacheTemplate.saveChannel(ctx.channel());
    }

    /**
     * Called for every inbound message; prints the channel attribute "key" and logs the message
     * and the channel id.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The value is null if nothing has been stored under "key" on this channel yet.
        Object key = ctx.channel().attr(AttributeKey.valueOf("key")).get();
        System.out.println("获取attr中的值:" + key);
        logger.info("客户端收到消息: {}", msg.toString());
        logger.info("当前channel的编号是" +ctx.channel().id().asLongText());
    }

    /**
     * Called when an exception reaches this handler: removes the channel from the cache, logs
     * the cause with its stack trace and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        nettyClientCacheTemplate.deleteChannel(ctx.channel());
        // Log through the logger (with the cause) instead of printing to stderr.
        logger.error("客户端发生异常", cause);
        ctx.close();
    }

    /**
     * Called when this handler is removed from the pipeline; removes the channel from the cache.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        nettyClientCacheTemplate.deleteChannel(ctx.channel());
    }
}
/**
 * 3. Pipeline setup for the client channel.
 **/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the string decoder and encoder, followed by the client handler.
     *
     * <p>The codecs are created with an explicit UTF-8 charset so they match the server pipeline;
     * the no-argument constructors would fall back to the platform default charset.
     *
     * @param socketChannel the newly created client channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast("decoder", new StringDecoder(java.nio.charset.StandardCharsets.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(java.nio.charset.StandardCharsets.UTF_8));
        socketChannel.pipeline().addLast(new NettyClientHandler());
    }
}
/**
 * Registry of the channels that connect clients to the server.
 */
@Component
public class NettyClientCacheTemplate {
    /** Connected channels keyed by {@code channel.id().asLongText()}. */
    public static Map<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * Stores a channel under its long-text id.
     *
     * @param channel channel to register
     */
    public void saveChannel(Channel channel) {
        channelMap.put(channel.id().asLongText(), channel);
    }

    /**
     * Looks up a registered channel.
     *
     * @param name long-text channel id used as the map key
     * @return the channel, or {@code null} when {@code name} is null or unknown
     */
    public Object getChannel(String name) {
        // ConcurrentHashMap rejects null keys, so guard instead of letting get(null) throw.
        if (name == null) {
            return null;
        }
        return channelMap.get(name);
    }

    /**
     * Removes a channel from the registry.
     *
     * @param channel channel to unregister
     */
    public void deleteChannel(Channel channel) {
        channelMap.remove(channel.id().asLongText());
    }

    /**
     * @return number of registered channels
     */
    public Integer getSize() {
        return channelMap.size();
    }

    /**
     * @return a snapshot list of the registered channel ids
     */
    public List<String> getOnline() {
        return new ArrayList<>(channelMap.keySet());
    }

    /**
     * Sends a message over the given channel.
     *
     * @param msg     message to send
     * @param channel target channel; may be {@code null}
     * @return {@code "success"} when the write was handed to the channel (the write itself is
     *         asynchronous), {@code "不在线"} when the channel is null or inactive,
     *         {@code "error"} when sending threw an exception
     */
    public static String sendMsg(String msg, Channel channel) {
        try {
            // A missing channel is reported as offline rather than via a caught NullPointerException.
            if (channel == null || !channel.isActive()) {
                return "不在线";
            }
            channel.writeAndFlush(msg);
            return "success";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}
3.创建线程池
/**
 * Spring Boot thread pool configuration.
 *
 * <p>Pool behaviour as described by the original author: the pool starts with zero threads and
 * creates one per incoming task until the core size is reached; further tasks are queued; once
 * the queue is full more threads are created up to the maximum size; beyond that the rejection
 * policy applies.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    /** Core (default) number of threads. */
    private static final int CORE_POOL_SIZE = 5;

    /** Maximum number of threads. */
    private static final int MAX_POOL_SIZE = 30;

    /** How long an idle thread may stay alive, in seconds. */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /** Capacity of the task queue. */
    private static final int QUEUE_CAPACITY = 10000;

    /** Prefix of the pool's thread names. */
    private static final String THREAD_NAME_PREFIX = "xianchengchi-";

    /**
     * Builds and initializes the executor registered as the "taskExecutor" bean.
     *
     * @return the initialized executor
     */
    @Bean("taskExecutor") // explicit bean name; without it the method name would be used
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Naming
        pool.setThreadNamePrefix(THREAD_NAME_PREFIX);

        // Sizing
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        pool.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        pool.setQueueCapacity(QUEUE_CAPACITY);

        // Rejection policy: CallerRunsPolicy makes the submitting thread run the rejected task.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.initialize();
        return pool;
    }
}
4.用线程池启动server
/**
 * Runs a {@link NettyServer} on the Spring-managed thread pool and exposes it through a static
 * accessor so that non-Spring code (such as {@code main}) can reach it.
 */
@Component
public class NettyPoolServer {
    @Resource
    private ThreadPoolTaskConfig poolTaskExecutor;

    /** This bean, published by {@link #init()} for static access. */
    private static NettyPoolServer single = null;

    // Assigned on a pool thread inside run(), read by stop()/getIsRunning() on other threads.
    private volatile NettyServer nettyServer;

    /**
     * Runs after dependency injection and publishes this bean through the static field.
     */
    @PostConstruct
    public void init() {
        single = this;
    }

    /**
     * @return the Spring-managed instance, or {@code null} if {@link #init()} has not run yet
     */
    public static NettyPoolServer getSingle() {
        return single;
    }

    /**
     * Submits a task to the pool that creates a {@link NettyServer} and starts it on
     * 127.0.0.1:8091. The server's start call occupies that pool thread until the server stops.
     */
    public void run() {
        poolTaskExecutor.taskExecutor().execute(new Runnable() {
            @Override
            public void run() {
                // Start the server.
                System.out.println("NettyPoolServer当前线程池:" + Thread.currentThread().getName());
                nettyServer = new NettyServer();
                nettyServer.start(new InetSocketAddress("127.0.0.1", 8091));
            }
        });
    }

    /**
     * Stops the server if one has been created and it reports that it is running.
     */
    public void stop() {
        // Read the field once so the null check and the call see the same instance.
        NettyServer server = nettyServer;
        if (server != null && server.isRunning) {
            server.stop();
        }
    }

    /**
     * @return {@code true} if a server has been created and reports that it is running
     */
    public boolean getIsRunning() {
        NettyServer server = nettyServer;
        if (server == null) {
            return false;
        }
        return server.isRunning;
    }
}
5.spring boot 启动
/**
 * Boots the Spring application, starts the Netty server on the thread pool and then launches
 * 20 clients, each on its own thread.
 */
public static void main(String[] args) {
    SpringApplication.run(BootAdminApplication.class, args);

    // Start the server.
    // NOTE(review): the server is started asynchronously, so the clients below may try to
    // connect before the port is bound — confirm whether a readiness wait is needed.
    NettyPoolServer.getSingle().run();

    // Create 20 clients.
    int remaining = 20;
    while (remaining-- > 0) {
        Runnable clientTask = new Runnable() {
            @Override
            public void run() {
                new NettyClient().start();
            }
        };
        new Thread(clientTask).start();
    }
}
6.客户端服务端调用测试
/**
 * 1. REST endpoints that act on the client-side channels (client sends to server).
 */
@RestController
@RequestMapping("nettyClient")
public class NettyClientController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private NettyClientCacheTemplate nettyClientCacheTemplate;

    /**
     * Returns the number of registered client channels.
     */
    @GetMapping("/size")
    public ResultModel getSize() {
        Integer channelCount = nettyClientCacheTemplate.getSize();
        return ResultModel.RESULT(ErrorMsg.SELECT_SUCCESS, channelCount);
    }

    /**
     * Returns the ids of the registered client channels.
     */
    @GetMapping("/online")
    public ResultModel getOnline() {
        List<String> channelIds = nettyClientCacheTemplate.getOnline();
        return new ResultModel(ErrorMsg.SELECT_SUCCESS, channelIds);
    }

    /**
     * Sends a message over the client channel registered under the given id.
     *
     * @param name channel id, as returned by the online endpoint
     * @param msg  message to send
     */
    @PostMapping("/send")
    public ResultModel send(String name, String msg) {
        Object cached = nettyClientCacheTemplate.getChannel(name);
        // sendMsg is static, so it is invoked through the class rather than the injected bean.
        String outcome = NettyClientCacheTemplate.sendMsg(msg, (Channel) cached);
        return ResultModel.RESULT(ErrorMsg.SAVE_SUCCESS, "客户端发送成功" + outcome);
    }
}
/**
 * 2. REST endpoints that act on the server-side channels (server sends to client).
 */
@RestController
@RequestMapping("nettyServer")
public class NettyServerController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private NettyServerCacheTemplate nettyCacheTemplate;

    /**
     * Returns the number of registered channels.
     */
    @GetMapping("/size")
    public ResultModel getSize() {
        return ResultModel.RESULT(ErrorMsg.SELECT_SUCCESS, nettyCacheTemplate.getSize());
    }

    /**
     * Returns the ids of the registered channels.
     */
    @GetMapping("/online")
    public ResultModel getOnline() {
        List<String> returne = nettyCacheTemplate.getOnline();
        return new ResultModel(ErrorMsg.SELECT_SUCCESS, returne);
    }

    /**
     * Sends a message to the channel registered under the given id.
     *
     * @param name channel id, as returned by the online endpoint
     * @param msg  message to send
     */
    @PostMapping("/send")
    public ResultModel send(String name, String msg) {
        Channel channel = (Channel) nettyCacheTemplate.getChannel(name);
        // No direct channel.isActive() call here: the lookup returns null for an unknown id,
        // and sendMsg performs the activity check itself and reports the outcome in its result.
        String result = NettyServerCacheTemplate.sendMsg(msg, channel);
        return ResultModel.RESULT(ErrorMsg.SAVE_SUCCESS, "发送成功" + result);
    }
}
7.其他
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
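// 获取在线连接的 channel id 列表(GET);返回的 id 即 send 接口的 name 参数
http://localhost:8090/nettyServer/online
// 向指定连接发送消息(POST,参数:name、msg)
http://localhost:8090/nettyServer/send
客户端同样操作(将路径中的 nettyServer 换成 nettyClient)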
版权声明:本文为lxl_any原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。