JAVA AIO 服务器与客户端实现示例(代码2)

  • Post author:
  • Post category:java


AIO用于文件处理还是比较快乐的,但用AIO来写网络消息处理服务器端与客户端是比较麻烦的事情,当然这只是我个人意见,主要是有几点原因:

一是AIO需要操作系统支持,还好Windows与Linux(模拟)都支持;

二是AIO同时使用递归调用和异步调用容易把程序员搞晕,代码容易出错;

三是CompletionHandler会使用单独的线程跑,容易出现多线程问题,频繁线程上下文切换比较消耗资源;

四是异步写要创建队列来缓存需要写入的数据,否则肯定会遇到WritePendingException。


相对来说,NIO比较清楚直白,容易控制。


另外,笔者曾尝试在同一个进程内用多个线程模拟多个客户端,但没有成功:在run方法中调用AsynchronousSocketChannel.connect()之后,回调一直没有返回,客户端连不上服务器,原因暂时不明,欢迎大侠指教。最后只好用多个进程来模拟多个客户端:写一个类似下面内容的bat文件,同时启动多个客户端进程。

java -classpath .\ aio.Client 1

java -classpath .\ aio.Client 1

pause


服务器代码:



  1. package

    aio;

  2. /**

  3. * Created by Administrator on 2016/10/28.

  4. */

  5. import

    java.io.IOException;

  6. import

    java.net.InetSocketAddress;

  7. import

    java.net.StandardSocketOptions;

  8. import

    java.nio.ByteBuffer;

  9. import

    java.nio.CharBuffer;

  10. import

    java.nio.channels.AsynchronousChannelGroup;

  11. import

    java.nio.channels.AsynchronousServerSocketChannel;

  12. import

    java.nio.channels.AsynchronousSocketChannel;

  13. import

    java.nio.channels.CompletionHandler;

  14. //import java.nio.channels.WritePendingException;

  15. import

    java.nio.charset.CharacterCodingException;

  16. import

    java.nio.charset.Charset;

  17. import

    java.util.LinkedList;

  18. import

    java.util.Queue;

  19. import

    java.util.concurrent.Executors;

  20. public


    class

    XiaoNa {

  21. private


    final

    AsynchronousServerSocketChannel server;

  22. //写队列,因为当前一个异步写调用还没完成之前,调用异步写会抛WritePendingException

  23. //所以需要一个写队列来缓存要写入的数据,这是AIO比较坑的地方

  24. private


    final

    Queue<ByteBuffer> queue =

    new

    LinkedList<ByteBuffer>();

  25. private


    boolean

    writing =

    false

    ;

  26. public


    static


    void

    main(String[] args)

    throws

    IOException{
  27. XiaoNa xiaona =

    new

    XiaoNa();
  28. xiaona.listen();
  29. }

  30. public

    XiaoNa()

    throws

    IOException{

  31. //设置线程数为CPU核数
  32. AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
  33. server = AsynchronousServerSocketChannel.open(channelGroup);

  34. //重用端口
  35. server.setOption(StandardSocketOptions.SO_REUSEADDR,

    true

    );

  36. //绑定端口并设置连接请求队列长度
  37. server.bind(

    new

    InetSocketAddress(

    8383

    ),

    80

    );
  38. }

  39. public


    void

    listen() {
  40. System.out.println(Thread.currentThread().getName() +

    “: run in listen method”

    );

  41. //开始接受第一个连接请求
  42. server.accept(

    null

    ,

    new

    CompletionHandler<AsynchronousSocketChannel, Object>(){

  43. @Override

  44. public


    void

    completed(AsynchronousSocketChannel channel,
  45. Object attachment) {
  46. System.out.println(Thread.currentThread().getName() +

    “: run in accept completed method”

    );

  47. //先安排处理下一个连接请求,异步非阻塞调用,所以不用担心挂住了

  48. //这里传入this是个地雷,小心多线程
  49. server.accept(

    null

    ,

    this

    );

  50. //处理连接读写
  51. handle(channel);
  52. }

  53. private


    void

    handle(

    final

    AsynchronousSocketChannel channel) {
  54. System.out.println(Thread.currentThread().getName() +

    “: run in handle method”

    );

  55. //每个AsynchronousSocketChannel,分配一个缓冲区

  56. final

    ByteBuffer readBuffer = ByteBuffer.allocateDirect(

    1024

    );
  57. readBuffer.clear();
  58. channel.read(readBuffer,

    null

    ,

    new

    CompletionHandler<Integer, Object>(){

  59. @Override

  60. public


    void

    completed(Integer count, Object attachment) {
  61. System.out.println(Thread.currentThread().getName() +

    “: run in read completed method”

    );

  62. if

    (count >

    0

    ){

  63. try

    {
  64. readBuffer.flip();

  65. //CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
  66. CharBuffer charBuffer = Charset.forName(

    “UTF-8”

    ).newDecoder().decode(readBuffer);
  67. String question = charBuffer.toString();
  68. String answer = Helper.getAnswer(question);

  69. /*//写入也是异步调用,也可以使用传入CompletionHandler对象的方式来处理写入结果

  70. //channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));

  71. try{


  72. channel.write(Charset.forName(“UTF-8”).newEncoder().encode(CharBuffer.wrap(answer)));

  73. }

  74. //Unchecked exception thrown when an attempt is made to write to an asynchronous socket channel and a previous write has not completed.

  75. //看来操作系统也不可靠

  76. catch(WritePendingException wpe){


  77. //休息一秒再重试,如果失败就不管了

  78. Helper.sleep(1);

  79. channel.write(Charset.forName(“UTF-8”).newEncoder().encode(CharBuffer.wrap(answer)));

  80. }*/
  81. writeStringMessage(channel, answer);
  82. readBuffer.clear();
  83. }

  84. catch

    (IOException e){
  85. e.printStackTrace();
  86. }
  87. }

  88. else

    {

  89. try

    {

  90. //如果客户端关闭socket,那么服务器也需要关闭,否则浪费CPU
  91. channel.close();
  92. }

    catch

    (IOException e) {
  93. e.printStackTrace();
  94. }
  95. }

  96. //异步调用OS处理下个读取请求

  97. //这里传入this是个地雷,小心多线程
  98. channel.read(readBuffer,

    null

    ,

    this

    );
  99. }

  100. /**

  101. * 服务器读失败处理

  102. * @param exc

  103. * @param attachment

  104. */

  105. @Override

  106. public


    void

    failed(Throwable exc, Object attachment) {
  107. System.out.println(

    “server read failed: ”

    + exc);

  108. if

    (channel !=

    null

    ){

  109. try

    {
  110. channel.close();
  111. }

    catch

    (IOException e) {
  112. e.printStackTrace();
  113. }
  114. }
  115. }
  116. });
  117. }

  118. /**

  119. * 服务器接受连接失败处理

  120. * @param exc

  121. * @param attachment

  122. */

  123. @Override

  124. public


    void

    failed(Throwable exc, Object attachment) {
  125. System.out.println(

    “server accept failed: ”

    + exc);
  126. }
  127. });
  128. }

  129. /**

  130. * Enqueues a write of the buffer to the channel.

  131. * The call is asynchronous so the buffer is not safe to modify after

  132. * passing the buffer here.

  133. *

  134. * @param buffer the buffer to send to the channel

  135. */

  136. private


    void

    writeMessage(

    final

    AsynchronousSocketChannel channel,

    final

    ByteBuffer buffer) {

  137. boolean

    threadShouldWrite =

    false

    ;

  138. synchronized

    (queue) {
  139. queue.add(buffer);

  140. // Currently no thread writing, make this thread dispatch a write

  141. if

    (!writing) {
  142. writing =

    true

    ;
  143. threadShouldWrite =

    true

    ;
  144. }
  145. }

  146. if

    (threadShouldWrite) {
  147. writeFromQueue(channel);
  148. }
  149. }

  150. private


    void

    writeFromQueue(

    final

    AsynchronousSocketChannel channel) {
  151. ByteBuffer buffer;

  152. synchronized

    (queue) {
  153. buffer = queue.poll();

  154. if

    (buffer ==

    null

    ) {
  155. writing =

    false

    ;
  156. }
  157. }

  158. // No new data in buffer to write

  159. if

    (writing) {
  160. writeBuffer(channel, buffer);
  161. }
  162. }

  163. private


    void

    writeBuffer(

    final

    AsynchronousSocketChannel channel, ByteBuffer buffer) {
  164. channel.write(buffer, buffer,

    new

    CompletionHandler<Integer, ByteBuffer>() {

  165. @Override

  166. public


    void

    completed(Integer result, ByteBuffer buffer) {

  167. if

    (buffer.hasRemaining()) {
  168. channel.write(buffer, buffer,

    this

    );
  169. }

    else

    {

  170. // Go back and check if there is new data to write
  171. writeFromQueue(channel);
  172. }
  173. }

  174. @Override

  175. public


    void

    failed(Throwable exc, ByteBuffer attachment) {
  176. System.out.println(

    “server write failed: ”

    + exc);
  177. }
  178. });
  179. }

  180. /**

  181. * Sends a message

  182. * @param msg

  183. * @throws CharacterCodingException

  184. */

  185. private


    void

    writeStringMessage(

    final

    AsynchronousSocketChannel channel, String msg)

    throws

    CharacterCodingException {
  186. writeMessage(channel, Charset.forName(

    “UTF-8”

    ).newEncoder().encode(CharBuffer.wrap(msg)));
  187. }
  188. }

