手写一个基于netty的tomcat服务器:附压测对比

  • Post author:
  • Post category:其他


本文基于之前的BIO服务器进行改进,将原本的线程池+sokcet的阻塞式通信改为基于netty的异步通信,并附上简单的压力测试实验,基于BIO的服务器参考:

手写一个基于BIO的Java服务器

1.修改启动类:

将线程池+socket换成netty服务器

public void start() {
        //初始化servlet
        loadServlet();
        //初始化server配置
        loadServer();
        NettyConfig nettyConfig = new NettyConfig(parameterConfig);
        // 监听线程
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //工作线程
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)//添加线程组
                    .channel(NioServerSocketChannel.class)
                    //添加处理流程
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // HttpResponseEncoder 编码器
                            socketChannel.pipeline().addLast(new HttpResponseEncoder());
                            // HttpRequestDecoder 解码器
                            socketChannel.pipeline().addLast(new HttpRequestDecoder());
                            // 业务逻辑处理
                            socketChannel.pipeline().addLast(new RequestProcessor(httpServletMap));
                        }
                    })
                    // 针对主线程的配置 分配线程最大数量 128
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 针对子线程的配置 保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(port).syncUninterruptibly();
            System.out.println("AIO服务器启动,绑定" + port + "端口");
            //异步关闭
            future.channel().closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

2.业务处理器

这里需要将netty封装的request信息转为自己的request类。注意netty在处理http请求的时候会拆分成请求头和请求头来分批处理,所以在处理完请求头信息后需要将解析到的request与channel绑定,使得处理请求头的时候可以得到当前的request,具体的封装逻辑可以参考源码。

public class RequestProcessor extends SimpleChannelInboundHandler<HttpObject> {

    private Map<String, HttpServlet> httpServletMap;
    public RequestProcessor(Map<String, HttpServlet> httpServletMap) {
        this.httpServletMap = httpServletMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

        Channel channel = ctx.channel();
        if (msg instanceof io.netty.handler.codec.http.HttpRequest) {

            //收到http请求,解析为自己的request
            HttpRequest request = new HttpRequest(ctx,msg);
            ChannelRequestHandler.setRequest(channel, request);
        }
        if (msg instanceof HttpContent) {
            try {
                HttpRequest request = ChannelRequestHandler.getRequest(channel);
                //获取请求体数据
                request.setRequestBodyMap(msg);
                //解析自己的response
                HttpResponse response = new HttpResponse(ctx, msg);
                //执行请求处理
                doProcessor(request, response);
            } finally {
                ChannelRequestHandler.removeChannel(channel);
            }
        }
    }
    
    private void doProcessor(HttpRequest httpRequest, HttpResponse httpResponse) {
        try {
            String url = httpRequest.getUrl();
            String match = match(url);
            if (!"".equals(match)) {
                //交给对应的servlet来处理
                HttpServlet httpServlet = httpServletMap.get(match);
                httpServlet.service(httpRequest, httpResponse);
                //执行销毁操作
                httpServlet.destroy();
            } else {
                httpResponse.writeResource(url);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private String match(String url) {
        int len = url.length();
        for (String pattern : httpServletMap.keySet()) {
            int length = pattern.length();
            boolean isValid = true;
            for (int i = 0; i < length && i < len; i++) {
                char ch1 = url.charAt(i);
                char ch2 = pattern.charAt(i);
                if ('*' == ch2 && url.charAt(i - 1) == '/') return pattern;
                if (ch1 != ch2) {
                    isValid = false;
                    break;
                }
            }
            if (isValid) return pattern;
        }
        return "";
    }

}

3.响应数据

与之前的OutputStream流不同,netty中使用的是channel来传输数据,且需要使用到netty自己的请求头信息,所以需要更改响应数据的函数。

响应文本数据:

/**
     * 采用默认的请求头实时发送响应数据
     * @param text 响应数据
     * @throws IOException
     */
    public void writeAndFlush(String text) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
        //设置响应类型
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        //设置响应长度
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(HttpHeaderNames.SET_COOKIE, getCookies());

        ctx.writeAndFlush(response);

    }

    public void write404() {
        this.writeAndFlush("<h1>404 not found</h1>");
    }

    public void write500() {
        this.writeAndFlush("<h1>500 Exception</h1>");
    }

返回静态资源:

     /**
     * 读取文件并响应给前端
     * @param file 文件名
     * @param httpResponse 请求响应
     * @param contentType 响应类型
     * @throws IOException
     */
    @Override
    public void resolve(File file, HttpResponse httpResponse, String contentType) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        long fileLength = raf.length();
        //设置默认响应头
        ChannelHandlerContext ctx = httpResponse.getCtx();
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        //设置响应类型
        defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
        //设置响应长度
        defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
        //写入请求头
        ctx.write(defaultHttpResponse);
        //写入文件数据
        ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
        //写入结尾符号
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        //响应结束后关闭通道
        lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    }

4.压力测试

以1000个线程为测试组,分别得到如下数据:


基于socket:


基于netty:

可以看到使用netty后吞吐量和响应时间都有了提升。

5.项目地址


手写框架



版权声明:本文为qq_44993268原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。