AIO服务器与客户端进行交互

  • Post author:
  • Post category:其他




AIO概述

AIO是异步IO的缩写,虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。 对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作, IO操作本身是同步的。 但是对AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给 线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成 后,由系统自动触发。

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的, 对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序; 对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以 理解为,read/write方法都是异步的,完成后会主动调用回调函数。

在JDK1.7中,这部分内容被称作 NIO.2,主要在Java.nio.channels包下增加了下面四个异步通道:

1).AsynchronousSocketChannel

2).AsynchronousServerSocketChannel

3).AsynchronousFileChannel

4).AsynchronousDatagramChannel

在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open() 静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户 连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法 外,还提供了read和write方法。

在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的 事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败 时被回调。

void completed(V result, A attachment);

void failed(Throwable exc, A attachment);



AIO 异步非阻塞连接

服务器:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Server {
    public static void main(String[] args) throws IOException {
        //1.获取一个AsynchronousServerSocketChannel对象
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress(8888));
        //2.异步、非阻塞的accept()
        System.out.println("服务器端开始accept()....");
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            //连接成功的"异步的回调函数"
            @Override
            public void completed(AsynchronousSocketChannel result, Void attachment) {
                System.out.println("有连接到达...");
            }
            //连接失败的回调函数
            @Override
            public void failed(Throwable exc, Void attachment) { }
        });
        System.out.println("服务器端结束accept()....");
        //写个死循环,模拟服务器端继续运行
        while (true) { }
    }
}

客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;

public class Client {
    public static void main(String[] args) throws IOException {
        AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
        socket.connect(new InetSocketAddress("127.0.0.1", 8888));
    }
}



AIO同步非阻塞读写

服务器:

public class Server {
    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress(6666));
        System.out.println("服务器端接收链接...");
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel socket, Void attachment) {
                //服务器端连上客户端
                //接收请求
                System.out.println("有客户端连接!");
                ByteBuffer buf = ByteBuffer.allocate(1024);
                Future<Integer> read = socket.read(buf);
                try {
                    String msg = new String(buf.array(),0,read.get());
                    System.out.println("服务器收到消息:"+msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                //向客户端反馈
                Future<Integer> write = socket.write(ByteBuffer.wrap("你好客户端,我是服务器,已收到你的请求!".getBytes()));
                try {
                    System.out.println("反馈大小:"+write.get());
                    socket.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("没有客户端链接!");
            }
        });
        System.out.println("服务器端继续...");
        //保证回调函数完成
        while (true){}
    }
}

客户端:

public class Client {
    public static void main(String[] args) throws IOException {
        AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
        //链接服务器
        socket.connect(new InetSocketAddress("127.0.0.1", 6666), null, new CompletionHandler<Void, Void>() {
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("客户端链接成功!");
                //向服务器发送请求
                Future<Integer> write = socket.write(ByteBuffer.wrap("你好服务器,我是客户端".getBytes()));
                try {
                    System.out.println("写入大小:"+write.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                //收到服务器反馈
                ByteBuffer buf = ByteBuffer.allocate(1024);
                Future<Integer> read = socket.read(buf);
                try {
                    String msg = new String(buf.array(),0,read.get());
                    System.out.println("客户端收到反馈:"+msg);
                    socket.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("客户端链接失败!");
            }
        });
        System.out.println("客户端继续...");
        //保证回调函数完成
        while (true){}
    }
}



AIO 异步非阻塞读写

服务器:

public class Server {
    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress(9999));

        System.out.println("开始连接...");
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel socket, Void attachment) {
                System.out.println("有客户端链接...");
                ByteBuffer buf = ByteBuffer.allocate(1024);
                System.out.println("开始读取...");
                socket.read(buf, null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer len, Void attachment) {
                        String msg = new String(buf.array(),0,len);
                        System.out.println("服务器收到消息:"+msg);
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {

                    }
                });
                System.out.println("结束读取!");

                socket.write(ByteBuffer.wrap("你好客户端,我是服务器,已收到你的消息!".getBytes()), null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer len, Void attachment) {
                        System.out.println("服务器反馈完毕!");
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {

                    }
                });

            }

            @Override
            public void failed(Throwable exc, Void attachment) {

            }
        });
        System.out.println("结束连接!");

        while (true){}
    }
}

客户端:

public class Client {
    public static void main(String[] args) throws IOException, InterruptedException {
        AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
        socket.connect(new InetSocketAddress("127.0.0.1",9999));

        Thread.sleep(1000*2);

        socket.write(ByteBuffer.wrap("你好服务器,我是客户端,收到请回复!".getBytes()), null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                System.out.println("客户端发送请求完毕!");
            }

            @Override
            public void failed(Throwable exc, Void attachment) {

            }
        });

        ByteBuffer buf = ByteBuffer.allocate(1024);
        socket.read(buf, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer len, Void attachment) {
                String msg = new String(buf.array(),0,len);
                System.out.println("客户端收到消息:"+msg);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {

            }
        });

        while (true){}

    }
}