package aio;

/**
 * Created by Administrator on 2016/10/28.
 */
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//import java.nio.channels.WritePendingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;

/**
 * Minimal asynchronous (NIO.2) question/answer server.
 *
 * <p>The server listens on port 8383. For every chunk of bytes received from a
 * client it decodes the bytes as UTF-8, asks {@code Helper.getAnswer} for a
 * reply and sends that reply back as UTF-8.
 *
 * <p>All per-connection state (read buffer, pending-write queue and the
 * "write in flight" flag) lives in a {@link Connection} object, one per
 * accepted channel, so data queued for one client can never be written to
 * another client's channel.
 */
public class XiaoNa {
    /** Charset used both for decoding requests and encoding replies. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final AsynchronousServerSocketChannel server;

    public static void main(String[] args) throws IOException{
        XiaoNa xiaona = new XiaoNa();
        xiaona.listen();
    }

    /**
     * Opens the server channel and binds it to port 8383.
     *
     * @throws IOException if the channel group or server channel cannot be
     *                     created, configured or bound
     */
    public XiaoNa() throws IOException{
        // One pool thread per available processor runs the completion handlers.
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        server = AsynchronousServerSocketChannel.open(channelGroup);
        // Allow the listening address to be reused.
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        // Bind the port and set the pending-connection backlog.
        server.bind(new InetSocketAddress(8383), 80);
    }

