spring stomp+rabbitmq实现点对点消息推送

  • Post author:
  • Post category:其他


当前的项目需要用到消息推送这个功能,领导拍板说用stomp+rabbitmq。但是我百度了一下,没找到特别好的博客,特此写篇博客记录下,希望对初学stomp的人一点帮助。

学习stomp之前,先来扯扯tcp和http。我们知道,tcp是全双工的,也就是说建立tcp的双方是可以互相主动通信的。http呢,是一个应用层协议,使用tcp作为传输层,但是http的模型是request/response,也就是说每次http通信都要客户端主动发起请求,然后服务端响应。所以服务器端是无法主动推送消息到客户端,这样看起来,http就有点像单工的了。后面就出现了websocket协议,它利用了tcp全双工的特性,使得通信双方可以主动的向对方发送消息。不过websocket目前只在浏览器端支持,但是websocket协议出现的比较晚,所以还是有部分浏览器不支持websocket。websocket呢,只保证双方可以主动通信,除此之外,没有像http一样有什么POST,GET,什么restful这些语义,所以用起来还不是很方便。后面stomp就出来了,它和http协议很类似,建立连接时用connect,订阅消息用subscribe。对于stomp和websocket的细节这里就不做过多介绍了,如果对stomp还不是很了解的话,可以先切出去,先了解一下stomp,网上应该有很多比我讲的好的教程。

再来说说rabbitmq,他是一个消息队列,这里我们就用它来暂存消息,官网地址:www.rabbbitmq.com(rabbitmq的官方文档看起来是非常舒服的)。为了让rabbitmq支持stomp,需要安装插件,cd到bin目录,执行以下命令:


rabbitmq-plugins enable rabbitmq_web_stomp



rabbitmq-plugins enable rabbitmq_web_stomp_examples


重启rabbitmq即可。

现在可以开始实战了,使用idea去spring官网下一个spring boot的初始化项目,然后添加pom依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>stomp-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-net</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.9.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

添加好了依赖之后,需要配置websocket,添加一个websocket配置类:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration
      implements WebSocketMessageBrokerConfigurer {
   // 添加拦截器,对于新建立的连接保存其用户信息
   @Autowired
   public ConnectInterceptor connectInterceptor() {
      return new ConnectInterceptor();
   }
   //设置建立websocket连接的端点,允许跨域
   @Override
   public void registerStompEndpoints(StompEndpointRegistry registry) {
      registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS()
            .setSessionCookieNeeded(true);
   }
   //设置stompbroker,这里填写rabbitmq的地址和密码即可,端口默认61613
   @Override
   public void configureMessageBroker(MessageBrokerRegistry registry) {
      registry.setPathMatcher(new AntPathMatcher("."));
      registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("localhost").setRelayPort(61613)
            .setClientLogin("guest").setClientPasscode("guest")
            .setAutoStartup(true);
      registry.setApplicationDestinationPrefixes("/app", "/topic", "/queue");
   }
   //添加拦截器
   @Override
   public void configureClientInboundChannel(
         ChannelRegistration registration) {
      registration.interceptors(connectInterceptor());
   }

拦截用户的拦截器代码如下:

public class ConnectInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message,StompHeaderAccessor.class);
        if(!Objects.isNull(accessor) && StompCommand.CONNECT.equals(accessor.getCommand())){
            OAuth2Authentication simpUser = (OAuth2Authentication) message.getHeaders().get("simpUser");
            String userId = simpUser.getUserAuthentication().getPrincipal().toString();
            accessor.setUser(() -> userId);
        }
        return message;
    }
}

由于项目中登陆模块使用的是oauth2,用户的登录成功后会颁发token,所以这里主要是做解析token,取出用户信息,如果你们不想用oauth2的话,可以参考以下配置:

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
   StompHeaderAccessor accessor = MessageHeaderAccessor
         .getAccessor(message, StompHeaderAccessor.class);
   if (StompCommand.CONNECT.equals(accessor.getCommand())) {
      Object raw = message.getHeaders()
            .get(SimpMessageHeaderAccessor.NATIVE_HEADERS);

      if (raw instanceof Map) {
        //获取name参数的值,注意,这个值是要在前端建立连接的时候加上的
         Object name = ((Map) raw).get("name");
         String token = ((Map) raw).get("access_token").toString();
         if (name instanceof LinkedList) {
            // 设置当前访问器的认证用户
            accessor.setUser(
                  ()-> (String) name);
            System.out.println(
                  "userInterceptor:" + accessor.getUser().getName());
         }
      }
   }
   return message;
}

到这里,stomp的配置基本上已经配好了。之前看别人的教程,以为整合rabbitmq很复杂,需要在建立连接的时候新建queue,然后动态绑定exchange,消费者呢,需要实时监控queue,这个过程是很麻烦的,因为一般来讲消费者是直接通过配置进行静态绑定queue,而现在需要实时的监听queue的增加和减少,个人觉得十分麻烦。后来发现完全不需要这么做,安装好了rabbitmq的插件之后就可以直接使用了。当需要对一个新用户发消息时,会在rabbitmq自动的生成一个随机queue,而java端会进行自动的绑定这个queue,完全屏蔽了底层操作,使用起来还是比较方便的。

