一、场景描述:
用微服务搭建一个版本控制平台:实时统计每个应用及其各平台端的在线用户,并可定点向指定平台推送软件更新。市面上的长连接工具很多,我这里选择 socket.io:它的并发能力已经有专业人员测试验证过,更重要的是对 iOS、Android、Web 等平台以及各种语言的支持都比较全面。
二、模型简介
三、基本实现
1、springboot集成socket.io长连接
1.1、pom引入依赖
<!-- 长连接 -->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.19</version>
</dependency>
1.2、配置参数初始化
socketio:
  # Address the server binds to. Use the machine's IP when deployed directly;
  # inside Docker or another container use 0.0.0.0.
  host: 172.16.0.57
  port: 10088
  # Maximum size of a single WebSocket frame, to stop oversized payloads from
  # being used to attack the server.
  maxFramePayloadLength: 1048576
  # Maximum HTTP content length.
  maxHttpContentLength: 1048576
  # Netty thread pools (one boss thread is enough when listening on one port).
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # Timeout (ms) for upgrading the HTTP handshake to WebSocket. Library default: 10 s.
  upgradeTimeout: 10000
  # Ping timeout (ms). Library default: 60 s. A client that sends no heartbeat
  # within this window is timed out, which is what removes dead clients from
  # the online-user counts; keep it small for real-time statistics.
  pingTimeout: 60000
  # Interval (ms) at which the client sends heartbeats. Library default: 25 s.
  pingInterval: 25000
1.3、长连接启动类配置
import cn.bdk.vcl.domin.GaodeProvinceInfoEnum;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* 长连接启动类
*/
@Configuration
public class SocketIOConfig implements InitializingBean {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
public SocketIOServer socketIOServer;
@Autowired
DefaultSocketIOHandler defaultSocketIOHandler;
@Autowired
ProvinceSpassNameSocketIOHandler provinceSpassNameSocketIOHandler;
@Override
public void afterPropertiesSet() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setReuseAddress(true);
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setSocketConfig(socketConfig);
// host在本地测试可以设置为localhost或者本机IP,在Linux服务器跑可换成服务器IP
configuration.setHostname(host);
configuration.setPort(port);
// socket连接数大小(如只监听一个端口boss线程组为1即可)
configuration.setBossThreads(bossCount);
configuration.setWorkerThreads(workCount);
configuration.setAllowCustomRequests(allowCustomRequests);
// 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
configuration.setUpgradeTimeout(upgradeTimeout);
// Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
configuration.setPingTimeout(pingTimeout);
// Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
configuration.setPingInterval(pingInterval);
socketIOServer = new SocketIOServer(configuration);
Arrays.stream(GaodeProvinceInfoEnum.values()).peek(item -> {
socketIOServer.addNamespace("/" + item.getValue()).addConnectListener(provinceSpassNameSocketIOHandler);
}).collect(Collectors.toList());
//添加事件监听器
socketIOServer.addListeners(defaultSocketIOHandler);
//启动SocketIOServer
socketIOServer.start();
}
}
1.4、注册事件
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.listener.ConnectListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
/**
 * Connect listener for the per-province namespaces.
 *
 * <p>Reads {@code platformId}, {@code appId} and {@code phoneNo} from the
 * handshake URL parameters and puts the client into the rooms used for
 * online-user statistics and version-update pushes. Clients that omit any of
 * the three parameters (or send {@code phoneNo=0}) join no room.
 */
@Component
public class ProvinceSpassNameSocketIOHandler implements ConnectListener {

    @Resource
    RedisTemplate redisTemplate;