    /**
     * Starts accepting connections. The call returns immediately; accepted
     * connections are served from completion-handler callbacks.
     */
    public void listen() {
        System.out.println(Thread.currentThread().getName() + ": run in listen method" );
        // Kick off the first accept; every completion re-arms the next one.
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
            @Override
            public void completed(AsynchronousSocketChannel channel,
                                  Object attachment) {
                System.out.println(Thread.currentThread().getName() + ": run in accept completed method" );

                // Re-arm the accept first so further clients are not kept
                // waiting while this one is being set up. The same handler
                // instance is reused, so it must stay stateless.
                server.accept(null, this);
                // Serve this connection with its own private state.
                new Connection(channel).start();
            }

            /**
             * Called when an accept fails; the failure is only reported.
             *
             * @param exc        the cause of the failure
             * @param attachment unused (always {@code null})
             */
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("server accept failed: " + exc);
            }

        });
    }

    /**
     * State and callbacks for a single accepted connection.
     *
     * <p>An asynchronous channel rejects a write while a previous write is
     * still pending (WritePendingException), so outgoing buffers are queued
     * and written one at a time. {@code writing} is only read or modified
     * while holding the monitor of {@code writeQueue}.
     */
    private static final class Connection implements CompletionHandler<Integer, Object> {
        private final AsynchronousSocketChannel channel;
        /** Read buffer dedicated to this connection. */
        private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
        /** Buffers waiting to be written to this connection, in order. */
        private final Queue<ByteBuffer> writeQueue = new LinkedList<ByteBuffer>();
        /** True while a write is in flight; guarded by {@code writeQueue}. */
        private boolean writing = false;

        /** Drains the write queue: finishes partial writes, then takes the next buffer. */
        private final CompletionHandler<Integer, ByteBuffer> writeHandler =
                new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    // Partial write: keep sending the rest of the same buffer.
                    channel.write(buffer, buffer, this);
                } else {
                    // Buffer fully sent: check whether more data is queued.
                    writeFromQueue();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("server write failed: " + exc);
                // Drop whatever is still queued and release the flag so the
                // connection state does not stay stuck in "writing".
                synchronized (writeQueue) {
                    writeQueue.clear();
                    writing = false;
                }
                close();
            }
        };

        Connection(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        /** Issues the first asynchronous read for this connection. */
        void start() {
            System.out.println(Thread.currentThread().getName() + ": run in handle method" );
            channel.read(readBuffer, null, this);
        }

        /**
         * Read completion: answers the received text, then re-arms the read.
         *
         * @param count      number of bytes read, or a non-positive value
         *                   when nothing more can be read
         * @param attachment unused (always {@code null})
         */
        @Override
        public void completed(Integer count, Object attachment) {
            System.out.println(Thread.currentThread().getName() + ": run in read completed method" );

            if (count <= 0) {
                // The peer closed the socket (or nothing could be read):
                // close our side and do NOT schedule another read on the
                // closed channel.
                close();
                return;
            }

            try {
                readBuffer.flip();
                CharBuffer charBuffer = UTF_8.newDecoder().decode(readBuffer);
                String question = charBuffer.toString();
                String answer = Helper.getAnswer(question);
                writeStringMessage(answer);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always reset the buffer, even when decoding failed, so the
                // next read starts with an empty buffer.
                readBuffer.clear();
            }

            // Ask the OS for the next chunk; this handler is reused.
            channel.read(readBuffer, null, this);
        }

        /**
         * Read failure: reports the error and closes the connection.
         *
         * @param exc        the cause of the failure
         * @param attachment unused (always {@code null})
         */
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("server read failed: " + exc);
            close();
        }

        /**
         * Encodes a message as UTF-8 and queues it for sending.
         *
         * @param msg the text to send
         * @throws CharacterCodingException if the text cannot be encoded
         */
        private void writeStringMessage(String msg) throws CharacterCodingException {
            writeMessage(UTF_8.newEncoder().encode(CharBuffer.wrap(msg)));
        }

        /**
         * Enqueues a buffer for writing. The write is asynchronous, so the
         * buffer must not be modified after it has been passed in.
         *
         * @param buffer the buffer to send to the channel
         */
        private void writeMessage(ByteBuffer buffer) {
            boolean startWriting;

            synchronized (writeQueue) {
                writeQueue.add(buffer);
                // If no write is in flight, this caller starts the chain.
                startWriting = !writing;
                writing = true;
            }

            if (startWriting) {
                writeFromQueue();
            }
        }

        /** Writes the next queued buffer, or clears the in-flight flag if none is left. */
        private void writeFromQueue() {
            ByteBuffer next;

            synchronized (writeQueue) {
                next = writeQueue.poll();
                if (next == null) {
                    writing = false;
                }
            }

            // Decide from the local variable rather than the shared flag:
            // another thread may set the flag again right after the lock is
            // released, and that thread starts its own write chain.
            if (next != null) {
                channel.write(next, next, writeHandler);
            }
        }

        /** Closes the channel, reporting (not propagating) any I/O error. */
        private void close() {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}



