本文基于之前的BIO服务器进行改进,将原本的线程池+sokcet的阻塞式通信改为基于netty的异步通信,并附上简单的压力测试实验,基于BIO的服务器参考:
手写一个基于BIO的Java服务器
1.修改启动类:
将线程池+socket换成netty服务器
public void start() {
//初始化servlet
loadServlet();
//初始化server配置
loadServer();
NettyConfig nettyConfig = new NettyConfig(parameterConfig);
// 监听线程
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//工作线程
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)//添加线程组
.channel(NioServerSocketChannel.class)
//添加处理流程
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// HttpResponseEncoder 编码器
socketChannel.pipeline().addLast(new HttpResponseEncoder());
// HttpRequestDecoder 解码器
socketChannel.pipeline().addLast(new HttpRequestDecoder());
// 业务逻辑处理
socketChannel.pipeline().addLast(new RequestProcessor(httpServletMap));
}
})
// 针对主线程的配置 分配线程最大数量 128
.option(ChannelOption.SO_BACKLOG, 128)
// 针对子线程的配置 保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).syncUninterruptibly();
System.out.println("AIO服务器启动,绑定" + port + "端口");
//异步关闭
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
2.业务处理器
这里需要将netty封装的request信息转为自己的request类。注意netty在处理http请求的时候会拆分成请求头和请求头来分批处理,所以在处理完请求头信息后需要将解析到的request与channel绑定,使得处理请求头的时候可以得到当前的request,具体的封装逻辑可以参考源码。
public class RequestProcessor extends SimpleChannelInboundHandler<HttpObject> {
private Map<String, HttpServlet> httpServletMap;
public RequestProcessor(Map<String, HttpServlet> httpServletMap) {
this.httpServletMap = httpServletMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
Channel channel = ctx.channel();
if (msg instanceof io.netty.handler.codec.http.HttpRequest) {
//收到http请求,解析为自己的request
HttpRequest request = new HttpRequest(ctx,msg);
ChannelRequestHandler.setRequest(channel, request);
}
if (msg instanceof HttpContent) {
try {
HttpRequest request = ChannelRequestHandler.getRequest(channel);
//获取请求体数据
request.setRequestBodyMap(msg);
//解析自己的response
HttpResponse response = new HttpResponse(ctx, msg);
//执行请求处理
doProcessor(request, response);
} finally {
ChannelRequestHandler.removeChannel(channel);
}
}
}
private void doProcessor(HttpRequest httpRequest, HttpResponse httpResponse) {
try {
String url = httpRequest.getUrl();
String match = match(url);
if (!"".equals(match)) {
//交给对应的servlet来处理
HttpServlet httpServlet = httpServletMap.get(match);
httpServlet.service(httpRequest, httpResponse);
//执行销毁操作
httpServlet.destroy();
} else {
httpResponse.writeResource(url);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private String match(String url) {
int len = url.length();
for (String pattern : httpServletMap.keySet()) {
int length = pattern.length();
boolean isValid = true;
for (int i = 0; i < length && i < len; i++) {
char ch1 = url.charAt(i);
char ch2 = pattern.charAt(i);
if ('*' == ch2 && url.charAt(i - 1) == '/') return pattern;
if (ch1 != ch2) {
isValid = false;
break;
}
}
if (isValid) return pattern;
}
return "";
}
}
3.响应数据
与之前的OutputStream流不同,netty中使用的是channel来传输数据,且需要使用到netty自己的请求头信息,所以需要更改响应数据的函数。
响应文本数据:
/**
* 采用默认的请求头实时发送响应数据
* @param text 响应数据
* @throws IOException
*/
public void writeAndFlush(String text) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
//设置响应类型
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
//设置响应长度
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(HttpHeaderNames.SET_COOKIE, getCookies());
ctx.writeAndFlush(response);
}
public void write404() {
this.writeAndFlush("<h1>404 not found</h1>");
}
public void write500() {
this.writeAndFlush("<h1>500 Exception</h1>");
}
返回静态资源:
/**
* 读取文件并响应给前端
* @param file 文件名
* @param httpResponse 请求响应
* @param contentType 响应类型
* @throws IOException
*/
@Override
public void resolve(File file, HttpResponse httpResponse, String contentType) throws IOException {
RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
//设置默认响应头
ChannelHandlerContext ctx = httpResponse.getCtx();
DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//设置响应类型
defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
//设置响应长度
defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
//写入请求头
ctx.write(defaultHttpResponse);
//写入文件数据
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
//写入结尾符号
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
//响应结束后关闭通道
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
4.压力测试
以1000个线程为测试组,分别得到如下数据:
基于socket:
基于netty:
可以看到使用netty后吞吐量和响应时间都有了提升。
5.项目地址
版权声明:本文为qq_44993268原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。