spring整合websocket

  • Post author:
  • Post category:其他


最近业务需要用到websocket,经过几天研究终于有小成,记录一下。


参考文档:

先引入spring websocket需要的两个依赖

      <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>

在mvc 的xml加入一下代码,扫描注解方式的配置

<context:component-scan
   base-package="com.exmple.demo*.*" />

配置类

@EnableWebSocketMessageBroker 的作用是在WebSocket 上启用 STOMP,registerStompEndpoints方法的作用是websocket建立连接用的(也就是所谓的注册到指定的url),configureMessageBroker方法作用是配置一个简单的消息代理。如果补充在,默认情况下会自动配置一个简单的内存消息队列,用来处理“/topic”为前缀的消息,但经过重载后,消息队列将会处理前缀为“/topic”、“/user”的消息,并会将“/app”的消息转给controller处理。

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2020-12-31 11:17
 **/
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private WebSocketDecoratorFactory webSocketDecoratorFactory;
    @Autowired
    private PrincipalHandshakeHandler principalHandshakeHandler;


    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        /**
         * myUrl表示 你前端到时要对应url映射
         */
        stompEndpointRegistry.addEndpoint("/myUrl.do")
                .setAllowedOrigins("*")
                .setHandshakeHandler(principalHandshakeHandler);
        /**
         * myUrl表示,使用withSockJS()开启支持socketJS
         */
        stompEndpointRegistry.addEndpoint("/sockjs")
                .setAllowedOrigins("*")
                .setHandshakeHandler(principalHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /**
         * queue 点对点
         * topic 广播
         * user 点对点前缀
         */
        registry.enableSimpleBroker("/queue", "/topic");
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(webSocketDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }


}

PrincipalHandshakeHandler:

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2020-12-31 11:15
 **/
@Component
public class PrincipalHandshakeHandler extends DefaultHandshakeHandler{

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        /**
         * 这边可以按你的需求,如何获取唯一的值,既unicode
         * 得到的值,会在监听处理连接的属性中,既WebSocketSession.getPrincipal().getName()
         * 也可以自己实现Principal()
         */
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
            HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();
            /**
             * 这边就获取你最熟悉的陌生人,携带参数,你可以cookie,请求头,或者url携带,这边我采用url携带
             */
            String token = httpRequest.getParameter("userId");

            return new MyPrincipal(token);

        }
        return null;
    }

定义一个socket管理器SocketManager:

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2020-12-31 11:05
 **/
public class SocketManager {

    private static final Logger log = LoggerFactory.getLogger(SocketManager.class);


    private static ConcurrentHashMap<String, WebSocketSession> manager = new ConcurrentHashMap<String, WebSocketSession>();

    public static void add(String key, WebSocketSession webSocketSession) {
        log.info("新添加webSocket连接 {} ", key);
        manager.put(key, webSocketSession);
    }

    public static void remove(String key) {
        log.info("移除webSocket连接 {} ", key);
        manager.remove(key);
    }