    /**
     * Assigns a newly connected client to its application, platform and
     * version-management rooms.
     *
     * @param client the client that just connected; ignored when {@code null}
     */
    @Override
    public void onConnect(SocketIOClient client) {
        if (client == null) {
            return;
        }
        HandshakeData handshakeData = client.getHandshakeData();
        String platformId = handshakeData.getSingleUrlParam("platformId");
        String appId = handshakeData.getSingleUrlParam("appId");
        String phoneNo = handshakeData.getSingleUrlParam("phoneNo");

        boolean hasPhone = StringUtils.hasLength(phoneNo) && !"0".equals(phoneNo);
        // appId is part of every room name below; without it we would call
        // joinRoom(null) and build "null:..." keys, so such clients are skipped.
        if (!StringUtils.hasLength(appId) || !StringUtils.hasLength(platformId) || !hasPhone) {
            return;
        }

        String key = appId + ":" + platformId;
        // The application and platform rooms are not push targets; they are only used for statistics.
        client.joinRoom(appId);
        client.joinRoom(key);

        // hasKey returns a Boolean; compare against TRUE instead of auto-unboxing it.
        boolean whitelisted = Boolean.TRUE.equals(
                redisTemplate.opsForHash().hasKey("allow", key + ":" + phoneNo));
        if (whitelisted) {
            // Version management: whitelist room.
            client.joinRoom(key + ":1:allow");
        } else {
            // Version management: regular room (suffix spelling kept as-is, it is a runtime room name).
            client.joinRoom(key + ":1:rurrency");
        }
    }
}
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.listener.ConnectListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
/**
 * Connect listener for the default (root) namespace.
 *
 * <p>Reads {@code platformId}, {@code appId} and {@code phoneNo} from the
 * handshake URL parameters and puts the client into the rooms used for
 * online-user statistics and version-update pushes. Clients that do not send
 * the full parameter set are treated as legacy clients and only join the
 * room named after their {@code platformId}.
 */
@Component
public class DefaultSocketIOHandler implements ConnectListener {

    @Resource
    RedisTemplate redisTemplate;

    /**
     * Assigns a newly connected client to its rooms.
     *
     * @param client the client that just connected; ignored when {@code null}
     */
    @Override
    public void onConnect(SocketIOClient client) {
        if (client == null) {
            return;
        }
        HandshakeData handshakeData = client.getHandshakeData();
        String platformId = handshakeData.getSingleUrlParam("platformId");
        String appId = handshakeData.getSingleUrlParam("appId");
        String phoneNo = handshakeData.getSingleUrlParam("phoneNo");

        boolean hasApp = StringUtils.hasLength(appId);
        boolean hasPlatform = StringUtils.hasLength(platformId);
        boolean hasPhone = StringUtils.hasLength(phoneNo) && !"0".equals(phoneNo);

        // The application and platform rooms are not push targets; they are only used for
        // statistics. Join them only when the ids are present: joinRoom(null) fails and
        // "null:..." rooms would pollute the counts.
        if (hasApp) {
            client.joinRoom(appId);
            if (hasPlatform) {
                client.joinRoom(appId + ":" + platformId);
            }
        }
        if (!hasPlatform) {
            return;
        }

        if (hasApp && hasPhone) {
            // Version-update whitelist check.
            String key = appId + ":" + platformId + ":1";
            // hasKey returns a Boolean; compare against TRUE instead of auto-unboxing it.
            boolean whitelisted = Boolean.TRUE.equals(
                    redisTemplate.opsForHash().hasKey("allow", key + ":" + phoneNo));
            if (whitelisted) {
                client.joinRoom(key + ":allow");
            } else {
                // Suffix spelling kept as-is, it is a runtime room name.
                client.joinRoom(key + ":rurrency");
            }
        } else {
            // Compatibility with old client versions; remove after the next release.
            client.joinRoom(platformId);
        }
    }
}
1.5、向外提供用户统计服务
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.corundumstudio.socketio.BroadcastOperations;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.google.gson.Gson;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
 * Real-time online-user statistics, computed from the rooms of the running
 * socket.io server.
 *
 * <p>Clients join a room named {@code appId} and a room named
 * {@code appId:platformId} inside their namespace; the counts returned here are
 * the sizes of those rooms summed over every namespace of the server.
 *
 * @author morik
 * @since 2022-05-27
 */
@Service
public class PlatformInfoServiceImpl extends ServiceImpl<PlatformInfoMapper, PlatformInfo> implements IPlatformInfoService {

    @Resource
    private PlatformInfoMapper platformInfoMapper;

    @Resource
    private PlatformVersionMapper platformVersionMapper;

