Websocket 技术实践 实现在线聊天系统

  • Post author:
  • Post category:其他



目录


一、认识Websocket


1、什么是websocket


2、websocket优势分析


3、websocket与springboot应用程序交互的图解


4、websocket与http协议对比


二、Springboot实现websocket技术的案例


1、引入依赖


2、注入IOC容器


3、websocket服务类


三、前端websocket连接


1、websocket连接的js


2、在线访问的html


四、代码设计与测试


1、失败重连


2、心跳检测


3、用户在线聊天测试


五、案例源码


一、认识Websocket

1、什么是websocket



WebSocket是一种在单个TCP连接上进行全双工通信的协议。


全双工:可以同时双向传输数据(B->A,A->B可以同时进行)。

推送技术是建立在客户端服务器的一种机制,就是由服务器主动将信息发往客户端的技术。就像广播电台播音。

2、websocket优势分析

过去传统的服务器与客户端的聊天通信一般有两种:

1、轮询请求:通过http的短连接的方式来进行轮询请求,不断轮询请求,对服务器压力很大。

2、socket长连接:socket长连接保证了客户端和服务器的长连接通信,但对服务器资源照成了极大的浪费。

于是诞生了websocket技术。


websocket协议的实现方式:


它是一种长连接,只能通过一次请求来初始化连接,然后所有的请求和响应都是通过这个TCP连接进行通讯,这意味着它是基于事件驱动,异步的消息机制。

3、websocket与springboot应用程序交互的图解

4、websocket与http协议对比

二、Springboot实现websocket技术的案例

1、引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2、注入IOC容器

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

3、websocket服务类

package com.dragonwu.config.websocket;

import com.alibaba.fastjson2.JSON;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Dragon Wu
 * @since 2022-10-12 19:11
 **/
//ws://localhost:port/websocket/A
@ServerEndpoint(value = "/websocket/{userId}")
@Component
public class WebSocketEndpoint {

    //与某个客户端的连接会话,需要通过它来给客户端发生数据
    private Session session;

    /*
    连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId){
        //把会话存入到连接池中
        SessionPool.sessions.put(userId,session);
    }

    /*
    关闭连接
     */
    @OnClose
    public void onClose(Session session) throws IOException {
        SessionPool.close(session.getId());
        session.close();
    }

    /*
    收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message,Session session){
        // 如果是心跳检测的消息,则返回pong作为心跳回应
        if (message.equalsIgnoreCase("ping")) {
            try {
                Map<String, Object> params = new HashMap<String, Object>();
                params.put("type", "pong");
                session.getBasicRemote().sendText(JSON.toJSONString(params));
                System.out.println("应答客户端的消息:" + JSON.toJSONString(params));
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
        else
        {
            SessionPool.sendMessage(message);
        }
    }

}


线程池操作类:

package com.dragonwu.config.websocket;

import javax.websocket.Session;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Dragon Wu
 * @since 2022-10-12 19:19
 **/
public class SessionPool {

    //sessionId为系统随机生成的
    public static Map<String, Session> sessions = new ConcurrentHashMap<>();

    public static void close(String sessionId) throws IOException {
        for (String userId : SessionPool.sessions.keySet()) {
            Session session = SessionPool.sessions.get(userId);
            if (session.getId().equals(sessionId)) {
                sessions.remove(userId);
                break;
            }
        }
    }

    public static void sendMessage(String sessionId, String message) {
        sessions.get(sessionId).getAsyncRemote().sendText(message);
    }

    public static void sendMessage(String message) {
        for (String sessionId : SessionPool.sessions.keySet()) {
            SessionPool.sessions.get(sessionId).getAsyncRemote().sendText(message);
        }
    }