    public static WebSocketSession get(String key) {
        log.info("获取webSocket连接 {}", key);
        return manager.get(key);
    }

WebSocketDecoratorFactory:

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2020-12-31 11:11
 **/
@Component
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {

    private static final Logger log = LoggerFactory.getLogger(WebSocketDecoratorFactory.class);

    private static final int time = 86400;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {

        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                log.info("有人连接啦  sessionId = {}", session.getId());
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    log.info("key = {} 存入", principal.getName());
                    // 身份校验成功,缓存socket连接
                    SocketManager.add(principal.getName(), session);
                    /**
                     * 如果用户不为空,添加用户到redis
                     * 首先获取当当前服务器的ip和端口,作为用户连接服务器的唯一标识
                     */
                    String redisKey = Constants.SOCKET_TOKEN_PREFIX+principal.getName();
                    String server = ServerUtil.getServerName();
                    JedisUtil.setString(redisKey,server,time);
                    log.info("当前连接服务器--------->{}",JedisUtil.getByKey(redisKey));
                }


                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                log.info("有人退出连接啦  sessionId = {}", session.getId());
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // 身份校验成功,移除socket连接
                    SocketManager.remove(principal.getName());
                    String redisKey = Constants.SOCKET_TOKEN_PREFIX+principal.getName();
                    JedisUtil.delByKey(redisKey);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

自己实现Principal

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2021-01-08 13:59
 **/
public class MyPrincipal implements Principal {

    private String token;


    public MyPrincipal(String token) {
        token = StringUtils.isBlank(token) ? token : token.replaceAll(" ","+");
        this.token = token;
    }

    @Override
    public String getName() {
        return token;
    }
}

controller

应用程序可以使用带注解的@Controller类来处理来自 Client 端的消息。这样的类可以声明@MessageMapping,@SubscribeMapping和@ExceptionHandler方法,如下所述。

@MessageMapping

@MessageMapping注解可用于根据消息的目的地路由消息的方法。在方法级别和类型级别都支持它。在类型级别@MessageMapping上用于表示控制器中所有方法之间的共享 Map。

默认情况下,目标 Map 应为 Ant 样式的路径模式,例如“/foo *”,“/foo/**”。这些模式包括对模板变量的支持,例如“/foo /{id}”,可以使用@DestinationVariable个方法参数来引用。

/**
 * @program: common
 * @description:
 * @author: liulian
 * @create: 2020-12-31 11:23
 **/
@RestController
public class MessageController {

    private static final Logger log = LoggerFactory.getLogger(MessageController.class);

    @Autowired
    private SimpMessagingTemplate template;


    /**
     * *服务器指定用户进行推送,需要前端开通 var socket = new SockJS(host+'/myUrl' + '?token=1234');
     */
    @RequestMapping("/sendUser")
    public void sendUser(String token,String msg) {
        log.info("token = {} ,服务器指定用户进行推送", token);
        log.info("sendUser--->{}"+ServerUtil.getServerName());
        WebSocketSession webSocketSession = SocketManager.get(token);
        log.info("webSession---->"+webSocketSession);
        if (webSocketSession != null) {
            log.info("获取到了连接----->"+webSocketSession);
            /**
             * 主要防止broken pipe
             */
            template.convertAndSendToUser(token, "/queue/sendUser", msg);
        }

    }

    /**
     * 广播,服务器主动推给连接的客户端
     */
    @RequestMapping("/sendTopic")
    public void sendTopic() {
        template.convertAndSend("/topic/sendTopic", "大家晚上好");

    }

    /**
     * 客户端发消息,服务端接收
     *
     * @param message
     */
    // 相当于RequestMapping
    @MessageMapping("/sendServer")
    public void sendServer(String message) {
        log.info("message:{}", message);
    }

    /**
     * 客户端发消息,大家都接收,相当于直播说话
     *
     * @param message
     * @return
     */
    @MessageMapping("/sendAllUser/{id}")
    @SendTo("/topic/sendTopic/{id}")
    public String sendAllUser(String message) {
        // 也可以采用template方式
        return message;
    }

    /**
     * 点对点用户聊天,这边需要注意,由于前端传过来json数据,所以使用@RequestBody
     * 这边需要前端开通var socket = new SockJS(host+'/myUrl' + '?token=4567');   token为指定name
     * 收到消息判断接受用户是否和当前发送者在同一台服务器,如果不在的话,转发消息到接收者服务器
     * @param map
     */
    @MessageMapping("/sendMyUser.do")
    public void sendMyUser(@RequestBody Map<String, String> map) {
        log.info("map = {}", map);
        String toUserId = map.get("name");
        WebSocketSession webSocketSession = SocketManager.get(toUserId);
        if (webSocketSession != null) {
            log.info("sessionId = {}", webSocketSession.getId());
            template.convertAndSendToUser(toUserId, "/queue/sendUser", map.get("message"));
        }else{
            String redisKey = Constants.SOCKET_TOKEN_PREFIX+toUserId;
            String redisServer = PubFun.convert2String(JedisUtil.getByKey(redisKey),"");
            //如果redis里没有数据,证明用户没有登录,直接return
            if(StringUtils.isBlank(redisServer)) return;
            String serverName = ServerUtil.getServerName();
            if(!StringUtils.equals(redisServer,serverName)){

                String url = redisServer+"/socket/sockjs";
                try {
                    HelloClient.send2User(url, JSON.toJSONString(map));
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 查询用户是否在线
     * @param token
     * @return
     */
    @RequestMapping("/queryIsconnected")
    public RequestResult queryIsconnected(String token){

        RequestResult result = new RequestResult();

        log.info("token = {} ,查询用户是否在线", token);
        WebSocketSession webSocketSession = SocketManager.get(token);

        if(null == webSocketSession || !webSocketSession.isOpen()){
            return result.addState(false).addStateMsg("用户未连接");
        }
        return result.addState(true).addStateMsg("用户已连接").addJSONData(token);
    }

运行项目时,异常信息含有

java.lang.IllegalArgumentException: Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding “true” to servlet and filter declarations in web.xml. Also you must use a Servlet 3.0+ container

(spring mvc整合才有可能发生的异常,spring boot没有)

true

这段代码等字样,在web.xml的和两个标签里面加入这段代码就可以解决

true

若页面初始化websocket出现错误,打开浏览器的控制台看地址带有info地址,是否显示200。

解决方案:在web.xml中配置拦截/*的路径,访问连接是不能添加后缀.do

现在还有个问题,就是用户断开连接的时候会抛出一个异常,欢迎留言讨论。。。

移除webSocket连接 456

2021/01/11 19:59:25,236 2110254 [clientOutboundChannel-90] DEBUG org.springframework.web.socket.messaging.StompSubProtocolHandler.sendToClient  – Failed to send WebSocket message to client in session ssxwtqdr

java.lang.IllegalStateException: Cannot send a message when session is closed



版权声明:本文为qq_31546841原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。