1 Netty多客户端连接与通信
前言 上一篇介绍了Netty的socket简单示例,本篇在该示例的基础上稍作修改,来分析多客户端
连接与通信的情况。下面还是直接上代码。
1.1 服务端
1.1.1 MyServer
package netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the chat server.
 *
 * <p>Binds a Netty NIO server socket to {@link #PORT}, installs
 * {@code MyServerInitializer} on every accepted connection, and blocks until
 * the listening channel is closed.
 */
public class MyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 8899;

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down on every exit path.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Passed to ServerBootstrap.group(parent, child) in this order.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());
            // Wait for the bind to complete, then wait until the server channel is closed.
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
1.1.2 MyServerInitializer
package netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * Builds the handler pipeline for every connection accepted by the server.
 *
 * <p>Handlers are installed in this order: line-delimiter frame decoder
 * (max frame length 4096), UTF-8 string decoder, UTF-8 string encoder,
 * and finally the chat handler {@code MyServerHandler}.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Installs the codec and business handlers on the new channel's pipeline.
     *
     * @param ch the newly accepted client channel
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // Varargs addLast appends the handlers in the order given.
        ch.pipeline().addLast(
                new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()),
                new StringDecoder(CharsetUtil.UTF_8),
                new StringEncoder(CharsetUtil.UTF_8),
                new MyServerHandler());
    }
}
1.1.3 MyServerHandler
package netty.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Server-side chat handler: relays every line received from one client to all
 * channels registered in a shared {@link ChannelGroup}.
 *
 * <p>The sender gets its own message back prefixed with {@code [自己]:}; every
 * other channel gets it prefixed with the sender's remote address. Join and
 * leave notices are broadcast to the group, and connect/disconnect events are
 * printed to standard output.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
    /** Channels that have had this handler added; static, so shared by all handler instances. */
    private static final ChannelGroup CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Relays a received message to every channel in the group.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded text line
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // The channel the message came from.
        Channel sender = ctx.channel();
        // Both texts are the same for every recipient, so build them once outside the loop.
        String forOthers = "[" + sender.remoteAddress() + "]" + ":" + msg + "\n";
        String forSender = "[自己]:" + msg + "\n";
        CHANNELS.forEach(ch -> {
            if (sender != ch) {
                // Any channel other than the sender.
                ch.writeAndFlush(forOthers);
            } else {
                // Echo back to the sender itself.
                sender.writeAndFlush(forSender);
            }
        });
    }

    /**
     * Announces the new channel to the current group members, then adds it to the group.
     * Because the notice is written before the add, the joining client does not receive it.
     *
     * @param ctx context of the channel this handler was added to
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        CHANNELS.writeAndFlush("[服务器]-" + channel.remoteAddress() + "加入\n");
        CHANNELS.add(channel);
    }

    /**
     * Broadcasts a leave notice to the group.
     *
     * @param ctx context of the channel this handler was removed from
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        CHANNELS.writeAndFlush("[服务器]-" + channel.remoteAddress() + "离线\n");
        // No explicit remove here: per Netty's ChannelGroup documentation, a closed
        // channel is removed from the group automatically.
    }

    /**
     * Prints the remote address of a channel that became active.
     *
     * @param ctx context of the activated channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上线");
    }

    /**
     * Prints the remote address of a channel that became inactive.
     *
     * @param ctx context of the deactivated channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "下线");
    }

    /**
     * Prints the stack trace of a pipeline error and closes the affected channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1.2 客户端
1.2.1 MyClient
package netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * Console chat client: connects to the server on {@code localhost:8899} and
 * sends every line typed on standard input, terminated with {@code "\r\n"}.
 */
public class MyClient {
    /**
     * Connects to the server and forwards console input line by line until
     * standard input reaches end-of-stream or an error occurs. The event loop
     * group is shut down on every exit path.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());
            Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
            // NOTE(review): no charset is given, so console input is decoded with the
            // platform default; confirm that matches the console encoding.
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            // readLine() returns null at end of input; stop there instead of
            // sending the literal text "null" to the server in an endless loop.
            String line;
            while ((line = br.readLine()) != null) {
                channel.writeAndFlush(line + "\r\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
1.2.2 MyClientInitializer
package netty.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * Builds the handler pipeline for the client's connection to the server.
 *
 * <p>Handlers are installed in this order: line-delimiter frame decoder
 * (max frame length 4096), UTF-8 string decoder, UTF-8 string encoder,
 * and finally {@code MyClientHandler}.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Installs the codec and business handlers on the channel's pipeline.
     *
     * @param ch the client channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // Each addLast returns the pipeline, so the handlers are appended fluently in order.
        ch.pipeline()
                .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyClientHandler());
    }
}
1.2.3 MyClientHandler
package netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side handler that prints every text line received from the server
 * to standard output.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints one received message.
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg the decoded text line
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String received = msg;
        System.out.println(received);
    }
}
1.3 分析
1.3.1 演示日志
****************服务端***********************
15:50:47: Executing task 'MyServer.main()'...
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :MyServer.main()
/127.0.0.1:61245上线
/127.0.0.1:61381上线
/127.0.0.1:61419上线
****************客户端(1)***********************
15:50:57: Executing task 'MyClient.main()'...
Starting Gradle Daemon...
Gradle Daemon started in 2 s 276 ms
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :MyClient.main()
[服务器]-/127.0.0.1:61381加入
[服务器]-/127.0.0.1:61419加入
客户端一你好
[自己]:客户端一你好
[/127.0.0.1:61419]:客户端你�?
[/127.0.0.1:61419]:客户端三你好
****************客户端(2)***********************
15:51:53: Executing task 'MyClient.main()'...
Starting Gradle Daemon...
Gradle Daemon started in 2 s 249 ms
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :MyClient.main()
[服务器]-/127.0.0.1:61419加入
[/127.0.0.1:61245]:客户端一你好
****************客户端(3)***********************
15:51:57: Executing task 'MyClient.main()'...
Starting Gradle Daemon...
Gradle Daemon started in 2 s 756 ms
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :MyClient.main()
[/127.0.0.1:61245]:客户端一你好
客户端3你好
[自己]:客户端你�?
客户端三你好
[自己]:客户端三你好
1.3.2 查看tcp连接情况
C:\Users\Administrator>netstat -ano | findstr "8899"
TCP 0.0.0.0:8899 0.0.0.0:0 LISTENING 15832
TCP 127.0.0.1:8899 127.0.0.1:61245 ESTABLISHED 15832
TCP 127.0.0.1:8899 127.0.0.1:61381 ESTABLISHED 15832
TCP 127.0.0.1:8899 127.0.0.1:61419 ESTABLISHED 15832
TCP 127.0.0.1:61245 127.0.0.1:8899 ESTABLISHED 21500
TCP 127.0.0.1:61381 127.0.0.1:8899 ESTABLISHED 16544
TCP 127.0.0.1:61419 127.0.0.1:8899 ESTABLISHED 19940
TCP 192.168.2.140:58899 182.138.81.22:31895 ESTABLISHED 9700
TCP [::]:8899 [::]:0 LISTENING 15832
1.3.3 总结
ChannelGroup本质上是一个Set集合,保存所有已连接的通道;服务端在接收到某个客户端的消息后,遍历该集合向各个通道写出消息,从而做到广播的效果。
版权声明:本文为qq_42641261原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。