当前的项目需要用到消息推送这个功能,领导拍板说用stomp+rabbitmq。但是我百度了一下,没找到特别好的博客,特此写篇博客记录下,希望对初学stomp的人一点帮助。
学习stomp之前,先来扯扯tcp和http。我们知道,tcp是全双工的,也就是说建立tcp的双方是可以互相主动通信的。http呢,是一个应用层协议,使用tcp作为传输层,但是http的模型是request/response,也就是说每次http通信都要客户端主动发起请求,然后服务端响应。所以服务器端是无法主动推送消息到客户端,这样看起来,http就有点像单工的了。后面就出现了websocket协议,它利用了tcp全双工的特性,使得通信双方可以主动的向对方发送消息。不过websocket目前只在浏览器端支持,但是websocket协议出现的比较晚,所以还是有部分浏览器不支持websocket。websocket呢,只保证双方可以主动通信,除此之外,没有像http一样有什么POST,GET,什么restful这些语义,所以用起来还不是很方便。后面stomp就出来了,它和http协议很类似,建立连接时用connect,订阅消息用subscribe。对于stomp和websocket的细节这里就不做过多介绍了,如果对stomp还不是很了解的话,可以先切出去,先了解一下stomp,网上应该有很多比我讲的好的教程。
再来说说rabbitmq,他是一个消息队列,这里我们就用它来暂存消息,官网地址:www.rabbbitmq.com(rabbitmq的官方文档看起来是非常舒服的)。为了让rabbitmq支持stomp,需要安装插件,cd到bin目录,执行以下命令:
rabbitmq-plugins enable rabbitmq_web_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples
重启rabbitmq即可。
现在可以开始实战了,使用idea去spring官网下一个spring boot的初始化项目,然后添加pom依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>stomp-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-net</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
添加好了依赖之后,需要配置websocket,添加一个websocket配置类:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration
implements WebSocketMessageBrokerConfigurer {
// 添加拦截器,对于新建立的连接保存其用户信息
@Autowired
public ConnectInterceptor connectInterceptor() {
return new ConnectInterceptor();
}
//设置建立websocket连接的端点,允许跨域
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS()
.setSessionCookieNeeded(true);
}
//设置stompbroker,这里填写rabbitmq的地址和密码即可,端口默认61613
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost").setRelayPort(61613)
.setClientLogin("guest").setClientPasscode("guest")
.setAutoStartup(true);
registry.setApplicationDestinationPrefixes("/app", "/topic", "/queue");
}
//添加拦截器
@Override
public void configureClientInboundChannel(
ChannelRegistration registration) {
registration.interceptors(connectInterceptor());
}
拦截用户的拦截器代码如下:
public class ConnectInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message,StompHeaderAccessor.class);
if(!Objects.isNull(accessor) && StompCommand.CONNECT.equals(accessor.getCommand())){
OAuth2Authentication simpUser = (OAuth2Authentication) message.getHeaders().get("simpUser");
String userId = simpUser.getUserAuthentication().getPrincipal().toString();
accessor.setUser(() -> userId);
}
return message;
}
}
由于项目中登陆模块使用的是oauth2,用户的登录成功后会颁发token,所以这里主要是做解析token,取出用户信息,如果你们不想用oauth2的话,可以参考以下配置:
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor
.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders()
.get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
//获取name参数的值,注意,这个值是要在前端建立连接的时候加上的
Object name = ((Map) raw).get("name");
String token = ((Map) raw).get("access_token").toString();
if (name instanceof LinkedList) {
// 设置当前访问器的认证用户
accessor.setUser(
()-> (String) name);
System.out.println(
"userInterceptor:" + accessor.getUser().getName());
}
}
}
return message;
}
到这里,stomp的配置基本上已经配好了。之前看别人的教程,以为整合rabbitmq很复杂,需要在建立连接的时候新建queue,然后动态绑定exchange,消费者呢,需要实时监控queue,这个过程是很麻烦的,因为一般来讲消费者是直接通过配置进行静态绑定queue,而现在需要实时的监听queue的增加和减少,个人觉得十分麻烦。后来发现完全不需要这么做,安装好了rabbitmq的插件之后就可以直接使用了。当需要对一个新用户发消息时,会在rabbitmq自动的生成一个随机queue,而java端会进行自动的绑定这个queue,完全屏蔽了底层操作,使用起来还是比较方便的。
当我们要发送消息给建立连接的用户的时候,拿到simMessagingTemplate这个bean
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
simpMessagingTemplate.convertAndSenfToUser(username, queueName,message)
注意,上面的queuName不是指rabbitmq的queuename(这个name在我们这里是随机的),这个queueName是用户在连接之后订阅的queueName.
到了这里,后端基本上是写完了,由于本人不会前端,所以前端的代码就放一份别人的吧:
<!DOCTYPE HTML>
<html>
<head>
<meta charset=”UTF-8″>
<title>My WebSocket</title>
<script src=”js/sockjs.min.js”></script>
<script src=”js/jquery.min.js”></script>
<script src=”js/stomp.min.js”></script>
<!–<script type=”text/javascript”></script>–>
<style>
#message22{
margin-top:40px;
border:1px solid gray;
padding:20px;
}
</style>
<style>
#message{
margin-top:40px;
border:1px solid gray;
padding:20px;
}
</style>
</head>
<body>
频道号:<input id=”room” type=”text”/>
<button οnclick=”conectWebSocket()”>连接WebSocket</button>
<button οnclick=”disconnect()”>断开连接</button>
<hr />
<div id=”message22″></div>
<br />
做题区:<input id=”text” type=”text” />
<!– 频道号:<input id=”toUser” type=”text” /> –>
<button οnclick=”sendMessage()”>发送消息</button>
<div id=”message”></div>
</body>
<script type=”text/javascript”>
var stompClient;
var serverUrl = “http://localhost:8026/websocket?access_token=ed3621ca-44ea-4fc7-a3a4-abe0a84a9155”;
var room;//频道号
var websocket = null;
//websocket连接
function conectWebSocket(){
this.room = document.getElementById(‘room’).value;//频道号
console.log(this.room);
console.log(this.serverUrl);
var socket = new SockJS(this.serverUrl);
this.stompClient = Stomp.over(socket);
var that = this;
this.stompClient.connect({name:”1″,access_token:”4c7695f8-4dbc-4dee-9d51-440dab79b320″}, function (frame) {
//that.stompClient.subscribe(‘/topic/’+that.room ,function(txt) {
// console.log(“websocket连接成功”);
// console.log(txt);
// document.getElementById(‘message’).innerHTML += JSON.parse(txt.body)[‘content’]+ ‘<br/>’;
// const sender = JSON.parse(message.body)[‘sender’];
const language = JSON.parse(message.body)[‘language’];
const content = JSON.parse(message.body)[‘content’];
const type = JSON.parse(message.body)[‘type’];
// });
that.stompClient.subscribe(‘/user/queue/message’,function(txt) {
document.getElementById(‘message’).innerHTML += JSON.parse(txt.body)[‘content’]+ ‘<br/>’;
});
});
}
//发送消息
function sendMessage() {
//获取输入的文本信息进行发送
var message = document.getElementById(‘text’).value;
// var room = document.getElementById(‘toUser’).value;
var socketMsg = {msg:message,toUser:room};
var that = this
this.stompClient.send(
‘/app/test/pointToPoint’,
{},
JSON.stringify({
‘room’: that.room,
‘type’: “1”,//1,2
‘content’: message,
‘userId’:”test”,//小明
‘questionId’:”222299023″,//题目1
‘createTime’:””,
})
);
}
function disconnect() {
//断开连接的方法
if (this.stompClient !== undefined) {
this.stompClient.disconnect();
alert(“Disconnected”);
}else{
alert(“当前没有连接websocket”)
}
this.stompClient = undefined;
}
</script>
</html>
这个是用原生js写的,直接打开即可运行,里面的服务器端的地址和端口根据自己的实际情况进行修改。
———————————这是有一个分割线—————————————————————————–
二更,想了几天,发现之前的版本是有问题的,使用上面的方案rabbitmq不会自动删除queue,也就是说当有用户断开连接后,queu会一直存在于rabbitmq中。当项目运行时间长了以后,rabbitmq中无效的queue会非常多,所以必须对以上的方案进行改进。后来发现如果后端stomp消息代理设置的前缀为/exchange时,这个时候当连接的用户断开连接时。rabbitmq会删除这个queue。
功能虽然基本上实现了,但是还是有一些缺陷。主要在于stomp推送消息是根据userId而不是根据session,也就是说如果用户在两个浏览器同时连接并且订阅了同一个主题时,应该会收到另一个浏览器本应该收到消息(未验证)。后续应该还是应该基于websession来进行精准推送。
ok,写完了。如果你运行上面的代码有什么问题(有点乱,目前还不会用md),欢迎留言评论。
————————————三更——————————————————————————————–
回过头来看了一下这篇文章,发现还有很多可以优化的地方,比如前端可以订阅一个不存在的路径,用户的信息用拦截器来做也不是很好(本文是因为只有一个路径,所以没什么问题)。后面考虑这周重写一篇。
以上。