Netty Chunk,HttpPost客户端附件上传,服务端解析上传附件

  • Post author:
  • Post category:其他




1.服务端返回Chunk

声明,笔者使用的是netty3,netty 3,语法大同小异,主要学习一个思想



1.RestServer.java


@Override
    public void run() {
        ExecutorService boss = null;
        ExecutorService worker = null;
        NioServerSocketChannelFactory nioServerSocketChannelFactory = null;
        ServerBootstrap bootstrap = null;
        try {
            boss = Executors.newCachedThreadPool();
            worker = Executors.newCachedThreadPool();
            nioServerSocketChannelFactory = new NioServerSocketChannelFactory(boss, worker);
            bootstrap = new ServerBootstrap(nioServerSocketChannelFactory);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() throws Exception {
                    return Channels.pipeline(
                            new HttpRequestDecoder(),
                            new HttpChunkAggregator(1024 * 1024*10),
                            //new HttpContentDecompressor(),
                            new HttpResponseEncoder(),
                            //new HttpContentCompressor(),
                            new ServerHandler()
                    );

                }
            });
            bootstrap.setOption("child.tcpNoDelay", true);
            bootstrap.setOption("child.keepAlive", true);
            Channel bind = bootstrap.bind(new InetSocketAddress(9877));
            bind.getCloseFuture().awaitUninterruptibly();
            //启动服务?
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        } finally {
            LOGGER.info("bootstrap release");
            if (bootstrap != null) {
                bootstrap.releaseExternalResources();
            }
            LOGGER.info("bootstrap release");
            if (nioServerSocketChannelFactory != null) {
                nioServerSocketChannelFactory.releaseExternalResources();
            }
            LOGGER.info("worker release");
            if (worker != null) {
                worker.shutdown();
            }
            LOGGER.info("boss release");
            if (boss != null) {
                boss.shutdown();
            }

        }
    }

笔者一开始就是被

ChunkedWriteHandler

给搞糊涂了,一直以为需要使用ChunkedWriteHandler,直到看到这一段,才知道,这个是对ChunkedFile进行转化的。跟Http没有关系!!!

  ChannelPipeline p = ...;
   p.addLast("streamer", new ChunkedWriteHandler());
   p.addLast("handler", new MyHandler());
   
   Channel ch = ...;
   ch.write(new ChunkedFile(new File("video.mkv"));

真正的Chunk是需要自己组装的



1.2 ServerHandler


public class ServerHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object message = e.getMessage();
        System.out.println(message);
        if (message instanceof DefaultHttpRequest) {

            DefaultHttpRequest request = (DefaultHttpRequest) message;
            String uri = request.getUri();
            System.out.println(uri);
            ChannelBuffer content = request.getContent();
            System.out.println("read content ");
            StringBuilder stringBuilder = new StringBuilder();
            byte[] array = content.array();
            stringBuilder.append(new String(array,"utf-8"));
            System.out.println(stringBuilder.toString());
            /*关键代码是下面这一部分*/
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.setChunked(true);
            response.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
            response.setHeader(HttpHeaders.Names.CONTENT_TYPE,"text/plain;charset=utf-8");
            Channels.write(ctx.getChannel(), response);

            String str = "这个是Chunk 数据";
//
            HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.toString().getBytes(CharsetUtil.UTF_8)));
            Channels.write(ctx.getChannel(),chunk);
            Channels.write(ctx.getChannel(),chunk);
            Channels.write(ctx.getChannel(),chunk);
            Channels.write(ctx.getChannel(),chunk);
            Channels.write(ctx.getChannel(),chunk);
            Channels.write(ctx.getChannel(),chunk);

            chunk = new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);
            ChannelFuture write = ctx.getChannel().write(chunk);

        }
        if (message instanceof DefaultHttpChunk){
            DefaultHttpChunk chuk=((DefaultHttpChunk) message);
            System.out.println(chuk.isLast());
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }

}