客户端代码:





  1. package

    aio;

  2. /**

  3. * Created by Administrator on 2016/10/28.

  4. */

  5. import

    java.io.IOException;

  6. import

    java.net.InetSocketAddress;

  7. import

    java.net.StandardSocketOptions;

  8. import

    java.nio.ByteBuffer;

  9. import

    java.nio.CharBuffer;

  10. import

    java.nio.channels.AsynchronousChannelGroup;

  11. import

    java.nio.channels.AsynchronousSocketChannel;

  12. import

    java.nio.channels.CompletionHandler;

  13. import

    java.nio.charset.CharacterCodingException;

  14. import

    java.nio.charset.Charset;

  15. import

    java.util.LinkedList;

  16. import

    java.util.Queue;

  17. import

    java.util.concurrent.CountDownLatch;

  18. import

    java.util.concurrent.Executors;

  19. public


    class

    Client

    implements

    Runnable{

  20. private

    AsynchronousSocketChannel channel;

  21. private

    Helper helper;

  22. private

    CountDownLatch latch;

  23. private


    final

    Queue<ByteBuffer> queue =

    new

    LinkedList<ByteBuffer>();

  24. private


    boolean

    writing =

    false

    ;

  25. public

    Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch)

    throws

    IOException, InterruptedException{

  26. this

    .latch = latch;
  27. helper =

    new

    Helper();
  28. initChannel(channelGroup);
  29. }

  30. private


    void

    initChannel(AsynchronousChannelGroup channelGroup)

    throws

    IOException {

  31. //在默认channel group下创建一个socket channel
  32. channel = AsynchronousSocketChannel.open(channelGroup);

  33. //设置Socket选项
  34. channel.setOption(StandardSocketOptions.TCP_NODELAY,

    true

    );
  35. channel.setOption(StandardSocketOptions.SO_KEEPALIVE,

    true

    );
  36. channel.setOption(StandardSocketOptions.SO_REUSEADDR,

    true

    );
  37. }

  38. public


    static


    void

    main(String[] args)

    throws

    IOException, InterruptedException {

  39. int

    sleepTime = Integer.parseInt(args[

    0

    ]);
  40. Helper.sleep(sleepTime);
  41. AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());

  42. //只能跑一个线程,第二个线程connect会挂住,暂时不明原因

  43. final


    int

    THREAD_NUM =

    1

    ;
  44. CountDownLatch latch =

    new

    CountDownLatch(THREAD_NUM);

  45. //创建个多线程模拟多个客户端,模拟失败,无效

  46. //只能通过命令行同时运行多个进程来模拟多个客户端

  47. for

    (

    int

    i=

    0

    ; i<THREAD_NUM; i++){
  48. Client c =

    new

    Client(channelGroup, latch);
  49. Thread t =

    new

    Thread(c);
  50. System.out.println(t.getName() +

    “—start”

    );
  51. t.start();

  52. //让主线程等待子线程处理再退出, 这对于异步调用无效

  53. //t.join();
  54. }
  55. latch.await();

  56. if

    (channelGroup !=

    null

    ){
  57. channelGroup.shutdown();
  58. }
  59. }

  60. @Override

  61. public


    void

    run() {
  62. System.out.println(Thread.currentThread().getName() +

    “—run”

    );

  63. //连接服务器
  64. channel.connect(

    new

    InetSocketAddress(

    “localhost”

    ,

    8383

    ),

    null

    ,

    new

    CompletionHandler<Void, Void>(){

  65. final

    ByteBuffer readBuffer = ByteBuffer.allocateDirect(

    1024

    );

  66. @Override

  67. public


    void

    completed(Void result, Void attachment) {

  68. //连接成功后, 异步调用OS向服务器写一条消息

  69. try

    {

  70. //channel.write(CharsetHelper.encode(CharBuffer.wrap(helper.getWord())));
  71. writeStringMessage(helper.getWord());
  72. }

    catch

    (CharacterCodingException e) {
  73. e.printStackTrace();
  74. }

  75. //helper.sleep();//等待写异步调用完成
  76. readBuffer.clear();

  77. //异步调用OS读取服务器发送的消息
  78. channel.read(readBuffer,

    null

    ,

    new

    CompletionHandler<Integer, Object>(){

  79. @Override

  80. public


    void

    completed(Integer result, Object attachment) {

  81. try

    {

  82. //异步读取完成后处理

  83. if

    (result >

    0

    ){
  84. readBuffer.flip();
  85. CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
  86. String answer = charBuffer.toString();
  87. System.out.println(Thread.currentThread().getName() +

    “—”

    + answer);
  88. readBuffer.clear();
  89. String word = helper.getWord();

  90. if

    (word !=

    null

    ){

  91. //异步写

  92. //channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
  93. writeStringMessage(word);

  94. //helper.sleep();//等待异步操作
  95. channel.read(readBuffer,

    null

    ,

    this

    );
  96. }

  97. else

    {

  98. //不想发消息了,主动关闭channel
  99. shutdown();
  100. }
  101. }

  102. else

    {

  103. //对方已经关闭channel,自己被动关闭,避免空循环
  104. shutdown();
  105. }
  106. }

  107. catch

    (Exception e){
  108. e.printStackTrace();
  109. }
  110. }

  111. /**

  112. * 读取失败处理

  113. * @param exc

  114. * @param attachment

  115. */

  116. @Override

  117. public


    void

    failed(Throwable exc, Object attachment) {
  118. System.out.println(

    “client read failed: ”

    + exc);

  119. try

    {
  120. shutdown();
  121. }

    catch

    (IOException e) {
  122. e.printStackTrace();
  123. }
  124. }
  125. });
  126. }

  127. /**

  128. * 连接失败处理

  129. * @param exc

  130. * @param attachment

  131. */

  132. @Override

  133. public


    void

    failed(Throwable exc, Void attachment) {
  134. System.out.println(

    “client connect to server failed: ”

    + exc);

  135. try

    {
  136. shutdown();
  137. }

    catch

    (IOException e) {
  138. e.printStackTrace();
  139. }
  140. }
  141. });
  142. }

  143. private


    void

    shutdown()

    throws

    IOException {

  144. if

    (channel !=

    null

    ){
  145. channel.close();
  146. }
  147. latch.countDown();
  148. }

  149. /**

  150. * Enqueues a write of the buffer to the channel.

  151. * The call is asynchronous so the buffer is not safe to modify after

  152. * passing the buffer here.

  153. *

  154. * @param buffer the buffer to send to the channel

  155. */

  156. private


    void

    writeMessage(

    final

    ByteBuffer buffer) {

  157. boolean

    threadShouldWrite =

    false

    ;

  158. synchronized

    (queue) {
  159. queue.add(buffer);

  160. // Currently no thread writing, make this thread dispatch a write

  161. if

    (!writing) {
  162. writing =

    true

    ;
  163. threadShouldWrite =

    true

    ;
  164. }
  165. }

  166. if

    (threadShouldWrite) {
  167. writeFromQueue();
  168. }
  169. }

  170. private


    void

    writeFromQueue() {
  171. ByteBuffer buffer;

  172. synchronized

    (queue) {
  173. buffer = queue.poll();

  174. if

    (buffer ==

    null

    ) {
  175. writing =

    false

    ;
  176. }
  177. }

  178. // No new data in buffer to write

  179. if

    (writing) {
  180. writeBuffer(buffer);
  181. }
  182. }

  183. private


    void

    writeBuffer(ByteBuffer buffer) {
  184. channel.write(buffer, buffer,

    new

    CompletionHandler<Integer, ByteBuffer>() {

  185. @Override

  186. public


    void

    completed(Integer result, ByteBuffer buffer) {

  187. if

    (buffer.hasRemaining()) {
  188. channel.write(buffer, buffer,

    this

    );
  189. }

    else

    {

  190. // Go back and check if there is new data to write
  191. writeFromQueue();
  192. }
  193. }

  194. @Override

  195. public


    void

    failed(Throwable exc, ByteBuffer attachment) {
  196. }
  197. });
  198. }

  199. /**

  200. * Sends a message

  201. * @param msg

  202. * @throws CharacterCodingException

  203. */

  204. public


    void

    writeStringMessage(String msg)

    throws

    CharacterCodingException {
  205. writeMessage(Charset.forName(

    “UTF-8”

    ).newEncoder().encode(CharBuffer.wrap(msg)));
  206. }
  207. }

