后端刷新前端页面-springboot使用sse向vue前端发送数据

  • Post author:
  • Post category:vue


适用场景:

后端更新状态通知前端刷新

优点:

简洁轻量,避免了前端需要主动刷新页面才能获取到更新的数据

缺点:

只能传送字符串,只能有后端向前端发送不可逆向

vue部分:

data() {
  return {
    source: null,
    username:Vue.ls.get(USER_NAME)
  };
},
beforeDestroy() {
  //离开页面前关闭sse连接
  this.source.close()
  closeSSE(this.username) //后端关闭连接方法
},
mounted() {
  this.createEventSource()
},
//创建sse连接
methods: {
    createEventSource() {
      let that = this
      if (window.EventSource) {
        this.source = new EventSource('http://localhost:8110/pda/v1/account/sse/connect/' + Vue.ls.get(USER_NAME))
        this.source.addEventListener('open', function (e) {
          console.log('建立连接。。。')
        }, false)   
        this.source.addEventListener('message', function (e) {
          console.log('收到消息', e.data)
          // 具体业务
        })
        this.source.addEventListener('error', function (e) {
          if (e.readyState === EventSource.CLOSED) {
            console.log('连接关闭')
          } else {
            console.log(e)
          }
        }, false)
      } else {
        alert('你的浏览器不支持SSE')
      }
    },
 }

springboot部分:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * 主动向前端推送信息
 */
public class SseEmitterServer {

    private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);

    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建用户连接并返回 SseEmitter
     * @param employeeCode 用户ID
     * @return SseEmitter
     */
    public static SseEmitter connect(String employeeCode) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(employeeCode));
        sseEmitter.onError(errorCallBack(employeeCode));
        sseEmitter.onTimeout(timeoutCallBack(employeeCode));
        sseEmitterMap.put(employeeCode, sseEmitter);
        // 数量+1
        count.getAndIncrement();
        logger.info("创建新的sse连接,当前用户:{}", employeeCode);
        return sseEmitter;
    }

    /**
     * 给指定用户发送信息
     * @param employeeCode
     * @param jsonMsg
     */
    public static void sendMessage(String employeeCode, String jsonMsg) {
        try {
            SseEmitter emitter = sseEmitterMap.get(employeeCode);
            if (emitter == null) {
                logger.warn("sse用户[{}]不在注册表,消息推送失败", employeeCode);
                return;
            }
            emitter.send(jsonMsg, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            logger.error("sse用户[{}]推送异常:{}", employeeCode, e.getMessage());
            removeUser(employeeCode);
        }
    }

    /**
     * 群发消息
     * @param jsonMsg
     * @param employeeCodes
     */
    public static void batchSendMessage(String jsonMsg, List<String> employeeCodes) {
        employeeCodes.forEach(employeeCode -> sendMessage(jsonMsg, employeeCode));
    }

    /**
     * 群发所有人
     * @param jsonMsg
     */
    public static void batchSendMessage(String jsonMsg) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(jsonMsg, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                logger.error("用户[{}]推送异常:{}", k, e.getMessage());
                removeUser(k);
            }
        });
    }

    /**
     * 移除用户连接
     */
    public static void removeUser(String employeeCode) {
        SseEmitter emitter = sseEmitterMap.get(employeeCode);
        if(emitter != null){
            emitter.complete();
        }
        sseEmitterMap.remove(employeeCode);
        // 数量-1
        count.getAndDecrement();
        logger.info("移除sse用户:{}", employeeCode);
    }

    /**
     * 获取当前连接信息
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前连接数量
     */
    public static int getUserCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String employeeCode) {
        return () -> {
            logger.info("结束sse用户连接:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

    private static Runnable timeoutCallBack(String employeeCode) {
        return () -> {
            logger.info("连接sse用户超时:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

    private static Consumer<Throwable> errorCallBack(String employeeCode) {
        return throwable -> {
            logger.info("sse用户连接异常:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

}

controller

/**
 * 用于创建连接(将用户注册到server中)
 */
@GetMapping("/sse/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
   return SseEmitterServer.connect(userId);
}

/**
 * 用于删除连接(将用户从server中移除)
 */
@GetMapping("/sse/closed/{userId}")
public void closed(@PathVariable String userId) {
   SseEmitterServer.removeUser(userId);
}

//业务中调用方法向vue发送数据

SseEmitterServer.batchSendMessage("111");



版权声明:本文为PPPPei原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。