基于Socket实现的后台服务

  • Post author:
  • Post category:其他


基于Socket实现的后台服务,用于接收客户端的心跳消息,并根据心跳消息来维护客户端连接。

具体实现中,服务启动后会创建一个Socket监听器,等待客户端的连接请求。当客户端连接成功后,服务会为每个连接创建一个Task实例,用于接收客户端发送的心跳消息,并根据心跳消息更新心跳时间戳。服务还会为每个连接启动一个独立的Task实例,用于定时向客户端发送心跳消息,以保持连接的活跃状态。

如果服务在一定时间内没有收到客户端发送的心跳消息,就会认为客户端已经掉线,服务会关闭连接并从连接列表中移除该客户端。

此服务适用于需要实现长连接的场景,例如实时消息推送、在线游戏等。

using Microsoft.Extensions.Hosting;
using MSEBP.Kernel.Common.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Authorization.WebApi
{
    /// <summary>
    /// 官方介绍主要是用来实现实现后台任务
    /// </summary>
    public class SocketBackgroundService : IHostedService, IDisposable
    {
        private readonly ILogger _logger;

        /// <summary>
        /// 心跳间隔(毫秒)
        /// </summary>
        private readonly int _heartBeatInterval = 30000;
        /// <summary>
        /// 心跳超时时间(毫秒)
        /// </summary>
        private readonly int _heartBeatTimeout = 60000;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();

        private readonly int _clientIdLength = 10;

        /// <summary>
        /// 存储每个客户端的 Socket 对象
        /// </summary>
        public static ConcurrentDictionary<string, Socket> _clients = new ConcurrentDictionary<string, Socket>();

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="logger"></param>
        public SocketBackgroundService(ILogger logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// 服务开始执行的代码
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _ = Task.Run(async () =>
            {
                try
                {
                    IPAddress localIp = Dns.GetHostEntry(Dns.GetHostName()).AddressList.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork);
                    if (localIp != null)
                    {
                        Console.WriteLine($"Socket IP address: {localIp}");
                        IPEndPoint localEndPoint = new IPEndPoint(localIp, 8181);
                        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                        listener.Bind(localEndPoint);
                        listener.Listen(100);
                        while (!_cts.IsCancellationRequested)
                        {
                            try
                            {
                                Socket client = await listener.AcceptAsync();
                                _ = Task.Run(async () =>
                                {
                                    await ReceiveMessageAsync(client);
                                }, _cts.Token);
                            }
                            catch (Exception ex)
                            {
                                _logger.Error($"客户端连接", ex);
                                Console.WriteLine(ex.ToString());
                            }
                        }
                        listener.Close();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
            }, _cts.Token);
        }


        /// <summary>
        /// 接受客户端心跳
        /// </summary>
        /// <param name="socket"></param>
        /// <returns></returns>
        private async Task ReceiveHeartbeatAsync(Socket socket)
        {
            // 初始化心跳时间戳
            DateTime heartbeatTimestamp = DateTime.UtcNow;
            CancellationToken token = _cts.Token;

            while (!token.IsCancellationRequested)
            {
                try
                {
                    // 接收客户端消息
                    byte[] buffer = new byte[1024];
                    int count = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None, token);
                    // 处理客户端消息
                    string message = Encoding.UTF8.GetString(buffer, 0, count);
                    if (message.Contains("heartbeat"))
                    {
                        // 更新心跳时间戳
                        heartbeatTimestamp = DateTime.UtcNow;
                        string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));
                        _clients.TryAdd(clientId, socket);
                    }
                    else if (string.IsNullOrWhiteSpace(message))
                    {
                        // 关闭连接 移除客户端连接
                        foreach (var item in _clients)
                        {
                            if (item.Value == socket)
                            {
                                _clients.TryRemove(item.Key, out _);
                                break;
                            }
                        }
                        socket.Close();
                        break;
                    }
                    else
                    {
                        //业务逻辑处理
                    }

                    // 检测心跳超时
                    if ((DateTime.UtcNow - heartbeatTimestamp).TotalMilliseconds > _heartBeatTimeout)
                    {
                        // 关闭连接 移除客户端连接
                        string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));
                        _clients.TryRemove(clientId, out _);
                        socket.Close();
                        break;
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
                finally
                {
                    // 关闭连接 移除客户端连接
                    socket.Close();
                }
            }

        }

        /// <summary>
        /// 启动Socket发送心跳检测
        /// </summary>
        /// <param name="socket"></param>
        /// <returns></returns>
        private async Task StartHeartbeatMonitoringAsync(Socket socket)
        {
            CancellationToken token = _cts.Token;
            while (!token.IsCancellationRequested)
            {
                {
                    // 发送心跳消息
                    byte[] heartbeatMessage = Encoding.UTF8.GetBytes("heartbeat");
                    socket.Send(heartbeatMessage);
                    // 等待心跳间隔
                    await Task.Delay(_heartBeatInterval);
                    // 检测心跳超时
                    if (socket.Poll(_heartBeatTimeout, SelectMode.SelectRead) && socket.Available == 0)
                    {
                        // 关闭连接
                        socket.Close();
                        break;
                    }
                }
                // 移除客户端连接
                IPAddress clientIpAddress = ((IPEndPoint)socket.RemoteEndPoint).Address;
                _clients.TryRemove(clientIpAddress.ToString(), out _);
            }
        }



        /// <summary>
        /// 停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();
            await Task.CompletedTask;
        }

        /// <summary>
        /// 释放
        /// </summary>
        public void Dispose()
        {
            _cts.Dispose();
        }
    }
}



版权声明:本文为qq165285727原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。