package aio;

/**
 * Created by Administrator on 2016/10/28.
 */
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

/**
 * Demo AIO client. It connects to the server on {@code localhost:8383}, sends
 * the words handed out by {@link Helper} one at a time, prints every reply and
 * closes the channel once there is nothing left to send.
 *
 * <p>Outgoing buffers go through a queue because only one asynchronous write
 * may be pending on the channel at a time; starting a second one early throws
 * {@code WritePendingException}.
 */
public class Client implements Runnable {
    private final AsynchronousSocketChannel channel;
    private final Helper helper;
    private final CountDownLatch latch;
    // Buffers waiting to be written. Its monitor also guards "writing" and "closed".
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    // True while a chain of asynchronous writes is in flight.
    private boolean writing = false;
    // True once shutdown() has run, so the latch is released exactly once.
    private boolean closed = false;

    /**
     * Creates a client whose channel belongs to {@code channelGroup}.
     *
     * @param channelGroup group that supplies the threads running the completion handlers
     * @param latch counted down once when this client shuts down
     * @throws IOException if the channel cannot be opened or configured
     * @throws InterruptedException if interrupted while the helper is being set up
     */
    public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException {
        this.latch = latch;
        this.helper = new Helper();
        this.channel = openChannel(channelGroup);
    }