需要注意的是chunk与gzip 压缩存在着冲突的关系,实际上,gzip压缩是针对每一个的Chunk请求进行压缩后发送到客户端,但是客户端是把整个Chunk整合之后,再使用gzip 进行解压缩,故而会导致乱码的问题。所以实际上是,如果使用Chunk 就别使用gzip 压缩



1.3客户端发送Chunk到服务端

 public static void main(String args[]) {
        ExecutorService boss = null;
        ExecutorService worker = null;
        NioClientSocketChannelFactory nioClientSocketChannelFactory = null;
        ClientBootstrap bootstrap = null;
        boss = Executors.newCachedThreadPool();
        worker = Executors.newCachedThreadPool();
        nioClientSocketChannelFactory = new NioClientSocketChannelFactory(boss, worker);
        bootstrap = new ClientBootstrap(nioClientSocketChannelFactory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new HttpResponseDecoder(),
                        new HttpContentDecompressor(),
//                        new HttpChunkAggregator(65536),
                        new HttpRequestEncoder(),
                        new RestClient()

                );
            }
        });
        ChannelFuture connect = bootstrap.connect(new InetSocketAddress(9877));
        connect.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("write");
                Channel channel = future.getChannel();

                DefaultHttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/home");
                httpRequest.setChunked(true);

                httpRequest.setHeader(HttpHeaders.Names.ACCEPT_ENCODING,"gzip,deflate,br");
                httpRequest.setHeader(HttpHeaders.Names.CONTENT_TYPE,"text/plain;charset=utf-8");
                httpRequest.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
                Channels.write(channel,httpRequest);
                String str = "这个是Chunk 数据";
                HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.toString().getBytes(CharsetUtil.UTF_8)));
                Channels.write(channel,chunk);
                Channels.write(channel,chunk);
                Channels.write(channel,chunk);
                chunk = new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);
                Channels.write(channel,chunk);

            }
        });
        ChannelFuture closeFuture = connect.getChannel().getCloseFuture();
        closeFuture.awaitUninterruptibly();
        System.out.println("close ");
        bootstrap.releaseExternalResources();
        nioClientSocketChannelFactory.releaseExternalResources();
        worker.shutdown();
        boss.shutdown();
        //启动服务?


    }

后续发现了Decompressor()其实应该放在Aggregator之前的,因为如果包是一个一个压缩的化,那么就得一个一个解压,而不应该改先汇总后再整体解压,HttpContentCompressor(),移动到前面,主要是因为里面存在设置Http头的部分代码,所以一般建议不要放太后面,避免写入响应的时候,因为缺乏头而报错的情况

@Override
public ChannelPipeline getPipeline() throws Exception {
     return Channels.pipeline(
             new HttpRequestDecoder(),
             new HttpContentDecompressor(),//内容必须是基于Request转码之后解压缩与压缩?
             new HttpContentCompressor(),
             new HttpChunkAggregator(1024 * 1024 * 10),//报文最大10M,Aggreegate
             new HttpResponseEncoder(),
             new ServerHandler()
     );

 }



Netty 客户端实现HTTP Post文件上传

multipart/form-data

附件上传一般是需要借助

HttpPostRequestEncoder

帮我们实现,特别需要注意的是HttpObjectAggregator,这一段是必须要的

  @Test
    public void testClientSendMultipart() throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        bootstrap.group(bossGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("httpe", new HttpClientCodec());
                        pipeline.addLast("chunk", new ChunkedWriteHandler());
                        //特别注意,需要HttpObjectAggregator
                        pipeline.addLast("aggregate",new HttpObjectAggregator(6553600));
                        pipeline.addLast(new SimpleChannelInboundHandler<HttpResponse>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) throws Exception {
                                if (msg instanceof DefaultFullHttpResponse) {
                                    DefaultFullHttpResponse response = (DefaultFullHttpResponse) msg;
                                    System.out.println(response);
                                }
                            }

                        });
                    }
                });
        ChannelFuture connect = bootstrap.connect(new InetSocketAddress(9877)).sync();
        connect.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/oldService.do");
                HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
                encoder.addBodyAttribute("username", "blueboz");
                MemoryFileUpload body = new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 36);
                body.setContent(Unpooled.copiedBuffer("this is the content of test.txt file", StandardCharsets.UTF_8));
                encoder.addBodyHttpData(body);
                //发送finalizeRequest
                channel.writeAndFlush(encoder.finalizeRequest());
                //直接将encoder直接发送
                if (encoder.isChunked()) {
                    channel.writeAndFlush(encoder).sync();
                }
            }
        });
        connect.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();


    }