    @Resource
    private SocketIOConfig socketIOConfig;

    /**
     * Returns the online-user count of every platform of an application plus
     * the application-wide total (the sum of the platform counts).
     *
     * @param getUsageDO request carrying the application id; it is not modified
     * @return the platforms with their user counts, or a failure result when the
     *         application has no platforms
     */
    public CommonResult<PlatformUseNoListVO> getPlatformUseNo(GetUsageDO getUsageDO) {
        LambdaQueryWrapper<PlatformInfo> query = new LambdaQueryWrapper<>();
        query.eq(PlatformInfo::getAppId, getUsageDO.getAppId());
        List<PlatformInfo> platformInfos = platformInfoMapper.selectList(query);
        if (ObjectUtils.isEmpty(platformInfos)) {
            return CommonResult.failed("没有平台被找到");
        }

        SocketIOServer socketIOServer = socketIOConfig.socketIOServer;
        int appUserNo = 0;
        // Per platform (e.g. 21 = Android): sum the platform room over all namespaces (e.g. /cn50).
        for (PlatformInfo platform : platformInfos) {
            int platUserNo = countRoomClients(socketIOServer, platform.getAppId() + ":" + platform.getPlatformId());
            platform.setUserNo(platUserNo);
            appUserNo += platUserNo;
        }

        PlatformUseNoListVO platformUseNoListVO = new PlatformUseNoListVO();
        platformUseNoListVO.setPlatformInfos(platformInfos);
        platformUseNoListVO.setAppUseNo(appUserNo);
        return CommonResult.success(platformUseNoListVO);
    }

    /**
     * Returns the online-user count of an application, or of one platform of
     * that application when a platform id is supplied.
     *
     * @param getUsageDO request carrying the application id and, optionally, the platform id
     * @return the count as a decimal string; {@code "0"} when the server is not available
     */
    public CommonResult<String> getAppUseNo(GetUsageDO getUsageDO) {
        SocketIOServer socketIOServer = socketIOConfig.socketIOServer;
        String room;
        if (ObjectUtils.isEmpty(getUsageDO.getPlatformId())) {
            // No platform given: count the application-wide room.
            room = getUsageDO.getAppId() + "";
        } else {
            room = getUsageDO.getAppId() + ":" + getUsageDO.getPlatformId();
        }
        return CommonResult.success(countRoomClients(socketIOServer, room) + "");
    }

