基于Socket实现的后台服务,用于接收客户端的心跳消息,并根据心跳消息来维护客户端连接。
具体实现中,服务启动后会创建一个Socket监听器,等待客户端的连接请求。当客户端连接成功后,服务会为每个连接创建一个Task实例,用于接收客户端发送的心跳消息,并根据心跳消息更新心跳时间戳。服务还会为每个连接启动一个独立的Task实例,用于定时向客户端发送心跳消息,以保持连接的活跃状态。
如果服务在一定时间内没有收到客户端发送的心跳消息,就会认为客户端已经掉线,服务会关闭连接并从连接列表中移除该客户端。
此服务适用于需要实现长连接的场景,例如实时消息推送、在线游戏等。
using Microsoft.Extensions.Hosting;
using MSEBP.Kernel.Common.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Authorization.WebApi
{
/// <summary>
/// 官方介绍主要是用来实现实现后台任务
/// </summary>
public class SocketBackgroundService : IHostedService, IDisposable
{
private readonly ILogger _logger;
/// <summary>
/// 心跳间隔(毫秒)
/// </summary>
private readonly int _heartBeatInterval = 30000;
/// <summary>
/// 心跳超时时间(毫秒)
/// </summary>
private readonly int _heartBeatTimeout = 60000;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly int _clientIdLength = 10;
/// <summary>
/// 存储每个客户端的 Socket 对象
/// </summary>
public static ConcurrentDictionary<string, Socket> _clients = new ConcurrentDictionary<string, Socket>();
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger"></param>
public SocketBackgroundService(ILogger logger)
{
_logger = logger;
}
/// <summary>
/// 服务开始执行的代码
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
_ = Task.Run(async () =>
{
try
{
IPAddress localIp = Dns.GetHostEntry(Dns.GetHostName()).AddressList.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork);
if (localIp != null)
{
Console.WriteLine($"Socket IP address: {localIp}");
IPEndPoint localEndPoint = new IPEndPoint(localIp, 8181);
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(localEndPoint);
listener.Listen(100);
while (!_cts.IsCancellationRequested)
{
try
{
Socket client = await listener.AcceptAsync();
_ = Task.Run(async () =>
{
await ReceiveMessageAsync(client);
}, _cts.Token);
}
catch (Exception ex)
{
_logger.Error($"客户端连接", ex);
Console.WriteLine(ex.ToString());
}
}
listener.Close();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}, _cts.Token);
}
/// <summary>
/// 接受客户端心跳
/// </summary>
/// <param name="socket"></param>
/// <returns></returns>
private async Task ReceiveHeartbeatAsync(Socket socket)
{
// 初始化心跳时间戳
DateTime heartbeatTimestamp = DateTime.UtcNow;
CancellationToken token = _cts.Token;
while (!token.IsCancellationRequested)
{
try
{
// 接收客户端消息
byte[] buffer = new byte[1024];
int count = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None, token);
// 处理客户端消息
string message = Encoding.UTF8.GetString(buffer, 0, count);
if (message.Contains("heartbeat"))
{
// 更新心跳时间戳
heartbeatTimestamp = DateTime.UtcNow;
string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));
_clients.TryAdd(clientId, socket);
}
else if (string.IsNullOrWhiteSpace(message))
{
// 关闭连接 移除客户端连接
foreach (var item in _clients)
{
if (item.Value == socket)
{
_clients.TryRemove(item.Key, out _);
break;
}
}
socket.Close();
break;
}
else
{
//业务逻辑处理
}
// 检测心跳超时
if ((DateTime.UtcNow - heartbeatTimestamp).TotalMilliseconds > _heartBeatTimeout)
{
// 关闭连接 移除客户端连接
string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));
_clients.TryRemove(clientId, out _);
socket.Close();
break;
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
// 关闭连接 移除客户端连接
socket.Close();
}
}
}
/// <summary>
/// 启动Socket发送心跳检测
/// </summary>
/// <param name="socket"></param>
/// <returns></returns>
private async Task StartHeartbeatMonitoringAsync(Socket socket)
{
CancellationToken token = _cts.Token;
while (!token.IsCancellationRequested)
{
{
// 发送心跳消息
byte[] heartbeatMessage = Encoding.UTF8.GetBytes("heartbeat");
socket.Send(heartbeatMessage);
// 等待心跳间隔
await Task.Delay(_heartBeatInterval);
// 检测心跳超时
if (socket.Poll(_heartBeatTimeout, SelectMode.SelectRead) && socket.Available == 0)
{
// 关闭连接
socket.Close();
break;
}
}
// 移除客户端连接
IPAddress clientIpAddress = ((IPEndPoint)socket.RemoteEndPoint).Address;
_clients.TryRemove(clientIpAddress.ToString(), out _);
}
}
/// <summary>
/// 停止
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
await Task.CompletedTask;
}
/// <summary>
/// 释放
/// </summary>
public void Dispose()
{
_cts.Dispose();
}
}
}
版权声明:本文为qq165285727原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。