以下是服务端接收到的报文,可以看到报文已经按照form表单的格式打印了出来,而且服务端也接收到了报文

POST /oldService.do HTTP/1.1
content-type: multipart/form-data; boundary=6a953d931dd100c1
Content-Length: 349
--6a953d931dd100c1
content-disposition: form-data; name="username"
content-length: 7
content-type: text/plain; charset=UTF-8

blueboz
--6a953d931dd100c1
content-disposition: form-data; name="data"; filename="test.txt"
content-length: 36
content-type: text/plain; charset=UTF-8

this is the content of test.txt file
--6a953d931dd100c1--

DefaultHttpRequest(chunked: false)
POST /oldService.do HTTP/1.1
content-type: multipart/form-data; boundary=4f778495a75b702b
Content-Length: 349
--4f778495a75b702b
content-disposition: form-data; name="username"
content-length: 7
content-type: text/plain; charset=UTF-8

blueboz
--4f778495a75b702b
content-disposition: form-data; name="data"; filename="test.txt"
content-length: 36
content-type: text/plain; charset=UTF-8

this is the content of test.txt file
--4f778495a75b702b--

关于添加附件的代码

  HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
  //添加内存文件
  MemoryFileUpload body =
          new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
  body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
  encoder.addBodyHttpData(body);

另外,也支持下面这种方式进行报文的发送

 @Override
 public void operationComplete(ChannelFuture future) throws Exception {
      Channel channel = future.channel();
      DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/oldService.do");
      HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
      encoder.addBodyAttribute("username", "blueboz");
      MemoryFileUpload body = new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 36);
      body.setContent(Unpooled.copiedBuffer("this is the content of test.txt file", StandardCharsets.UTF_8));
      encoder.addBodyHttpData(body);
      channel.writeAndFlush(encoder.finalizeRequest());

      while (!encoder.isEndOfInput()) {
          ByteBuffer content = encoder.readChunk(PooledByteBufAllocator.DEFAULT).content().nioBuffer();
          DefaultHttpContent httpCnt = new DefaultHttpContent(Unpooled.wrappedBuffer(content));
          channel.writeAndFlush(httpCnt);
      }
      channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  }



Netty Http服务端接收并处理上传附件

@Test
public void testMultipartServer() throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast("dec", new HttpServerCodec());
                    socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(655330000));
                    socketChannel.pipeline().addLast("buss", new SimpleChannelInboundHandler<FullHttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest request) throws Exception {

                            HttpDataFactory factory = new DefaultHttpDataFactory(false);
                            HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(factory, request);
                            List<InterfaceHttpData> datas = requestDecoder.getBodyHttpDatas();
                            for (InterfaceHttpData data : datas) {
                                System.out.println(data.getName());

                                if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
                                    FileUpload fup = (FileUpload) data;
                                    System.out.println(fup.content().toString(StandardCharsets.UTF_8));
                                } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                                    Attribute att = (Attribute) data;
                                    System.out.println(att.getValue());
                                }
                            }

                            DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                            resp.content().writeBytes("Hello world this is netty response string".getBytes(StandardCharsets.UTF_8));
                            resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
                            channelHandlerContext.writeAndFlush(resp);

                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                            DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                            resp.content().writeBytes("Hello world this is netty response string".getBytes(StandardCharsets.UTF_8));
                            resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
                            ctx.channel().writeAndFlush(resp);
                            ctx.channel().close();

                        }
                    });
                }
            });

    ChannelFuture bind = serverBootstrap.bind(11344).sync();
    bind.channel().closeFuture().sync();
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}



版权声明:本文为blueboz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。