当我们要发送消息给建立连接的用户的时候,拿到simMessagingTemplate这个bean

@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

simpMessagingTemplate.convertAndSenfToUser(username, queueName,message)

注意,上面的queuName不是指rabbitmq的queuename(这个name在我们这里是随机的),这个queueName是用户在连接之后订阅的queueName.

到了这里,后端基本上是写完了,由于本人不会前端,所以前端的代码就放一份别人的吧:


<!DOCTYPE HTML>

<html>

<head>

<meta charset=”UTF-8″>

<title>My WebSocket</title>

<script src=”js/sockjs.min.js”></script>

<script src=”js/jquery.min.js”></script>

<script src=”js/stomp.min.js”></script>

<!–<script type=”text/javascript”></script>–>

<style>

#message22{


margin-top:40px;

border:1px solid gray;

padding:20px;

}

</style>

<style>

#message{


margin-top:40px;

border:1px solid gray;

padding:20px;

}

</style>

</head>

<body>

频道号:<input id=”room” type=”text”/>

<button οnclick=”conectWebSocket()”>连接WebSocket</button>

<button οnclick=”disconnect()”>断开连接</button>

<hr />

<div id=”message22″></div>

<br />

做题区:<input id=”text” type=”text” />

<!– 频道号:<input id=”toUser” type=”text” /> –>

<button οnclick=”sendMessage()”>发送消息</button>

<div id=”message”></div>

</body>

<script type=”text/javascript”>

var stompClient;

var serverUrl = “http://localhost:8026/websocket?access_token=ed3621ca-44ea-4fc7-a3a4-abe0a84a9155”;

var room;//频道号

var websocket = null;

//websocket连接

function conectWebSocket(){

this.room = document.getElementById(‘room’).value;//频道号

console.log(this.room);

console.log(this.serverUrl);

var socket = new SockJS(this.serverUrl);

this.stompClient = Stomp.over(socket);

var that = this;

this.stompClient.connect({name:”1″,access_token:”4c7695f8-4dbc-4dee-9d51-440dab79b320″}, function (frame) {


//that.stompClient.subscribe(‘/topic/’+that.room ,function(txt) {


// console.log(“websocket连接成功”);

// console.log(txt);

//  document.getElementById(‘message’).innerHTML += JSON.parse(txt.body)[‘content’]+ ‘<br/>’;

// const sender = JSON.parse(message.body)[‘sender’];

const language = JSON.parse(message.body)[‘language’];

const content = JSON.parse(message.body)[‘content’];

const type = JSON.parse(message.body)[‘type’];

//  });

that.stompClient.subscribe(‘/user/queue/message’,function(txt) {


document.getElementById(‘message’).innerHTML += JSON.parse(txt.body)[‘content’]+ ‘<br/>’;

});

});

}

//发送消息

function sendMessage() {


//获取输入的文本信息进行发送

var message = document.getElementById(‘text’).value;

// var room = document.getElementById(‘toUser’).value;

var socketMsg = {msg:message,toUser:room};

var that = this

this.stompClient.send(

‘/app/test/pointToPoint’,

{},

JSON.stringify({


‘room’: that.room,

‘type’: “1”,//1,2

‘content’: message,

‘userId’:”test”,//小明

‘questionId’:”222299023″,//题目1

‘createTime’:””,

})

);

}

function disconnect() {

//断开连接的方法

if (this.stompClient !== undefined) {

this.stompClient.disconnect();

alert(“Disconnected”);

}else{


alert(“当前没有连接websocket”)

}

this.stompClient = undefined;

}

</script>

</html>

这个是用原生js写的,直接打开即可运行,里面的服务器端的地址和端口根据自己的实际情况进行修改。

———————————这是有一个分割线—————————————————————————–

二更,想了几天,发现之前的版本是有问题的,使用上面的方案rabbitmq不会自动删除queue,也就是说当有用户断开连接后,queu会一直存在于rabbitmq中。当项目运行时间长了以后,rabbitmq中无效的queue会非常多,所以必须对以上的方案进行改进。后来发现如果后端stomp消息代理设置的前缀为/exchange时,这个时候当连接的用户断开连接时。rabbitmq会删除这个queue。

功能虽然基本上实现了,但是还是有一些缺陷。主要在于stomp推送消息是根据userId而不是根据session,也就是说如果用户在两个浏览器同时连接并且订阅了同一个主题时,应该会收到另一个浏览器本应该收到消息(未验证)。后续应该还是应该基于websession来进行精准推送。

ok,写完了。如果你运行上面的代码有什么问题(有点乱,目前还不会用md),欢迎留言评论。

————————————三更——————————————————————————————–

回过头来看了一下这篇文章,发现还有很多可以优化的地方,比如前端可以订阅一个不存在的路径,用户的信息用拦截器来做也不是很好(本文是因为只有一个路径,所以没什么问题)。后面考虑这周重写一篇。

以上。



版权声明:本文为qq_41452805原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。