    /** Opens a socket channel in the given group and applies the socket options. */
    private static AsynchronousSocketChannel openChannel(AsynchronousChannelGroup channelGroup) throws IOException {
        AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(channelGroup);
        socket.setOption(StandardSocketOptions.TCP_NODELAY, true);
        socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        return socket;
    }

    /**
     * Starts the client and waits until it has shut down.
     *
     * @param args optional; {@code args[0]} is a start-up delay in seconds (0 when omitted)
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Running without an argument used to fail with ArrayIndexOutOfBoundsException.
        int sleepTime = args.length > 0 ? Integer.parseInt(args[0]) : 0;
        Helper.sleep(sleepTime);

        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
                Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
        // Only one thread works: a second thread's connect hangs, reason unknown so far.
        final int THREAD_NUM = 1;
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);

        try {
            // Simulating several clients with several threads did not work;
            // run several processes from the command line instead.
            for (int i = 0; i < THREAD_NUM; i++) {
                Client c = new Client(channelGroup, latch);
                Thread t = new Thread(c);
                System.out.println(t.getName() + "---start");
                t.start();
                // Thread.join() is useless here: run() returns as soon as the
                // asynchronous connect has been issued. The latch is used instead.
            }

            latch.await();
        } finally {
            // Release the group's threads even when start-up or waiting fails.
            channelGroup.shutdown();
        }
    }

    /** Issues the asynchronous connect; the conversation continues in the completion handlers. */
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "---run");

        channel.connect(new InetSocketAddress("localhost", 8383), null, new CompletionHandler<Void, Void>() {
            @Override
            public void completed(Void result, Void attachment) {
                String firstWord = helper.getWord();
                if (firstWord == null) {
                    // Nothing to say: close instead of failing in CharBuffer.wrap(null).
                    shutdownQuietly();
                    return;
                }

                // Connected: queue the first message for the server.
                try {
                    writeStringMessage(firstWord);
                } catch (CharacterCodingException e) {
                    e.printStackTrace();
                }

                // Start reading the server's replies.
                ReplyHandler replyHandler = new ReplyHandler();
                replyHandler.readBuffer.clear();
                channel.read(replyHandler.readBuffer, null, replyHandler);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.out.println("client connect to server failed: " + exc);
                shutdownQuietly();
            }
        });
    }

    /** Prints each server reply and sends the next word until there are none left. */
    private final class ReplyHandler implements CompletionHandler<Integer, Object> {
        private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);

        @Override
        public void completed(Integer result, Object attachment) {
            try {
                if (result <= 0) {
                    // The peer closed the channel: close our side too instead of spinning.
                    shutdown();
                    return;
                }

                readBuffer.flip();
                String answer = CharsetHelper.decode(readBuffer).toString();
                System.out.println(Thread.currentThread().getName() + "---" + answer);
                readBuffer.clear();

                String word = helper.getWord();
                if (word == null) {
                    // Nothing more to send: close the channel ourselves.
                    shutdown();
                    return;
                }

                writeStringMessage(word);
                channel.read(readBuffer, null, this);
            } catch (Exception e) {
                e.printStackTrace();
                // No further read is pending after an error; without closing
                // here the latch would never be released and main would hang.
                shutdownQuietly();
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("client read failed: " + exc);
            shutdownQuietly();
        }
    }

    /**
     * Closes the channel and releases the latch. Only the first call has any
     * effect, so handlers that fail at the same time count the latch down once.
     *
     * @throws IOException if closing the channel fails (the latch is still released)
     */
    private void shutdown() throws IOException {
        synchronized (queue) {
            if (closed) {
                return;
            }
            closed = true;
        }

        try {
            channel.close();
        } finally {
            // Must run even when close() throws, otherwise main waits forever.
            latch.countDown();
        }
    }

    /** Calls {@link #shutdown()} and prints, rather than propagates, an {@code IOException}. */
    private void shutdownQuietly() {
        try {
            shutdown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final ByteBuffer buffer) {
        boolean threadShouldWrite = false;

        synchronized (queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }

        if (threadShouldWrite) {
            writeFromQueue();
        }
    }

    /**
     * Starts writing the next queued buffer, or clears {@code writing} when
     * the queue is empty.
     */
    private void writeFromQueue() {
        ByteBuffer buffer;

        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }

        // Decide on the polled buffer, not on the shared flag: another thread
        // may set "writing" back to true right after the lock is released,
        // which previously could lead to writeBuffer(null).
        if (buffer != null) {
            writeBuffer(buffer);
        }
    }

    /**
     * Writes {@code buffer} asynchronously, re-issuing the write until the
     * buffer is drained, then continues with the next queued buffer.
     */
    private void writeBuffer(ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                // A failed write used to be ignored, leaving "writing" stuck
                // at true and the client waiting forever.
                System.out.println("client write failed: " + exc);
                shutdownQuietly();
            }
        });
    }

    /**
     * Sends a message, encoded as UTF-8.
     *
     * @param msg the text to send
     * @throws CharacterCodingException if {@code msg} cannot be encoded as UTF-8
     */
    public void writeStringMessage(String msg) throws CharacterCodingException {
        writeMessage(Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg)));
    }
}