    public static void sendMessage(Map<String, Object> params) {
        //{"formUserId":userId,"toUserId":toUserId,"msg":msg}
        String toUserId = params.get("toUserId").toString();
        String msg = params.get("msg").toString();
        String fromUserId = params.get("fromUserId").toString();
        msg = "来自" + fromUserId + "的消息:" + msg;
        Session session = sessions.get(toUserId);
        if (session != null) {
            session.getAsyncRemote().sendText(msg);
        }
    }
}

三、前端websocket连接

1、websocket连接的js

var wsObj = null;
var wsUri = null;
var userId = -1;
var lockReconnect = false;
var wsCreateHandler = null;
function createWebSocket() {
    var host = window.location.host; // 带有端口号
    userId = GetQueryString("userId");
    // wsUri = "ws://" + host + "/websocket?userId=" + userId;
    wsUri = "ws://" + host + "/websocket/" + userId;

    try {
        wsObj = new WebSocket(wsUri);
        initWsEventHandle();
    } catch (e) {
        writeToScreen("执行关闭事件,开始重连");
        reconnect();
    }
}

function initWsEventHandle() {
    try {
        wsObj.onopen = function (evt) {
            onWsOpen(evt);
            heartCheck.start();
        };

        wsObj.onmessage = function (evt) {
            onWsMessage(evt);
            heartCheck.start();
        };

        wsObj.onclose = function (evt) {
            writeToScreen("执行关闭事件,开始重连");
            onWsClose(evt);
            reconnect();
        };
        wsObj.onerror = function (evt) {
            writeToScreen("执行error事件,开始重连");
            onWsError(evt);
            reconnect();
        };
    } catch (e) {
        writeToScreen("绑定事件没有成功");
        reconnect();
    }
}

function onWsOpen(evt) {
    writeToScreen("CONNECTED");
}

function onWsClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onWsError(evt) {
    writeToScreen(evt.data);
}

function writeToScreen(message) {
    if(DEBUG_FLAG)
    {
        $("#debuggerInfo").val($("#debuggerInfo").val() + "\n" + message);
    }
}

function GetQueryString(name) {
    var reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
    var r = window.location.search.substr(1).match(reg); //获取url中"?"符后的字符串并正则匹配
    var context = "";
    if (r != null)
        context = r[2];
    reg = null;
    r = null;
    return context == null || context == "" || context == "undefined" ? "" : context;
}

//断后重连
function reconnect() {
    if(lockReconnect) {
        return;
    };
    writeToScreen("1秒后重连");
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    wsCreateHandler && clearTimeout(wsCreateHandler);
    wsCreateHandler = setTimeout(function () {
        writeToScreen("重连..." + wsUri);
        createWebSocket();
        lockReconnect = false;
        writeToScreen("重连完成");
    }, 1000);
}

//心跳检测
var heartCheck = {
    //15s之内如果没有收到后台的消息,则认为是连接断开了,需要再次连接
    timeout: 15000,
    timeoutObj: null,
    serverTimeoutObj: null,
    //重启
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        this.start();
    },
    //开启定时器
    start: function(){
        var self = this;
        this.timeoutObj && clearTimeout(this.timeoutObj);
        this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
        this.timeoutObj = setTimeout(
            function(){
                writeToScreen("发送ping到后台");
                try
                {
                    wsObj.send("ping");
                }
                catch(ee)
                {
                    writeToScreen("发送ping异常");
                }
                //内嵌计时器
                self.serverTimeoutObj = setTimeout(function(){
                    //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                    writeToScreen("没有收到后台的数据,关闭连接");
                    //wsObj.close();
                    reconnect();
                }, self.timeout);
            },
            this.timeout)

    },
}

2、在线访问的html

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <title>Floor View</title>
    <script src="/js/websocket.js"></script>
    <script type="text/javascript" src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>
    <script id="code">
        var DEBUG_FLAG = true;
        $(function()
        {
            //启动websocket
            createWebSocket();

        });

        // 当有消息推送后触发下面事件
        function onWsMessage(evt) {
            var jsonStr = evt.data;
            writeToScreen(jsonStr);
        }

        function writeToScreen(message) {
            if(DEBUG_FLAG)
            {
                $("#debuggerInfo").val($("#debuggerInfo").val() + "\n" + message);
            }
        }

        function sendMessageBySocket()
        {
            var toUserId = $("#userId").val();
            var msg = $("#msg").val();
            var data = {"fromUserId": userId,"toUserId": toUserId,"msg": msg};
            wsObj.send(JSON.stringify(data));
        }
    </script>
</head>

<body style="margin: 0px;padding: 0px;overflow: hidden; ">
<!-- 显示消息-->
<textarea id="debuggerInfo" style="width:100%;height:200px;"></textarea>
<!-- 发送消息-->
<div>用户:<input type="text" id="userId"></input></div>
<div>消息:<input type="text" id="msg"></input></div>
<div><input type="button" value="发送消息" onclick="sendMessageBySocket()"></input></div>
</body>
</html>

四、代码设计与测试

1、失败重连

在客户端和服务端连接失败是进行失败重连,代码如下:

2、心跳检测

服务器:

在客户端和服务器长时间未通信时,客户端会向服务器发一个ping,若服务器没问题则会给客户端返回一个pong的回复,以确保连接正常。

3、用户在线聊天测试

可以看到聊天可以正常进行。

五、案例源码


Websocket案例代码: Websocket案例代码



版权声明:本文为qq_50909707原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。