vue+axios+springboot长连接和定时任务

  • Post author:
  • Post category:vue




一、长连接



1. 前端代码

<template>
    <div>
        <button @click="initSocket">建立websocket连接</button>
        <el-input v-model="types">

        </el-input>
        <button @click="sendmessage()">点击发送消息了</button>
    </div>
</template>
<script>
export default {
    data() {
        return {
            webSocket: null,
            //自己的IP地址+端口号+端口号
            url: '110.128.131.1:9003/ws',
            types: '给后台参数'
        }
    },
    methods: {
        // 建立连接
        initSocket() {
            // 有参数的情况下:
            let url = `ws://${this.url}/${参数}`
            // 没有参数的情况:接口
            // let url1 = 'ws://localhost:9998'
            //建立连接
            this.webSocket = new WebSocket(url)
            //连接成功调用
            this.webSocket.onopen = this.webSocketOnOpen
            //关闭连接调用
            this.webSocket.onclose = this.webSocketOnClose
            //发生错误是调用
            this.webSocket.onerror = this.webSocketOnError
            //发送消息时调用
            this.webSocket.onmessage = this.webSocketOnMessage


        },
       // 主动给后端发送消息
        sendmessage() {
            this.webSocket.send(this.types);
        },
        // 建立连接成功后的状态
        webSocketOnOpen() {
            console.log('websocket连接成功');
        },
        // 获取到后台消息的事件,操作数据的代码在onmessage中书写
        webSocketOnMessage(res) {
            // res就是后台实时传过来的数据
            console.log(res.data);
            //给后台发送数据
        },
        // 关闭连接
        webSocketOnClose() {
            this.webSocket.close()
            console.log('websocket连接已关闭');
        },
        //连接失败的事件
        webSocketOnError(res) {
            console.log('websocket连接失败');
            // 打印失败的数据
            console.log(res);
        }
    },
    created() {
        // 页面打开就建立连接,根据业务需要
        this.initSocket()
    },
    destroyed() {
        // 页面销毁关闭连接
        this.webSocket.close()
    },
}
</script>



2. 后端代码



1. 配置类



依赖
   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.7.0</version>
        </dependency>
package com.hua.dentistryschedule.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketconfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter () {
        return new ServerEndpointExporter();
    }

}



2. service类



无参
package com.hua.dentistryschedule.config;

import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;