工具类:





  1. package

    aio;

  2. /**

  3. * Created by Administrator on 2016/10/28.

  4. */

  5. import

    java.util.Random;

  6. import

    java.util.concurrent.ArrayBlockingQueue;

  7. import

    java.util.concurrent.BlockingQueue;

  8. import

    java.util.concurrent.TimeUnit;

  /**
   * Conversation helper for the AIO demo: hands out the client's scripted
   * words, maps questions to answers and offers sleep utilities.
   */
  public class Helper {
      /** Generator for the random pauses in {@link #sleep()}. */
      private static final Random RANDOM = new Random();
      /** Reply to anything that is not a known question. */
      private static final String UNKNOWN_QUESTION_ANSWER = "请输入 who, 或者what, 或者where";

      // Words still to be sent, in order. Per instance: a static queue was
      // replaced by every new Helper, so instances shared one script.
      private final BlockingQueue<String> words;

      /**
       * Creates a helper with the full five-word script.
       *
       * @throws InterruptedException if interrupted while filling the queue
       */
      public Helper() throws InterruptedException {
          words = new ArrayBlockingQueue<String>(5);
          words.put("hi");
          words.put("who");
          words.put("what");
          words.put("where");
          words.put("bye");
      }

      /**
       * Returns the next word of the script.
       *
       * @return the next word, or {@code null} when the script is exhausted
       */
      public String getWord() {
          return words.poll();
      }

      /** Sleeps for a random 0 to 2 seconds. */
      public void sleep() {
          sleep(RANDOM.nextInt(3));
      }

      /**
       * Sleeps for {@code l} seconds.
       *
       * @param l number of seconds to sleep
       */
      public static void sleep(long l) {
          try {
              TimeUnit.SECONDS.sleep(l);
          } catch (InterruptedException e) {
              e.printStackTrace();
              // Keep the interrupt visible to the caller instead of swallowing it.
              Thread.currentThread().interrupt();
          }
      }

      /**
       * Returns the scripted answer for {@code question}.
       *
       * @param question the text received; may be {@code null}
       * @return the answer, or a usage hint for an unknown or {@code null} question
       */
      public static String getAnswer(String question) {
          if (question == null) {
              // switch on a null String throws NullPointerException.
              return UNKNOWN_QUESTION_ANSWER;
          }

          switch (question) {
              case "who":
                  return "我是小娜\n";
              case "what":
                  return "我是来帮你解闷的\n";
              case "where":
                  return "我来自外太空\n";
              case "hi":
                  return "hello\n";
              case "bye":
                  return "88\n";
              default:
                  return UNKNOWN_QUESTION_ANSWER;
          }
      }
  }

