java – 基于TCP协议的Socket多线程(线程池)通讯案例

  • Post author:
  • Post category:java




客户端

客户端启动类

package com.chat;

import com.alibaba.fastjson.JSON;
import com.chat.client.ChatClient;
import com.chat.message.MessagePacket;
import com.chat.message.MessageType;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:客户端启动类
 */
public class ClientStater {

    public static void main(String[] args) {
        ChatClient client = ChatClient.getInstance();
        client.startUp();
    }
}

客户端

package com.chat.client;

import com.alibaba.fastjson.JSON;
import com.chat.message.MessagePacket;
import com.chat.message.MessageType;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:客户端(单例)
 */
public class ChatClient {

    private final static String SERVER_IP = "127.0.0.1";

    private final static int TCP_PORT = 9966;

    private static Socket socket;

    private static boolean connecting = false;


    private ChatClient(){

    }


    public Socket getSocket() {
        return socket;
    }

    // 启动
    public void startUp() {
        try {
            socket = new Socket(SERVER_IP, TCP_PORT);
            connecting = true;
            System.out.println("-----连接服务器成功--------");
            /**
             * 启动客户端接收消息线程
             */
            new Thread(new MessageReceiver()).start();
            /**
             * 通知服务器我上线了
             */
            Map<String, Object> parameters = new HashMap<>();
            parameters.put("username", "jax");
            String msg = JSON.toJSONString(parameters);
            MessagePacket packet = new MessagePacket(MessageType.LOGIN_REQUEST, msg);
            sendMessage(packet);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //关闭
    public void shutDown() {
        connecting = false;
        try {
            // 关闭socket
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //发送消息到服务器
    public void sendMessage(MessagePacket message){
        String packet = JSON.toJSONString(message);
        try {
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            writer.write(packet);
            //必须newLine()
            writer.newLine();
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static ChatClient getInstance() {
        return SingletonClient.CLIENT;
    }

    private static class SingletonClient {
        private final static ChatClient CLIENT = new ChatClient();
    }
}

客户端接收消息线程

package com.chat.client;

import com.alibaba.fastjson.JSON;
import com.chat.message.MessageAnalysis;
import com.chat.message.MessagePacket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:
 */
public class MessageReceiver implements Runnable {

    private Socket socket;
    private MessageAnalysis analysis;

    public MessageReceiver(){
        init();
    }

    // 初始化
    private void init(){
        ChatClient client = ChatClient.getInstance();
        socket = client.getSocket();
        analysis = new MessageAnalysis();
    }

    @Override
    public void run() {
        receive();
    }

    // 接收消息并处理
    private void receive() {
        try {
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            } catch (IOException e) {
                e.printStackTrace();
            }
            String message;
            try {
                while ((message = reader.readLine()) != null) {
                    JSON json = (JSON) JSON.parse(message);
                    MessagePacket packet = JSON.toJavaObject(json, MessagePacket.class);
                    /**
                     * 将消息转给MessageAnalysis分析处理
                     */
                    analysis.analyze(packet);
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

客户端消息处理类

package com.chat.message;

/**
 * @author: 
 * @Date: 2020/06/08
 * @Description:
 */
public class MessageAnalysis {

    public void analyze(MessagePacket packet) {
        int type = packet.getType();
        System.out.println("---解析来自服务器的消息----");
        switch (type){
            case MessageType.LOGIN_RESPONSE:
                login(packet);
                break;
            default:
                System.out.println("---未知消息---");
                break;
        }
    }

    // 登陆消息
    private void login(MessagePacket packet) {
        System.out.println(packet.getMessage());
    }
}



服务器

服务器启动类

package com.chat;

import com.chat.server.ChatServer;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:服务器启动类
 */
public class ServerStarter {

    public static void main(String[] args) {
        ChatServer server = ChatServer.getInstance();
        server.startUp();
    }
}

服务器

package com.chat.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:服务器(单例模式)
 */
public class ChatServer {

    private static ServerSocket serverSocket;

    private static ThreadPoolExecutor threadPool;

    private static boolean running = false;

    private static final int TCP_PORT = 9966;

    private ChatServer() {
        startUp();
    }

    /**
     * 启动
     */
    public void startUp() {
        try {
            serverSocket = new ServerSocket(TCP_PORT);
            running = true;
            System.out.println("服务器创建成功:" + TCP_PORT);
            /**
             * 手动创建一个线程池
             */
            threadPool = new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
            while (running) {
                Socket clientSocket = serverSocket.accept();
                // 将新的连接提交给线程池
                threadPool.submit(new ServerSocketHandler(clientSocket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 关闭
    public void shutDown() {
        running = false;
        try {
            // 关闭socket
            serverSocket.close();
            //关闭线程池
            threadPool.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static ChatServer getInstance() {
        return SingletonServer.SERVER;
    }

    private static class SingletonServer {
        private static final ChatServer SERVER = new ChatServer();
    }


}

客户端连接处理类

package com.chat.server;

import com.alibaba.fastjson.JSON;
import com.chat.message.MessagePacket;
import com.chat.message.MessageAnalysis;

import java.io.*;
import java.net.Socket;

/**
 * @author: 
 * @Date: 2020/06/03
 * @Description:所有的新连接都交给这个类来处理
 */
public class ServerSocketHandler implements Runnable {
    private Socket socket = null;
    private MessageAnalysis analysis = null;


    public ServerSocketHandler(Socket socket) {
        this.socket = socket;
        this.analysis = new MessageAnalysis();
    }

    @Override
    public void run() {
        readMessage();
    }

    /**
     * 读取从客户端传来的消息
     */
    private void readMessage() {
        try {
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            } catch (IOException e) {
                e.printStackTrace();
            }
            String message;
            try {
                while ((message = reader.readLine()) != null) {
                    JSON json = (JSON) JSON.parse(message);
                    MessagePacket packet = JSON.toJavaObject(json, MessagePacket.class);
                    analysis.analyze(socket, packet);
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 发送消息到客户端
     */
    public static void sendMessage(Socket socket, MessagePacket packet) {
        System.out.println("---服务器发出消息---");
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            String msg = JSON.toJSONString(packet);
            writer.write(msg);
            /**
             * writer.newLine()很关键
             */
            writer.newLine();
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 发送消息到所有客户端
     */
    public static void sendMessageToWorld(MessagePacket packet) {
        System.out.println("---服务器发出消息---");
        BufferedWriter writer = null;
        try {
            for(Client client: ClientList.clients){
                writer = client.getWriter();
                String msg = JSON.toJSONString(packet);
                writer.write(msg);
                /**
                 * writer.newLine()很关键
                 */
                writer.newLine();
                writer.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服务器消息处理类

package com.chat.message;

import com.alibaba.fastjson.JSON;
import com.chat.server.Client;
import com.chat.server.ClientList;
import com.chat.server.ServerSocketHandler;

import java.net.Socket;
import java.util.Map;

/**
 * @author: 
 * @Date: 2020/06/08
 * @Description:
 */
public class MessageAnalysis {

    public void analyze(Socket socket, MessagePacket packet) {
        System.out.println("---解析来自客户端的消息----");
        System.out.println(packet);
        int type = packet.getType();
        switch (type) {
            case MessageType.LOGIN_REQUEST:
                login(socket, packet);
                break;
            default:
                System.out.println("---未知消息----");
                break;
        }
    }

    // 有客户端登录
    private void login(Socket socket, MessagePacket packet) {
        String json = packet.getMessage();
        Map parameters = JSON.parseObject(json, Map.class);
        String username = (String) parameters.get("username");
        Client client = new Client(username.trim(), socket);
        ClientList.putToClients(client);
        /**
         * 返回消息
         */
        packet = new MessagePacket(MessageType.LOGIN_RESPONSE, "来了,老弟!");
//        ServerSocketHandler.sendMessage(socket, packet);
        /**
         * 告诉所有人, 有新用户上线
         */
        ServerSocketHandler.sendMessageToWorld(packet);
    }


}

客户端信息存储

package com.chat.server;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:
 */
public class Client {

    private String name;

    private String ip;

    private int port;

    private BufferedWriter writer;

    public Client() {
    }

    public Client(String name, Socket socket) {
        this.name = name;
        this.ip = socket.getInetAddress().getHostAddress();
        this.port = socket.getPort();
        try {
            this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public BufferedWriter getWriter() {
        return writer;
    }

    public void setWriter(BufferedWriter writer) {
        this.writer = writer;
    }


    @Override
    public String toString() {
        return "Client{" +
                "name='" + name + '\'' +
                ", ip='" + ip + '\'' +
                ", port=" + port +
                ", writer=" + writer +
                '}';
    }
}
package com.chat.server;

import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:存储客户端数据
 */
public class ClientList {

    public static List<Client> clients = new ArrayList<>(10);

    // 存储新的客户端信息
    public static void putToClients(Client client) {
        clients.add(client);
    }
}



消息

消息类型标识

package com.chat.message;

/**
 * @author: 
 * @Date: 2020/06/09
 * @Description:
 */
public interface MessageType {

    int LOGIN_REQUEST = 1;
    int LOGIN_RESPONSE = 2;
}

消息体

package com.chat.message;

/**
 * @author: 
 * @Date: 2020/06/08
 * @Description:
 */
public class MessagePacket {
    private int type;
    private String message;

    public MessagePacket(){

    }

    public MessagePacket(int type, String message){
        this.type = type;
        this.message = message;
    }


    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "MessagePacket{" +
                "type=" + type +
                ", message=" + message +
                '}';
    }
}



版权声明:本文为qq_41456723原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。