NIO 通讯 netty 实现的socket 通讯
应用netty 实现异步通信.实现消息推送和反馈,广播的功能
netty:
port: 8888 #监听端口
bossThread: 2 #线程数
workerThread: 2 #线程数
keepalive: true #保持连接
backlog: 100
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
//配置参数类
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyAcctConfig {
private int port;
private int bossThread;
private int workerThread;
private boolean keepalive;
private int backlog;
}
// 启动协助类
@Component
public class TCPServer {
@Autowired
@Qualifier("serverBootstrap")
private ServerBootstrap serverBootstrap;
@Autowired
@Qualifier("tcpSocketAddress")
private InetSocketAddress tcpPort;
private Channel serverChannel;
public void start() throws Exception {
System.out.println("-------------------------###########-------------------------------------"+tcpPort);
serverChannel = serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel();
}
@PreDestroy
public void stop() throws Exception {
serverChannel.close();
serverChannel.parent().close();
}
}
//创建 程序的启动入口
@Component
public class WebSocketService {
@Autowired
private NettyAcctConfig nettyAccountConfig;
@Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup(){
return new NioEventLoopGroup(nettyAccountConfig.getBossThread());
}
@Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup(){
return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread());
}
@Bean(name = "tcpSocketAddress")
public InetSocketAddress tcpPost(){
return new InetSocketAddress(nettyAccountConfig.getPort());
}
@Bean(name = "tcpChannelOptions")
public Map<ChannelOption<?>, Object> tcpChannelOptions(){
Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive());
options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog());
return options;
}
@Autowired
@Qualifier("selfChannelInitializer")
private WebSocketChannelHandler webSocketChannelHandler;
@Bean(name = "serverBootstrap")
public ServerBootstrap bootstrap(){
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(webSocketChannelHandler); //配置 请求协议和处理 handler
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
b.option(option, tcpChannelOptions.get(option));
}
return b;
}
}
//配置 请求协议和处理 handler
@Component
@Qualifier("selfChannelInitializer")
public class WebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
@Autowired
private WebSocketHandler webSocketHandler;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());
socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65535));
socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
socketChannel.pipeline().addLast("handler",webSocketHandler); //这里不能使用new,不然在handler中不能注入依赖
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));//如果需要拦截 第一次链接 就需要放在末尾
}
}
// 客户请求业务逻辑
@Data
@Component
@Qualifier("webSocketHandler")
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<Object>
{
private WebSocketServerHandshaker handshaker;
private static ConcurrentHashMap<String,Map<String,Object>> userChannelMap = new ConcurrentHashMap<String,Map<String,Object>>();
@Autowired
@Qualifier("tcpSocketAddress")
private InetSocketAddress tcpPort;
private static final String WEB_SOCKET_URL="ws://localhost:8888/websocket";
private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//服务端处理客户端的核心方法
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {
//处理客户端向服务端发起http握手请求
if(o instanceof FullHttpRequest){
handHttpRequest(ctx,(FullHttpRequest) o);
}else if(o instanceof WebSocketFrame ){ //处理websocket连接业务
handlerWebsocketFrame(ctx, (WebSocketFrame) o);
}
}
private void handlerWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判断是否未关闭websocket的请求
if(frame instanceof CloseWebSocketFrame){
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
}
//判断是否为ping消息
if(frame instanceof PingWebSocketFrame){
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//判断是否为二进制消息
if(!(frame instanceof TextWebSocketFrame)){
System.out.println("目前不支持二进制消息");
throw new RuntimeException("["+this.getClass().getName()+"] 不支持消息");
}
//返回应答消息
//获取客户端向服务端发送的消息
String request = ((TextWebSocketFrame) frame).text();
//获取用户信息
String userId = "";
for(String key : userChannelMap.keySet()){
Map channelMap = userChannelMap.get(key);
if(ctx.channel().id().equals(channelMap.get("channelId"))){
userId = key;
}
}
TextWebSocketFrame tws=new TextWebSocketFrame(sendMsgObjStr(userId,request));
//所有连接上的channel 都发送消息
channels.writeAndFlush(tws);
}
/**
* 功能描述: <br>
* 〈处理客户端向服务端发起的握手请求的〉
*
* @return:
* @since: 1.0.0
* @Author:JG
* @Date: 2019/9/19 21:12
*/
private void handHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
//
String uri = req.getUri();//连接时配置当前用户 this.websocket = new WebSocket('ws://localhost:8888/websocket?name='+ this.userName)
String userId = null;
if (null != uri && uri.contains("/websocket") && uri.contains("?")) {
String[] uriArray = uri.split("\\?");
if (null != uriArray && uriArray.length > 1) {
String[] paramsArray = uriArray[1].split("=");
if (null != paramsArray && paramsArray.length > 1) {
userId = paramsArray[1];
Map<String,Object> channelMap = new HashMap<>();
channelMap.put("channelId",ctx.channel().id());
channelMap.put("channelObj",ctx.channel());
userChannelMap.put(userId,channelMap);
}
}
}
if(!req.getDecoderResult().isSuccess() || !("websocket".equals(req.headers().get("upgrade")))){
sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//http websocket 握手请求处理逻辑
String url="ws://"+tcpPort+"/websocket";
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(url,null,false);
handshaker=wsFactory.newHandshaker(req);
if(handshaker==null){
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else{
handshaker.handshake(ctx.channel(),req);
}
//添加channel
channels.add(ctx.channel());
channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr(userId,"加入")));
}
/**
* 功能描述: <br>
* 〈处理 服务端向客户端的响应信息〉
*
[ctx, request]
* @return:void
* @since: 1.0.0
* @Author:JG
* @Date: 2019/9/19 21:15
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse resp){
if(resp.getStatus().code()!=200){
ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);
resp.content().writeBytes(buffer);
buffer.release();
}
//服务端向客户端发送数据
ChannelFuture future=ctx.channel().writeAndFlush(resp);
if(resp.getStatus().code()!=200){
future.addListener(ChannelFutureListener.CLOSE);
}
}
//出现异常时调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//关闭连接
ctx.close();
}
//服务端读取信息结束时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
public String sendMsgObjStr(String userId,String message){
Gson g = new Gson();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss");
Map<String,Object> messageMap = new HashMap();
messageMap.put("userId",userId);
messageMap.put("message",message);
messageMap.put("createTime",dateFormat.format(new Date()));
return g.toJson(messageMap);
}
//客户端与服务端创建连接时调用
/*
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel())
channels.writeAndFlush(new TextWebSocketFrame("[服务器] - "+ctx.channel().remoteAddress()+"加入"));
}
*/
//客户端与服务端断开
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channels.remove(ctx.channel());
String userId = "";
for(String key : userChannelMap.keySet()){
Map channelMap = userChannelMap.get(key);
if(ctx.channel().id().equals(channelMap.get("channelId"))){
userId = key;
}
}
userChannelMap.remove(userId);
channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr(userId,"下线")));
}
public void sendAllMessage(String message) {
channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr("System",message)));
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
for(String key : userChannelMap.keySet()){
if(key.equals(userId)){
Map channelMap = userChannelMap.get(key);
Channel channel = (Channel) channelMap.get("channelObj");
channel.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr("System",message)));
}
}
}
}
//前台
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket=window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8888/websocket");
socket.onmessage=function (e) {
var tag= document.getElementById("responseContext");
tag.value += e.data + "\r\n";
};
socket.onopen =function (p1) {
var tag= document.getElementById("responseContext");
tag.value =" 连接已经建立 请进行后续操作 \r\n";
}
socket.onclose= function (p1) {
var tag= document.getElementById("responseContext");
tag.value = "";
tag.value= "连接已经关闭"
}
}else{
alert("你的浏览器不支持websocket")
}
function send (messgae) {
if(!window.WebSocket){
return
}
if(socket.readyState == WebSocket.OPEN){
socket.send(messgae);
}else{
alert("websocket 连接未建立")
}
}
</script>
SpringBoot websocket
STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议
WebSocket是一个消息架构,不强制使用任何特定的消息协议,它依赖于应用层解释消息的含义;
与处在应用层的HTTP不同,WebSocket处在TCP上非常薄的一层,会将字节流转换为文本/二进制消息,因此,对于实际应用来说,WebSocket的通信形式层级过低,因此,可以在 WebSocket 之上使用 STOMP协议,来为浏览器 和 server间的 通信增加适当的消息语义。
如何理解 STOMP 与 WebSocket 的关系:
1) HTTP协议解决了 web 浏览器发起请求以及 web 服务器响应请求的细节,假设 HTTP 协议 并不存在,只能使用 TCP 套接字来 编写 web 应用,你可能认为这是一件疯狂的事情;
2) 直接使用 WebSocket(SockJS) 就很类似于 使用 TCP 套接字来编写 web 应用,因为没有高层协议,就需要我们定义应用间所发送消息的语义,还需要确保连接的两端都能遵循这些语义;
3) 同 HTTP 在 TCP 套接字上添加请求-响应模型层一样,STOMP 在 WebSocket 之上提供了一个基于帧的线路格式层,用来定义消息语义;
STOMP 实现socket 通讯
核心类: WebSocketMessageBrokerConfigurer 实现类
@EnableWebSocketMessageBroker
@Configuration
表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping 来接受stomp的send请求
主要实现:
实现 WebSocketMessageBrokerConfigurer 接口,
重写 registerStompEndpoints 通过 StompEndpointRegistry 注册连接地址,注册HandshakeInterceptor 实现类,实现在握手前后的操作。 注册DefaultHandshakeHandler 实现类 ,用于配置用户名称或者一些权限操作
重写 configureMessageBroker 配置代理目的地的前缀为 /topic 或者 /queue , /app 客户端发送前缀 即请求MessageMapping 的路径前缀。/user 点对点通讯时broke前缀,即配置心跳测试
发布/订阅:Topic 可以重复消费,不管消费与否不会保存 广播
点对点:Queue 不可以重复消费, 没人消费会保存
重写 configureWebSocketTransport 设置 消息字节大小,缓存大小,发送超时时间
重写 configureClientInboundChannel 设置输入消息通道 配置线程池 通过注册 ChannelInterceptor 实现类,实现 发送前和发送完毕的 一些核心操作,信息过滤 ,持久等。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketMessageBrokerConfigurer 实现类
/**
* STOMP协议
* EnableWebSocketMessageBroker 表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
*/
@Configuration
@EnableWebSocketMessageBroker
public class StompMessageBroke implements WebSocketMessageBrokerConfigurer {
public void registerStompEndpoints(StompEndpointRegistry registry) {
/**
* 注册 Stomp的端点
* addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址
* withSockJS:指定端点使用SockJS协议
* setHandshakeHandler 可以设置监控 socket运行和用户等
*/
registry.addEndpoint("/websocket-simple")
.setAllowedOrigins("*") // 添加允许跨域访问
.addInterceptors(new MyHandShakeInterceptor())
.setHandshakeHandler(new PrincipalHandshakeHandler())
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
/**
* 配置消息代理
* enableStompBrokerRelay 配置外部的STOMP服务 设置密码 代理host 默认为localhost 代理端口 默认为61613
* 启动简单Broker
* 可以配置多个,此段代码配置代理目的地的前缀为 /topic 或者 /queue 我们就可以给订阅配置的域的客户端推送消息
* 固定到某个用户是用/queue 的前缀 前端订阅 /user/queue 的前缀
* 发布/订阅:Topic 可以重复消费,不管消费与否不会保存 广播
* 点对点:Queue 不可以重复消费, 没人消费会保存
*/
// 自定义调度器,用于控制心跳线程
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(1);
taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
taskScheduler.initialize();
registry.enableSimpleBroker("/topic", "/queue")
.setHeartbeatValue(new long[]{10000,10000})
.setTaskScheduler(taskScheduler);
//客户端发送前缀 都会路由到带有@MessageMapping 注解的方法中
registry.setApplicationDestinationPrefixes("/app");
// 配置一对一消息前缀 默认也是/user/
registry.setUserDestinationPrefix("/user");
}
//配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(500 * 1024 * 1024);
registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
registration.setSendTimeLimit(200000);
}
/**
* 设置输入消息通道 配置线程池
* 配置消息拦截器 实现ChannelInterceptor接口
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(5)
.maxPoolSize(10)
.keepAliveSeconds(30);
registration.interceptors(new InMessageIntercepter());
}
/**
* 设置输出消息通道
* @param registration
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(5)
.maxPoolSize(10)
.keepAliveSeconds(30);
registration.interceptors(new OutMessageIntercepter());
}
}
DefaultHandshakeHandler 实现类
public class PrincipalHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
/**
* 这边可以按你的需求,如何获取唯一的值,既unicode
* 得到的值,会在监听处理连接的属性中,既WebSocketSession.getPrincipal().getName()
* 也可以自己实现Principal()
*/
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();
/**
* 这边就获取你最熟悉的陌生人,携带参数,你可以cookie,请求头,或者url携带,这边我采用url携带
*/
final String userId = httpRequest.getParameter("userId");
if (StringUtils.isEmpty(userId)) {
return null;
}
return new Principal() {
@Override
public String getName() {
return userId;
}
};
}
return null;
}
}
HandshakeInterceptor 实现类
//OriginHandshakeInterceptor:检查Origin头字段的合法性
public class MyHandShakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
System.out.println(this.getClass().getCanonicalName() + "http协议转换websoket协议进行前, 握手前"+serverHttpRequest.getURI());
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
System.out.println(this.getClass().getCanonicalName() + "握手成功后...");
}
}
ChannelInterceptor 实现类
/**
* 客户端发送消息处理
*/
public class InMessageIntercepter implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//发送前消息过滤
//离开进入操作
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
SimpMessageType simpMessageType = (SimpMessageType)headers.get("simpMessageType");
String simpleSessionId = (String) headers.get("simpSessionId");
if("CONNECT".equals(simpMessageType.name())){
}
if("DISCONNECT".equals(simpMessageType.name())){
}
if("SEND".equals(simpMessageType.name())){
System.out.println(simpMessageType.name()+"==in==");
}
if(headers.containsKey("stompCommand")){
StompCommand stompCommand = (StompCommand) headers.get("stompCommand");
}
return message;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
//发送成功后进行 数据存储 过滤心跳
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
}
}
/**
* 服务端发送消息处理
*/
public class OutMessageIntercepter implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//发送前消息过滤
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
SimpMessageType simpMessageType = (SimpMessageType)headers.get("simpMessageType");
String simpleSessionId = (String) headers.get("simpSessionId");
if("MESSAGE".equals(simpMessageType.name())){
System.out.println(simpMessageType.name()+"==out==");
}
return message;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
if(sent){
SimpMessageType simpMessageType = (SimpMessageType)headers.get("simpMessageType");
String simpleSessionId = (String) headers.get("simpSessionId");
if("MESSAGE".equals(simpMessageType.name())){
//发送成功后进行 数据存储 过滤心跳
}
}
}
}
controller
@Controller
public class BroadcastCtl {
// 收到消息记数
private AtomicInteger count = new AtomicInteger(0);
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* @MessageMapping 指定要接收消息的地址,类似@RequestMapping。除了注解到方法上,也可以注解到类上
* @SendTo默认 消息将被发送到与传入消息相同的目的地
* 消息的返回值是通过{@link org.springframework.messaging.converter.MessageConverter}进行转换
* @param requestMessage
* @return
*/
@MessageMapping({"/receive"})
@SendTo("/topic/getResponse")
//@SendToUser("/topic/getResponse")
//前端推送的地址 send 调用的地址,发送到 /topic/getResponse 然后订阅 /topic/getResponse 用户就会收到 广播
public ResponseMessage broadcast(RequestMessage requestMessage){
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setContent("BroadcastCtl receive [" + count.incrementAndGet() + "] records");
return responseMessage;
}
@CrossOrigin
@MessageMapping({"/chatRoom"}) //前端推送的 地址 推送到指定的位置 room 进行广播
public void messageHandling(RequestMessage requestMessage) throws Exception {
String destination = "/topic/" + HtmlUtils.htmlEscape(requestMessage.getRoom());
String sender = HtmlUtils.htmlEscape(requestMessage.getSender()); //htmlEscape 转换为HTML转义字符表示
String type = HtmlUtils.htmlEscape(requestMessage.getType());
String content = HtmlUtils.htmlEscape(requestMessage.getContent());
ResponseMessage response = new ResponseMessage(sender, type, content,new Date());
messagingTemplate.convertAndSend(destination, response);
}
@CrossOrigin
@MessageMapping({"/toUser"}) //前端推送的 地址 针对某人推送
public void singleToUser(RequestMessage requestMessage) throws Exception {
String sender = HtmlUtils.htmlEscape(requestMessage.getSender()); //htmlEscape 转换为HTML转义字符表示
String type = HtmlUtils.htmlEscape(requestMessage.getType());
String content = HtmlUtils.htmlEscape(requestMessage.getContent());
ResponseMessage response = new ResponseMessage(sender, type, content,new Date());
// 点对点
messagingTemplate.convertAndSendToUser(requestMessage.getReceiver(), "/queue/toUser",response);
}
@GetMapping("socketStomp")
public ModelAndView page3(){
return new ModelAndView("socketStomp");
}
@GetMapping("testSendToAll")
public void sendQueueMessage() {
System.out.println("后台群发推送推送!");
ResponseMessage response = new ResponseMessage("admin", "text", "后台群发推送推送!",new Date());
messagingTemplate.convertAndSend("/topic/getResponse",response);
}
}
实体类 消息请求 和 返回实体
ublic class RequestMessage implements Serializable {
private static final long serialVersionUID = -141796491940503606L;
private Long messageId;
private String sender;//消息发送者
private String room;//房间号
private String type;//消息类型
private String content;//消息内容
private String receiver;//消息接受者
private Date sendTime;
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
public RequestMessage() {
}
public RequestMessage(String sender,String receiver,String room, String type, String content) {
this.sender = sender;
this.receiver = receiver;
this.room = room;
this.type = type;
this.content = content;
}
public String getSender() {
return sender;
}
public String getRoom() {
return room;
}
public String getType() {
return type;
}
public String getContent() {
return content;
}
public String getReceiver() {
return receiver;
}
public Long getMessageId() {
return messageId;
}
public void setMessageId(Long messageId) {
this.messageId = messageId;
}
public void setSender(String sender) {
this.sender = sender;
}
public void setRoom(String room) {
this.room = room;
}
public void setType(String type) {
this.type = type;
}
public void setContent(String content) {
this.content = content;
}
public void setReceiver(String receiver) {
this.receiver = receiver;
}
}
public class ResponseMessage implements Serializable {
private static final long serialVersionUID = -141964919405031606L;
private Long messageId;
private String sender;
private String type;
private String content;
private Date sendTime;
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
public ResponseMessage() {
}
public ResponseMessage(String sender, String type, String content,Date sendTime) {
this.sender = sender;
this.type = type;
this.sendTime =sendTime;
this.content = content;
}
public String getSender() {
return sender;
}
public String getType() {
return type;
}
public String getContent() {
return content;
}
public Long getMessageId() {
return messageId;
}
public void setMessageId(Long messageId) {
this.messageId = messageId;
}
public void setSender(String sender) {
this.sender = sender;
}
public void setType(String type) {
this.type = type;
}
public void setContent(String content) {
this.content = content;
}
}
stomp 前端操作
创建连接 订阅 发送

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>SOCKET</title>
<!-- VUE -->
<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
<!-- FormMaking -->
<link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css">
<!-- 引入样式 -->
<link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css">
<!-- 引入组件库 -->
<script src="https://unpkg.com/element-ui/lib/index.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script type="application/javascript" src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script type="application/javascript" src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
</head>
<body>
<div id="vue_det" style="margin:20px;">
<el-container>
<el-header>
<h1>HELLOW SOCKET</h1>
</el-header>
<el-main>
<div style="background-color: #b3e19d;height: 600px;overflow-y:auto">
<div v-for="item,index in messagList" style="height: 50px;" :class="item.self?'textSelf':'textOther'"> {{item.showMessage}} </div>
</div>
</el-main>
<el-footer>
<el-form :inline="true" style="margin-top: 10px" >
<el-form-item label="当前人" prop="userId">
<el-input type="input" v-model="userId" :disabled="true"></el-input>
</el-form-item>
<el-form-item label="输入信息" prop="text">
<el-input type="textarea" v-model="text"></el-input>
</el-form-item>
<el-form-item label="接收人" prop="receiver">
<el-input type="input" v-model="receiver"></el-input>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="send">发送</el-button>
</el-form-item>
</el-form>
</el-footer>
</el-container>
</div>
</body>
<script type="text/javascript">
var vm = new Vue({
el: '#vue_det',
data: {
userId:"",
lockReconnect:false, //防止重复连接
text:"",
receiver:"",
messagList:[]
},
mounted() {
//测试 用户名 随机
var arr= ["a","b","c","d"];
var name = "";
for(var x =0 ; x <4;x++){
var key = Math.floor(Math.random()*10 /3)
name += arr[key];
}
this.userId = name;
// wss 为请求 https 安全协议
//非安全的
var url = 'http://localhost:28087/websocket-simple?userId='+ this.userId
this.createWebSocket(url);
},
methods: {
createWebSocket(wsUrl) {
var that = this;
console.log(wsUrl);
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else {
var socket1 = new SockJS(wsUrl);
that.stomp = Stomp.over(socket1);
//连接
that.stomp.connect({}, function (frame) {
//订阅主题
that.stomp.subscribe("/topic/getResponse", function (res) {
console.log(res.body);
that.msgRemake(res.body)
});
//群发订阅
that.stomp.subscribe("/queue/getResponse", function (res) {
console.log(res.body);
that.msgRemake(res.body)
});
//用户模式
that.stomp.subscribe("/user/queue/toUser", function (res) {
console.log(res.body);
that.msgRemake(res.body)
});
});
}
},
msgRemake(data){
var obj = JSON.parse(data)
obj.showMessage = obj.sender +" "+ obj.sendTime +"\r\n"+obj.content;
if(obj.sender === this.userId){
obj.self = true;
}else{
obj.self = false;
const h = this.$createElement;
this.$notify({
title: '标题名称',
message: h('i', { style: 'color: teal'}, obj.showMessage)
});
}
this.messagList.push(obj);
},
send(){
var msg = {
"sender":this.userId,
"receiver":this.receiver,
"room" :"",
"type" :"text",
"content":this.text,
"sendTime":new Date()
};
this.msgRemake(JSON.stringify(msg)); //增加到屏幕
this.stomp.send("/app/toUser", {}, JSON.stringify(msg));
},
close(){
this.stomp.disconnect();
}
},
beforeDestroy(){
this.close();
}
})
</script>
<style scoped>
.el-header {
background-color: #B3C0D1;
color: #333;
text-align: center;
height:180px;
}
.el-main {
padding: 0;
}
.el-footer {
background-color: #B3C0D1;
color: #333;
line-height: 30px;
}
.el-carousel__item h3 {
color: #475669;
font-size: 14px;
opacity: 0.75;
line-height: 200px;
margin: 0;
}
.el-carousel__item:nth-child(2n) {
background-color: #99a9bf;
}
.el-carousel__item:nth-child(2n+1) {
background-color: #d3dce6;
}
.textOther {
text-align: left;
background-color: #00a0e9;border: solid 1px black
}
.textSelf {
text-align: right;
background-color: #BAC498;border: solid 1px black
}
</style>
</html>
WebSocket 实现socket 通讯
核心类 实现 WebSocketConfigurer
@EnableWebSocket
@Configuration
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
/**
* WebSocketHandlerRegistry 方式注册不用的路径不用的 AbstractWebSocketHandler 实现类
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(new WebSocketAuthHandler(), "socketJs")
.addInterceptors(new MyHandShakeInterceptor()) //三次握手
.setAllowedOrigins("*")
.withSockJS();
}
/**
* 配置 ServerEndpointExporter 就可以 用@ServerEndpoint 来注册一个服务类.
* @ServerEndpoint("/imserver/{userId}")
* @OnOpen
* @OnClose
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
雪花算法生成id 业务使用的我临时放在这里来注入容器
**/
@Bean
public IdWorker idWorker(){
return new IdWorker(1,1,1);
}
}
方式1 启动 ServerEndpointExporter 通过 @ServerEndpoint("/imserver/{userId}") 发布 websocket
@ServerEndpoint("/imserver/{userId}")
public class SocketEndPointService {
@Autowired
private IdWorker idWorker;
//针对独立的会话
private static ConcurrentHashMap<String,SocketEndPointService> singleSessions = new ConcurrentHashMap<>();
//针对房间群里session管理
private static ConcurrentHashMap<String, Set<SocketEndPointService>> roomSessions = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/**接收userId*/
private String userId="";
/**接收userId*/
private String roomId="";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId, @RequestParam("roomId") String roomId) {
this.session = session;
this.userId=userId;
this.roomId=roomId;
if(StringUtils.isEmpty(roomId)){
singleSessions.put(userId,this);
}else{
if(roomSessions.contains(roomId)){
Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);
webSocketServers.add(this);
}
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if(StringUtils.isEmpty(roomId)) {
Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);
webSocketServers.remove(this);
}else{
singleSessions.remove(this.userId);
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
RequestMessage requestMessage = JSONUtil.toBean(message,RequestMessage.class);
if(requestMessage.getMessageId() == null){
requestMessage.setMessageId(idWorker.nextId());
}
ResponseMessage responseMessage = new ResponseMessage();
BeanUtil.copyProperties(responseMessage,requestMessage);
responseMessage.setSendTime(new Date());
if(StringUtils.isEmpty(roomId)) {
Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);
//给所有该房间的人发消息,排出自己,并
for(SocketEndPointService webSocketServer: webSocketServers){
if(webSocketServer.equals(this)) continue;
webSocketServer.session.getAsyncRemote().sendText(JSONUtil.toJsonStr(responseMessage));
}
}else{
//个人发送
String receiverId = requestMessage.getReceiver();
if(singleSessions.containsKey(receiverId))
singleSessions.get(receiverId).session.getAsyncRemote().sendText(JSONUtil.toJsonStr(responseMessage));
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setSender("admin");
responseMessage.setSendTime(new Date());
responseMessage.setContent(message);
responseMessage.setType("application/text");
responseMessage.setMessageId(idWorker.nextId());
JSONObject j = new JSONObject(responseMessage);
this.session.getBasicRemote().sendText(j.toString());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SocketEndPointService that = (SocketEndPointService) o;
return Objects.equals(userId, that.userId) ;
}
@Override
public int hashCode() {
return Objects.hash(session, userId, roomId);
}
}
方式2 实现 WebSocketConfigurer 通过 WebSocketHandlerRegistry 注册 某个路径 和对应的 消息处理的 TextWebSocketHandler 的实现类
集成 TextWebSocketHandler 进行一些 建立连接 关闭连接 发送消息的操作
public class WebSocketAuthHandler extends TextWebSocketHandler {
/**
* socket 建立成功事件
*
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Object token = session.getAttributes().get("token");
/* if (token != null) {
// 用户连接成功,放入在线用户缓存
// WsSessionManager.add(token.toString(), session);
} else {
throw new RuntimeException("用户登录已经失效!");
}*/
}
/**
* 接收消息事件
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 获得客户端传来的消息
String payload = message.getPayload();
Object token = session.getAttributes().get("token");
System.out.println("server 接收到 " + token + " 发送的 " + payload);
session.sendMessage(new TextMessage("server 发送给 " + token + " 消息 " + payload + " " + LocalDateTime.now().toString()));
}
/**
* socket 断开连接时
*
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Object token = session.getAttributes().get("token");
if (token != null) {
// 用户退出,移除缓存
// WsSessionManager.remove(token.toString());
}
}
}
websocket vue 前端连接
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>VUE SINGLE</title>
<!-- VUE -->
<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
<!-- 引入样式 -->
<link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css">
<!-- 引入组件库 -->
<script src="https://unpkg.com/element-ui/lib/index.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script type="application/javascript" src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
</head>
<body>
<div id="vue_det">
<div style="background-color: #b3e19d;width: 1200px;height: 600px;overflow-y:auto">
<div v-for="item,index in messagList" style="width: 1200px;height: 50px;" :class="item.self?'textSelf':'textOther'"> {{item.showMessage}} </div>
</div>
<el-form :inline="true">
<el-form-item label="输入信息" prop="desc">
<el-input type="textarea" v-model="text"></el-input>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="send">发送</el-button>
</el-form-item>
</el-form>
</div>
</body>
<script type="text/javascript">
var vm = new Vue({
el: '#vue_det',
data(){
return {
userId:"",
lockReconnect:false, //防止重复连接
text:"",
messagList:[]
}
},
mounted () {
//测试 用户名 随机
var arr= ["a","b","c","d"];
var name = "";
for(var x =0 ; x <4;x++){
var key = Math.floor(Math.random()*10 /3)
name += arr[key];
}
this.userId = name;
// wss 为请求 https 安全协议
//非安全的
var url = 'http://localhost:28087/socketJs?name='+ this.userId
this.createWebSocket(url);
},
beforeDestroy () {
this.onbeforeunload()
},
methods: {
createWebSocket(wsUrl) {
try {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else
this.websocket = new WebSocket(wsUrl);
//初始化
this.initWebSocket();
} catch(e) {
console.log('catch');
//异常后重新连接
this.reconnect();
}
},
initWebSocket () {
// 连接错误
this.websocket.onerror = this.setErrorMessage
// 连接成功
this.websocket.onopen = this.setOnopenMessage
// 收到消息的回调
this.websocket.onmessage = this.setOnmessageMessage
// 连接关闭的回调
this.websocket.onclose = this.setOncloseMessage
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = this.onbeforeunload
},
setErrorMessage () {
console.log('WebSocket连接发生错误 状态码:' + this.websocket.readyState)
},
setOnopenMessage () {
console.log('WebSocket连接成功 状态码:' + this.websocket.readyState)
//心跳检测重置
//this.heartCheck().start();
},
setOnmessageMessage (event) {
// 根据服务器推送的消息做自己的业务处理 {}
var obj = JSON.parse( event.data)
obj.showMessage = obj.userId +" "+ obj.createTime +"\r\n"+obj.message;
if(obj.userId === this.userId){
obj.self = true;
}else{
obj.self = false;
}
this.messagList.push(obj);
console.log(obj.showMessage);
const h = this.$createElement;
this.$notify({
title: '标题名称',
message: h('i', { style: 'color: teal'}, obj.showMessage)
});
},
setOncloseMessage () {
console.log('WebSocket连接关闭 状态码:' + this.websocket.readyState)
},
onbeforeunload () {
this.closeWebSocket()
},
closeWebSocket () {
this.websocket.close()
},
send(){
this.websocket.send(this.text);
},
reconnect() {
if(this.lockReconnect) {
return;
};
this.lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
tt = setTimeout(function () {
createWebSocket();
this.lockReconnect = false;
}, 4000);
},
//心跳检测
heartCheck(){
var heartCheck = {
timeout: 210000,
timeoutObj: null,
serverTimeoutObj: null,
start: function(){
var self = this;
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log(getNowTime() +' Socket 连接重试');
//socket.send("连接成功");
self.serverTimeoutObj = setTimeout(function() {
}, self.timeout);
}, this.timeout)
}
}
return heartCheck
}
}
});
</script>
<style scoped="">
.textOther {
text-align: left;
background-color: #00a0e9;border: solid 1px black
}
.textSelf {
text-align: right;
background-color: #BAC498;border: solid 1px black
}
</style>
</html>
版权声明:本文为Jerome_jia原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。