    /**
     * Sums the number of clients in the given room over every namespace of the server.
     *
     * @param socketIOServer the running server; {@code null} yields 0
     * @param room           the room name to count
     * @return the total number of clients currently in that room
     */
    private int countRoomClients(SocketIOServer socketIOServer, String room) {
        if (socketIOServer == null) {
            return 0;
        }
        int total = 0;
        for (SocketIONamespace namespace : socketIOServer.getAllNamespaces()) {
            if (namespace == null) {
                continue;
            }
            BroadcastOperations roomOperations = namespace.getRoomOperations(room);
            Collection<SocketIOClient> clients = roomOperations.getClients();
            total += clients.size();
        }
        return total;
    }
}
2、我们这里直接用java客户端来链接测试
pom文件依赖参考
<!-- Long-connection client library (Java socket.io client). -->
<!-- NOTE(review): the 2.x line of socket.io-client-java presumably targets Socket.IO
     protocol v3/v4, while netty-socketio 1.7.x presumably serves the v1/v2 protocol;
     if the client cannot connect, try a 1.x client. Confirm against the client's
     compatibility table. -->
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>2.0.1</version>
</dependency>
import cn.hutool.core.date.DateUtil;
import io.socket.client.IO;
import io.socket.client.Socket;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SocketIOClientLaunch {
public static void main(String[] args) {
// 服务端socket.io连接通信地址
String url = "http://172.16.0.57:10088/cn50";
try {
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket","xhr-polling","jsonp-polling"};
options.reconnectionAttempts = 2;
// 失败重连的时间间隔
options.reconnectionDelay = 1000;
// 连接超时时间(ms)
options.timeout = 500;
// userId: 唯一标识 传给服务端存储
final Socket socket = IO.socket(url + "?platformId=21&appId=1234585&phoneNo=20", options);
socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));
// 自定义事件`connected` -> 接收服务端成功连接消息
socket.on("connected", objects -> log.debug("服务端:" + objects[0].toString()));
// 自定义事件`push_data_event` -> 接收服务端消息
socket.on("push_data_event", objects -> log.debug("服务端:" + objects[0].toString()));
// 自定义事件`myBroadcast` -> 接收服务端广播消息
socket.on("myBroadcast", objects -> log.debug("服务端:" + objects[0].toString()));
socket.emit("chatevent","1230");
socket.connect();
int i =1;
while (true) {
Thread.sleep(3000);
i++;
// 自定义事件`push_data_event` -> 向服务端发送消息
socket.emit("push_data_event", "发送数据 " + i);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、也可以用JS客户端建立简单的长连接测试(ios、Android等每个平台都有相关sdk几行代码都能集成),官网参考地址:
Client API | Socket.IO
,js下载地址:
Files within /
<html>
<head>
  <meta charset="utf-8"/>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.0/socket.io.js"></script>
</head>
<body>
<script type="text/javascript">
  // Minimal browser test client: connects to the /cn51 namespace over WebSocket
  // and logs once the connection is established.
  const serverUrl = "http://172.16.0.57:10088/cn51";
  const connectionOptions = {
    reconnectionDelayMax: 10000,
    transports: ['websocket'],
    // Sent to the server as handshake URL parameters.
    query: {"platformId": "26", "appId": "1234585", "phoneNo": "21"}
  };

  const socket = io(serverUrl, connectionOptions);
  console.log(socket);

  socket.on('connect', () => {
    console.log("connect");
  });
</script>
</body>
</html>
1、当然也可以用react一键集成ios、Android移动端
1.1、package.json引包
"socket.io-client": "^2.4.0"
1.2、代码参考
import React, {Component} from 'react';
const io = require('socket.io-client');
/**
 * Demo component: opens a socket.io connection when mounted, logs the events
 * it receives, and closes the connection again when unmounted.
 */
class App extends Component {
    // Kept on the instance so the connection can be closed on unmount.
    socket: any = null;

    componentDidMount(): void {
        const socket = io("http://172.16.0.57:10088/chat", {
            reconnectionDelayMax: 10000,
            transports: ['websocket'],
            // Sent to the server as handshake URL parameters.
            query: {"platformId": 66666}
        });
        this.socket = socket;
        console.log(socket);
        socket.on('connect', function () {
            console.log("connect")
        });
        socket.on('connect_error', (error: any) => {
            console.log(error)
        });
        socket.on('message_event', function (data: any) {
            console.log("message_event")
            console.log(data);
        });
        socket.emit("push_data_event", "react发送的数据");
        socket.on("myBroadcast", function (message: any) {
            console.log(message);
        })
    }

    componentWillUnmount(): void {
        // Without this the socket stays connected (and keeps reconnecting) after the
        // component is gone, and the user keeps being counted as online.
        if (this.socket) {
            this.socket.close();
            this.socket = null;
        }
    }

    render() {
        return (
            <div>
                hett
            </div>
        );
    }
}
2、flutter一键集成参考
2.1、引包
dependencies:
# socket.io包
socket_io_client: ^0.9.9
2.2、代码参考
IO.Socket channel;
// Runs the base-class initialisation, then opens the socket.io connection
// and registers its event handlers via _listenWebSocket().
@override
void initState() {
super.initState();
_listenWebSocket();
}
// Opens the socket.io connection to the /chat namespace and registers the
// event handlers (connect, errors, disconnect, close, message).
void _listenWebSocket() async {
  // Extra request headers (cookies etc.) can be added here, e.g.
  // headers['origin'] = 'http://127.0.0.1:7001';
  // The map must exist because it is passed as 'extraHeaders' below.
  final Map<String, dynamic> headers = <String, dynamic>{};

  // URL format: scheme://host:port/namespace. socket_io_client treats the URL
  // path as the namespace, so the namespace goes into the URL. The 'path'
  // option is the engine.io endpoint (default /socket.io), not the namespace,
  // and is therefore left at its default.
  channel = IO.io('http://172.16.0.57:10088/chat', <String, dynamic>{
    // Custom headers.
    'extraHeaders': headers,
    // Query parameters sent with the handshake.
    'query': {
      'EIO': 3,
      'transport': 'websocket',
      'platformId': 66666,
    },
    // Connect over WebSocket directly.
    'transports': ['websocket'],
  });

  // Once connected, data can be sent to the socket.io backend.
  channel.on('connect', (_) {
    print('connect');
    // Emits an event together with an acknowledgement callback that the
    // server can invoke.
    channel.emitWithAck('exchange', '11111', ack: (data) {
      print('ack $data');
      if (data != null) {
        print('from server $data');
      } else {
        print("Null");
      }
    });
  });

  // Called on a socket error.
  channel.on('error', (data) {
    print('error');
    print(data);
  });
  // Called when the connection attempt fails.
  channel.on("connect_error", (data) => print('connect_error: $data'));
  // Called when the connection is dropped.
  channel.on('disconnect', (_) => print('disconnect======'));
  // Called when the connection is closed.
  channel.on('close', (_) => print('close===='));
  // Called when the server emits a 'message' event.
  channel.on('message', (data) {
    print('onmessage');
    print(data);
  });
}
// Closes the socket.io connection when the widget is disposed so it does not
// linger in memory. Own resources are released first; super.dispose() is
// called last.
@override
void dispose() {
  print('close');
  // channel is only assigned in _listenWebSocket(); guard against it being unset.
  channel?.close();
  super.dispose();
}
3、Android集成
3.1、引包
implementation 'io.socket:socket.io-client:1.0.0'
3.2、给网络权限
<uses-permission android:name="android.permission.INTERNET"/>
3.3、代码参考
// Socket.IO connection; held as a field so it outlives the setup code.
private Socket mSocket;

try {
    IO.Options opts = new IO.Options();
    // Sent to the server as handshake URL parameters.
    opts.query = "platformId=666";
    mSocket = IO.socket("http://172.16.0.57:10088/chat", opts);

    // Register the listener only after the socket was created successfully;
    // if IO.socket() throws, mSocket is still null.
    mSocket.on("connect", new Emitter.Listener() {
        @Override
        public void call(Object... args) {
            // Listener callbacks run off the main thread; switch to the UI thread to touch views.
            runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    tv_lj.setText("链接成功");
                }
            });
        }
    });

    // IO.socket() only creates the socket; the connection is opened by connect().
    mSocket.connect();
} catch (URISyntaxException e) {
    e.printStackTrace();
}
4、ios集成详细配置参考:https://github.com/socketio/socket.io-client-swift
4.1、代码参考
import SocketIO
// The manager takes only the server address. The namespace is selected with
// socket(forNamespace:) and handshake query parameters are passed through
// .connectParams, rather than being embedded in socketURL.
// NOTE(review): newer client releases presumably default to the Socket.IO v3+ protocol;
// against a v2-protocol server a `.version(.two)` option may be required. Confirm
// for the client version in use.
let manager = SocketManager(
    socketURL: URL(string: "http://172.16.0.57:10088")!,
    config: [.log(true), .compress, .connectParams(["platformId": "666"])]
)
let socket = manager.socket(forNamespace: "/chat")

// Fired once the connection is established.
socket.on(clientEvent: .connect) {data, ack in
    print("socket connected")
}

// Sample handler: asks the server whether an update is allowed, then emits it.
socket.on("currentAmount") {data, ack in
    guard let cur = data[0] as? Double else { return }

    socket.emitWithAck("canUpdate", cur).timingOut(after: 0) {data in
        if data.first as? String ?? "passed" == SocketAckValue.noAck {
            // Handle ack timeout
        }

        socket.emit("update", ["amount": cur + 2.50])
    }

    ack.with("Got your currentAmount", "dude")
}

socket.connect()
四、启动服务端及客户端查看实时在线人数
版权声明:本文为qq_29653373原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。