一、长连接
1. 前端代码
<template>
<div>
<button @click="initSocket">建立websocket连接</button>
<el-input v-model="types">
</el-input>
<button @click="sendmessage()">点击发送消息了</button>
</div>
</template>
<script>
export default {
data() {
return {
webSocket: null,
//自己的IP地址+端口号+端口号
url: '110.128.131.1:9003/ws',
types: '给后台参数'
}
},
methods: {
// 建立连接
initSocket() {
// 有参数的情况下:
let url = `ws://${this.url}/${参数}`
// 没有参数的情况:接口
// let url1 = 'ws://localhost:9998'
//建立连接
this.webSocket = new WebSocket(url)
//连接成功调用
this.webSocket.onopen = this.webSocketOnOpen
//关闭连接调用
this.webSocket.onclose = this.webSocketOnClose
//发生错误是调用
this.webSocket.onerror = this.webSocketOnError
//发送消息时调用
this.webSocket.onmessage = this.webSocketOnMessage
},
// 主动给后端发送消息
sendmessage() {
this.webSocket.send(this.types);
},
// 建立连接成功后的状态
webSocketOnOpen() {
console.log('websocket连接成功');
},
// 获取到后台消息的事件,操作数据的代码在onmessage中书写
webSocketOnMessage(res) {
// res就是后台实时传过来的数据
console.log(res.data);
//给后台发送数据
},
// 关闭连接
webSocketOnClose() {
this.webSocket.close()
console.log('websocket连接已关闭');
},
//连接失败的事件
webSocketOnError(res) {
console.log('websocket连接失败');
// 打印失败的数据
console.log(res);
}
},
created() {
// 页面打开就建立连接,根据业务需要
this.initSocket()
},
destroyed() {
// 页面销毁关闭连接
this.webSocket.close()
},
}
</script>
2. 后端代码
1. 配置类
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.0</version>
</dependency>
package com.hua.dentistryschedule.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketconfig {
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
}
2. service类
无参
package com.hua.dentistryschedule.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
@Log4j2
@Component
//主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
@ServerEndpoint(value = "/ws")//无参
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
// concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();
private static int num = 0;
private Timer timer = new Timer();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
SessionSet.add(session);
this.session = session;
int cnt = OnlineCount.incrementAndGet(); // 在线数加1
log.info("有连接加入,当前连接数为:{}", cnt);
SendMessage(this.session, num+"");
heartBeat(session);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
SessionSet.remove(this.session);
int cnt = OnlineCount.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到客户端消息:"+message);
if("ping".equalsIgnoreCase(message)){
SendMessage(session, "pong" );
timer.cancel();
heartBeat(session);
return;
}
if("+1".equalsIgnoreCase(message)){
this.num ++;
}
if("-1".equalsIgnoreCase(message)){
this.num --;
}
BroadCastInfo(""+num );
}
/**
* 出现错误
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
error.printStackTrace();
}
/**
* 心跳
* @param session
*/
private void heartBeat(Session session) {
timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}, 40000);
}
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
*
* @param session
* @param message
*/
public static void SendMessage(Session session, String message) {
try {
if (session.isOpen()) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
log.error("发送消息出错:{}", e.getMessage());
e.printStackTrace();
}
}
/**
* 群发消息
*
* @param message
* @throws IOException
*/
public static void BroadCastInfo(String message) {
for (Session session : SessionSet) {
SendMessage(session, message);
}
}
/**
* 指定Session发送消息
*
* @param sessionId
* @param message
* @throws IOException
*/
public static void SendMessage(String message, String sessionId) {
Session session = null;
for (Session s : SessionSet) {
if (s.getId().equals(sessionId)) {
session = s;
break;
}
}
if (session != null) {
SendMessage(session, message);
} else {
log.warn("没有找到你指定ID的会话:{}", sessionId);
}
}
}
有参
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
/**
* websocket的处理类。
* 作用相当于HTTP请求
* 中的controller
*/
@Component
@Slf4j
@ServerEndpoint("/api/pushMessage/{userId}")
public class WebSocketServer {
/**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
private static int onlineCount = 0;
/**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/**接收userId*/
private String userId = "";
/**
* 连接建立成
* 功调用的方法
*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
this.session = session;
this.userId=userId;
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
//加入set中
webSocketMap.put(userId,this);
}else{
//加入set中
webSocketMap.put(userId,this);
//在线数加1
addOnlineCount();
}
log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
sendMessage("连接成功");
}
/**
* 连接关闭
* 调用的方法
*/
@OnClose
public void onClose() {
if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
//从set中删除
subOnlineCount();
}
log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消
* 息后调用的方法
* @param message
* 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:"+userId+",报文:"+message);
//可以群发消息
//消息保存到数据库、redis
if(StringUtils.isNotBlank(message)){
try {
//解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
//追加发送人(防止串改)
jsonObject.put("fromUserId",this.userId);
String toUserId=jsonObject.getString("toUserId");
//传送给对应toUserId用户的websocket
if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
webSocketMap.get(toUserId).sendMessage(message);
}else{
//否则不在这个服务器上,发送到mysql或者redis
log.error("请求的userId:"+toUserId+"不在该服务器上");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();
}
/**
* 实现服务
* 器主动推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
*发送自定
*义消息
**/
public static void sendInfo(String message, String userId) {
log.info("发送消息到:"+userId+",报文:"+message);
if(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)){
webSocketMap.get(userId).sendMessage(message);
}else{
log.error("用户"+userId+",不在线!");
}
}
/**
* 获得此时的
* 在线人数
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 在线人
* 数加1
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* 在线人
* 数减1
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
二、定时任务
1. 依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
2. 配置类
package com.hua.dentistryschedule.schedule;
import com.hua.dentistryschedule.Service.scheduleSercive;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
@Configuration
public class schedulemethod {
//定时任务
//配置JObDetail
@Bean(name = "changeTimeoutConditionDetail")
public MethodInvokingJobDetailFactoryBean changeTimeoutConditionDetail(scheduleSercive scheduleSercive) {
//MethodInvokingJobDetailFactoryBean 对于需要定时执行的方法,需要通过此类进行包装
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
//是否并发执行
jobDetail.setConcurrent(false);
//设置定时任务的名字
jobDetail.setTargetObject(scheduleSercive);
//设置定时任务的方法
jobDetail.setTargetMethod("changeTimeoutCondition");
return jobDetail;
}
@Bean(name = "messageToRemindDetail")
public MethodInvokingJobDetailFactoryBean messageToRemindDetail(scheduleSercive scheduleSercive) {
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setConcurrent(false);
jobDetail.setTargetObject(scheduleSercive);
jobDetail.setTargetMethod("messageToRemind");
return jobDetail;
}
//配置触发器
@Bean(name = "changeTimeoutConditionTrigger")
public CronTriggerFactoryBean changeTimeoutConditionTrigger(JobDetail changeTimeoutConditionDetail) {
//CronTriggerFactoryBean 对于需要定时执行的方法,需要通过此类进行包装
CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
//设置定时任务
trigger.setJobDetail(changeTimeoutConditionDetail);
//设置cron表达式
trigger.setCronExpression("0/5 * * * * ?");//每5秒执行一次
return trigger;
}
@Bean(name = "messageToRemindTrigger")
public CronTriggerFactoryBean messageToRemindTrigger(JobDetail messageToRemindDetail) {
CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
trigger.setJobDetail(messageToRemindDetail);
trigger.setCronExpression("0/5 * * * * ?");//每5秒执行一次
return trigger;
}
//配置Scheduler
@Bean(name = "scheduler")
public SchedulerFactoryBean schedulerFactory(Trigger changeTimeoutConditionTrigger, Trigger messageToRemindTrigger) {
//SchedulerFactoryBean 调度器,设置定时任务
SchedulerFactoryBean bean = new SchedulerFactoryBean();
// 用于quartz集群,QuartzScheduler启动时更新己存在的Job
bean.setOverwriteExistingJobs(true);
// 延时启动,应用启动1秒后
bean.setStartupDelay(1);
// 注册触发器
bean.setTriggers(changeTimeoutConditionTrigger,messageToRemindTrigger);
return bean;
}
}
版权声明:本文为qq_52889028原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。