@Log4j2
@Component
//主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
@ServerEndpoint(value = "/ws")//无参
public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    private static final AtomicInteger OnlineCount = new AtomicInteger(0);

    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();

    private static int num = 0;

    private Timer timer = new Timer();

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        SessionSet.add(session);
        this.session = session;
        int cnt = OnlineCount.incrementAndGet(); // 在线数加1
        log.info("有连接加入,当前连接数为:{}", cnt);
        SendMessage(this.session, num+"");
        heartBeat(session);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        SessionSet.remove(this.session);
        int cnt = OnlineCount.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到客户端消息:"+message);
        if("ping".equalsIgnoreCase(message)){
            SendMessage(session, "pong" );
            timer.cancel();
            heartBeat(session);
            return;
        }
        if("+1".equalsIgnoreCase(message)){
            this.num ++;
        }
        if("-1".equalsIgnoreCase(message)){
            this.num --;
        }
        BroadCastInfo(""+num );
    }

    /**
     * 出现错误
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
        error.printStackTrace();
    }

    /**
     * 心跳
     * @param session
     */
    private void heartBeat(Session session) {
        timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    session.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, 40000);
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param session
     * @param message
     */
    public static void SendMessage(Session session, String message) {
        try {
            if (session.isOpen()) {
                session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            log.error("发送消息出错:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 群发消息
     *
     * @param message
     * @throws IOException
     */
    public static void BroadCastInfo(String message) {
        for (Session session : SessionSet) {
            SendMessage(session, message);
        }
    }

    /**
     * 指定Session发送消息
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public static void SendMessage(String message, String sessionId) {
        Session session = null;
        for (Session s : SessionSet) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            SendMessage(session, message);
        } else {
            log.warn("没有找到你指定ID的会话:{}", sessionId);
        }
    }

}



有参

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
/**
 * websocket的处理类。
 * 作用相当于HTTP请求
 * 中的controller
 */
@Component
@Slf4j
@ServerEndpoint("/api/pushMessage/{userId}")
public class WebSocketServer {

    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //加入set中
            webSocketMap.put(userId,this);
        }else{
            //加入set中
            webSocketMap.put(userId,this);
            //在线数加1
            addOnlineCount();
        }
        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
        sendMessage("连接成功");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     * @param message
     * 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId",this.userId);
                String toUserId=jsonObject.getString("toUserId");
                //传送给对应toUserId用户的websocket
                if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                    webSocketMap.get(toUserId).sendMessage(message);
                }else{
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userId:"+toUserId+"不在该服务器上");
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *发送自定
     *义消息
     **/
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    /**
     * 获得此时的
     * 在线人数
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}



二、定时任务



1. 依赖

<dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
 </dependency>



2. 配置类

package com.hua.dentistryschedule.schedule;

import com.hua.dentistryschedule.Service.scheduleSercive;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class schedulemethod {
    //定时任务
    //配置JObDetail
    @Bean(name = "changeTimeoutConditionDetail")
    public MethodInvokingJobDetailFactoryBean changeTimeoutConditionDetail(scheduleSercive scheduleSercive) {
        //MethodInvokingJobDetailFactoryBean 对于需要定时执行的方法,需要通过此类进行包装
        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
        //是否并发执行
        jobDetail.setConcurrent(false);
        //设置定时任务的名字
        jobDetail.setTargetObject(scheduleSercive);
        //设置定时任务的方法
        jobDetail.setTargetMethod("changeTimeoutCondition");
        
        return jobDetail;
    }

    @Bean(name = "messageToRemindDetail")
    public MethodInvokingJobDetailFactoryBean messageToRemindDetail(scheduleSercive scheduleSercive) {
        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
        jobDetail.setConcurrent(false);
        jobDetail.setTargetObject(scheduleSercive);
        jobDetail.setTargetMethod("messageToRemind");
        return jobDetail;
    }

    //配置触发器
    @Bean(name = "changeTimeoutConditionTrigger")
    public CronTriggerFactoryBean changeTimeoutConditionTrigger(JobDetail changeTimeoutConditionDetail) {
        //CronTriggerFactoryBean 对于需要定时执行的方法,需要通过此类进行包装
        CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
        //设置定时任务
        trigger.setJobDetail(changeTimeoutConditionDetail);
        //设置cron表达式
        trigger.setCronExpression("0/5 * * * * ?");//每5秒执行一次
        return trigger;
    }

    @Bean(name = "messageToRemindTrigger")
    public CronTriggerFactoryBean messageToRemindTrigger(JobDetail messageToRemindDetail) {
        CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
        trigger.setJobDetail(messageToRemindDetail);
        trigger.setCronExpression("0/5 * * * * ?");//每5秒执行一次
        return trigger;
    }


//配置Scheduler
    @Bean(name = "scheduler")
    public SchedulerFactoryBean schedulerFactory(Trigger changeTimeoutConditionTrigger, Trigger messageToRemindTrigger) {
       //SchedulerFactoryBean 调度器,设置定时任务
        SchedulerFactoryBean bean = new SchedulerFactoryBean();
        // 用于quartz集群,QuartzScheduler启动时更新己存在的Job
        bean.setOverwriteExistingJobs(true);
        // 延时启动,应用启动1秒后
        bean.setStartupDelay(1);
        // 注册触发器
        bean.setTriggers(changeTimeoutConditionTrigger,messageToRemindTrigger);
        return bean;
    }

}



版权声明:本文为qq_52889028原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。