Java使用netty

  • Post author:
  • Post category:java


netty,io与nio的区别,先上代码在分析:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.0.Final</version>
</dependency>

依赖引入

io与nio虽然目标不一样,但操作还是一样的:

接收请求服务类代码:

package com.kaige123.daomu.bootjsp.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class server {

    private int port = 8081;

    public void run() throws Exception {
        //bossGroup 用来接收进来的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //workerGroup 用来处理已经被接收的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //启动 NIO 服务的辅助启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new serverHandle());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 服务器绑定端口
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器 socket 关闭 。
            f.channel().closeFuture().sync();
        } finally {
            // 出现异常终止
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("连接关闭等异常");
        }
    }

    public static void main(String[] args) throws Exception {
        new server().run();
    }

}

服务器绑定端口,绑定收到请求后的处理者,有2个类注意:

bossGroup 与workerGroup ,一个是收到请求池,一个是已连接池。当客户端断开后,应该从这里面剔除

处理请求服务类代码:

package com.kaige123.daomu.bootjsp.netty;

import com.sun.xml.internal.bind.v2.runtime.unmarshaller.XsiNilLoader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.*;
import java.util.Arrays;

public class serverHandle extends ChannelInboundHandlerAdapter {

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
        result.readBytes(result1);
        String resultStr = new String(result1);
        // 释放资源,这行很关键
        result.release();

        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Java_apiCopy.rar", true);
        Savedisk(fileOutputStream, result1);

        sendMsg(ctx);
    }

    // 写入本地磁盘
    public void Savedisk(FileOutputStream fileOutputStream, byte[] bytes) throws IOException {
        fileOutputStream.write(bytes, 0, bytes.length);
        fileOutputStream.close();
    }

    // 向客户端发送消息
    public void sendMsg(ChannelHandlerContext ctx) {
        String response = "OK BYTES Client!";
        // 发送的数据必须转换成ByteBuf字节数据数组,进行传输
        ByteBuf encoded = ctx.alloc().buffer(response.length());
        encoded.writeBytes(response.getBytes());
        ctx.write(encoded);
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

msg字节数据,readableBytes一口气读取到本次共传输的数据

客户端请求代码:

package com.kaige123.daomu.bootjsp.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class client {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new clientHandle());
                }
            });

            // 连接发服务器地址
            ChannelFuture f = b.connect(host, port).sync();
            // 关闭连接
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            System.out.println("连接关闭等异常");
        }
    }

    public static void main(String[] args) throws Exception {
        client client = new client();
        client.connect("127.0.0.1", 8081);
    }
}

发送请求,并注册当连接后,自己的处理者

客户端处理代码:

package com.kaige123.daomu.bootjsp.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.FileInputStream;

public class clientHandle extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客户端收到处理");
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        System.out.println("收到数据:" + result1 + "收到长度: " + result1.length);
        result.release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }


    // 连接成功后,向server发送消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        FileInputStream fileInputStream = new FileInputStream("D:\\Java_api.rar");
        byte[] bytes = new byte[1024 * 8];
        int len = -1;
        while ((len = fileInputStream.read(bytes)) != -1) {
            ByteBuf encoded = ctx.alloc().buffer(len);
            encoded.writeBytes(bytes, 0, len);
            ctx.write(encoded);
            ctx.flush();
        }

    }
}

io与nio

io的分析:一个客户端一个socket连接,获得in与out流,在建立的相互相互通信。

1:当有n人来时,一人一个服务就吃不住了。(这点不重要)

2:当双方连接上时,数据并不是要一直交互的,资源就一直耗着,等对方的信息,这段等待时间是无意义的

nio猜测,io有切实干过n个例题,nio仅是接触了netty,下面是猜测:

1:netty是一个池,客户来了养池中,客户请求抓服务去接待。pass,如果真是这样,那么我还不敢用。为什么?因为这样就不能保证请求者与服务器是同一个,这样将出现数据混乱的问题

1例子求证:

客户端分2次发出数据,第一次是给客户端贴上标签。第二次请求是看客户端再次请求时,netty是取出贴在客户端身上的标签,还是空标签。如果是取出标签,说明处理者与客户端始终是一个,如果是空,说明对象不是同一个

申请全局变量:String filename;

长度不为3,是第一次请求去贴上标签,并将标签打印出来。

长度为3,是第二次请求打印全局变量。如果是一对一,那么这个标签就会保持。如果抓新的来服务,标签就是null,查看打印:

开启2个客户端去请求,第一次贴标签,第二次打印标签。打印的标签与贴上的标签一致。说明什么?说明一开始去服务与过一段时间去服务的都是同一人同一对象。这样则没有问题,保证数据的一致性。否则我不敢用

区别在于:

阻塞与非阻塞,当在read或write时,在干其他的时间可以的。但会出现问题。如上,第一次请求出来,修改10秒。第二次就收不到了,客户端发完就死掉了。服务器休息完也没拿到第二次数据。也就是,这个过程,就连读连写,过程不能在做其他事情

nio非阻塞,不需要阻塞来接收保证消息是在持续传输。即使这个过程中,我去干其他时间,消息回头还是会传递回来,正常收到。

阻塞非阻塞:

阻塞io:我一直等你来信息,保证消息持续传输

非阻塞你:我不用守着你来消息,即使这会我中途去干其他时间浪费了时间,但最后我返回调用处后这个消息还会被处理得到。

这就是阻塞与非阻塞,其区别在于,传输过程中其形式不同,从netty切入到nio



版权声明:本文为Java_Dmz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。