AIO概述
AIO是异步IO的缩写,虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。 对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作, IO操作本身是同步的。 但是对AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给 线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成 后,由系统自动触发。
与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的, 对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序; 对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以 理解为,read/write方法都是异步的,完成后会主动调用回调函数。
在JDK1.7中,这部分内容被称作 NIO.2,主要在Java.nio.channels包下增加了下面四个异步通道:
1).AsynchronousSocketChannel
2).AsynchronousServerSocketChannel
3).AsynchronousFileChannel
4).AsynchronousDatagramChannel
在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open() 静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户 连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法 外,还提供了read和write方法。
在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的 事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败 时被回调。
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
AIO 异步非阻塞连接
服务器:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class Server {
public static void main(String[] args) throws IOException {
//1.获取一个AsynchronousServerSocketChannel对象
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(8888));
//2.异步、非阻塞的accept()
System.out.println("服务器端开始accept()....");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
//连接成功的"异步的回调函数"
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
System.out.println("有连接到达...");
}
//连接失败的回调函数
@Override
public void failed(Throwable exc, Void attachment) { }
});
System.out.println("服务器端结束accept()....");
//写个死循环,模拟服务器端继续运行
while (true) { }
}
}
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
public class Client {
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
socket.connect(new InetSocketAddress("127.0.0.1", 8888));
}
}
AIO同步非阻塞读写
服务器:
public class Server {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(6666));
System.out.println("服务器端接收链接...");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
//服务器端连上客户端
//接收请求
System.out.println("有客户端连接!");
ByteBuffer buf = ByteBuffer.allocate(1024);
Future<Integer> read = socket.read(buf);
try {
String msg = new String(buf.array(),0,read.get());
System.out.println("服务器收到消息:"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//向客户端反馈
Future<Integer> write = socket.write(ByteBuffer.wrap("你好客户端,我是服务器,已收到你的请求!".getBytes()));
try {
System.out.println("反馈大小:"+write.get());
socket.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("没有客户端链接!");
}
});
System.out.println("服务器端继续...");
//保证回调函数完成
while (true){}
}
}
客户端:
public class Client {
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
//链接服务器
socket.connect(new InetSocketAddress("127.0.0.1", 6666), null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("客户端链接成功!");
//向服务器发送请求
Future<Integer> write = socket.write(ByteBuffer.wrap("你好服务器,我是客户端".getBytes()));
try {
System.out.println("写入大小:"+write.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//收到服务器反馈
ByteBuffer buf = ByteBuffer.allocate(1024);
Future<Integer> read = socket.read(buf);
try {
String msg = new String(buf.array(),0,read.get());
System.out.println("客户端收到反馈:"+msg);
socket.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("客户端链接失败!");
}
});
System.out.println("客户端继续...");
//保证回调函数完成
while (true){}
}
}
AIO 异步非阻塞读写
服务器:
public class Server {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(9999));
System.out.println("开始连接...");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
System.out.println("有客户端链接...");
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println("开始读取...");
socket.read(buf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer len, Void attachment) {
String msg = new String(buf.array(),0,len);
System.out.println("服务器收到消息:"+msg);
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
System.out.println("结束读取!");
socket.write(ByteBuffer.wrap("你好客户端,我是服务器,已收到你的消息!".getBytes()), null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer len, Void attachment) {
System.out.println("服务器反馈完毕!");
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
System.out.println("结束连接!");
while (true){}
}
}
客户端:
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
socket.connect(new InetSocketAddress("127.0.0.1",9999));
Thread.sleep(1000*2);
socket.write(ByteBuffer.wrap("你好服务器,我是客户端,收到请回复!".getBytes()), null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("客户端发送请求完毕!");
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
ByteBuffer buf = ByteBuffer.allocate(1024);
socket.read(buf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer len, Void attachment) {
String msg = new String(buf.array(),0,len);
System.out.println("客户端收到消息:"+msg);
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
while (true){}
}
}