package aio;

/**
 * Created by Administrator on 2016/10/28.
 */
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Conversation helper for the AIO demo: hands out the client's scripted
 * words, maps questions to answers and offers sleep utilities.
 */
public class Helper {
    /** Generator for the random pauses in {@link #sleep()}. */
    private static final Random RANDOM = new Random();
    /** Reply to anything that is not a known question. */
    private static final String UNKNOWN_QUESTION_ANSWER = "请输入 who, 或者what, 或者where";

    // Words still to be sent, in order. Per instance: the queue used to be a
    // static field reassigned by the constructor, so every new Helper replaced
    // the script of all existing instances.
    private final BlockingQueue<String> words;

    /**
     * Creates a helper with the full five-word script.
     *
     * @throws InterruptedException if interrupted while filling the queue
     */
    public Helper() throws InterruptedException {
        words = new ArrayBlockingQueue<String>(5);
        words.put("hi");
        words.put("who");
        words.put("what");
        words.put("where");
        words.put("bye");
    }

    /**
     * Returns the next word of the script.
     *
     * @return the next word, or {@code null} when the script is exhausted
     */
    public String getWord() {
        return words.poll();
    }

    /** Sleeps for a random 0 to 2 seconds. */
    public void sleep() {
        sleep(RANDOM.nextInt(3));
    }

    /**
     * Sleeps for {@code l} seconds.
     *
     * @param l number of seconds to sleep
     */
    public static void sleep(long l) {
        try {
            TimeUnit.SECONDS.sleep(l);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the scripted answer for {@code question}.
     *
     * @param question the text received; may be {@code null}
     * @return the answer, or a usage hint for an unknown or {@code null} question
     */
    public static String getAnswer(String question) {
        if (question == null) {
            // switch on a null String throws NullPointerException.
            return UNKNOWN_QUESTION_ANSWER;
        }

        switch (question) {
            case "who":
                return "我是小娜\n";
            case "what":
                return "我是来帮你解闷的\n";
            case "where":
                return "我来自外太空\n";
            case "hi":
                return "hello\n";
            case "bye":
                return "88\n";
            default:
                return UNKNOWN_QUESTION_ANSWER;
        }
    }
}


  1. package

    aio;

  2. /**

  3. * Created by Administrator on 2016/10/28.

  4. */

  5. import

    java.nio.ByteBuffer;

  6. import

    java.nio.CharBuffer;

  7. import

    java.nio.charset.CharacterCodingException;

  8. import

    java.nio.charset.Charset;

  9. import

    java.nio.charset.CharsetDecoder;

  10. import

    java.nio.charset.CharsetEncoder;

  /**
   * UTF-8 conversion helpers between {@link CharBuffer} and {@link ByteBuffer}.
   */
  public class CharsetHelper {
      private static final String UTF_8 = "UTF-8";
      // Only the Charset is shared. Encoders and decoders are stateful and not
      // safe for concurrent use, so each call creates its own.
      private static final Charset CHARSET = Charset.forName(UTF_8);

      /**
       * Encodes the remaining characters of {@code in} as UTF-8.
       *
       * @param in the characters to encode
       * @return a new buffer holding the encoded bytes
       * @throws CharacterCodingException if the input cannot be encoded
       */
      public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
          return CHARSET.newEncoder().encode(in);
      }

      /**
       * Decodes the remaining bytes of {@code in} as UTF-8.
       *
       * @param in the bytes to decode
       * @return a new buffer holding the decoded characters
       * @throws CharacterCodingException if the input is not valid UTF-8
       */
      public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
          return CHARSET.newDecoder().decode(in);
      }
  }

package aio;

/**
 * Created by Administrator on 2016/10/28.
 */

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

/**
 * UTF-8 conversion helpers between {@link CharBuffer} and {@link ByteBuffer}.
 */
public class CharsetHelper {
    private static final String UTF_8 = "UTF-8";
    // Only the Charset is shared. Encoders and decoders are stateful and not
    // safe for concurrent use, and these methods are called from the threads
    // running the completion handlers, so each call creates its own.
    private static final Charset CHARSET = Charset.forName(UTF_8);

    /**
     * Encodes the remaining characters of {@code in} as UTF-8.
     *
     * @param in the characters to encode
     * @return a new buffer holding the encoded bytes
     * @throws CharacterCodingException if the input cannot be encoded
     */
    public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
        return CHARSET.newEncoder().encode(in);
    }

    /**
     * Decodes the remaining bytes of {@code in} as UTF-8.
     *
     * @param in the bytes to decode
     * @return a new buffer holding the decoded characters
     * @throws CharacterCodingException if the input is not valid UTF-8
     */
    public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
        return CHARSET.newDecoder().decode(in);
    }
}