C#/.NetCore/.Net6中使用Grpc服务(基础篇)

  • Post author:
  • Post category:其他




一、Grpc概述

Grpc是由google开发的一个高性能、通用的开源RPC框架,主要基于HTTP2协议传输。



1、Grpc支持4种流

a、简单RPC

b、客户端流式RPC

c、服务端流式RPC

d、双向流式RPC



2、示例应用环境&dll

a、使用.net 6控制台(SimpleGrpc.Server 服务端、SimpleGrpc.Client 客户端)

b、所需的dll(NuGet程序包:Grpc.Core、Grpc.Tools、Google.Protobuf)

在这里插入图片描述



二、SimpleGrpc.Server 服务端配置

1、新建文件夹Protos,在该文件夹中添加proto文件(在一个文件内定义服务)

在这里插入图片描述

2、很关键(踩坑找了很久),在项目文件(xxx…csproj)中添加xml元素:

 <ItemGroup>
    <Protobuf Include="Protos\FirstTest.proto" GrpcServices="Server" />
 </ItemGroup>

生成的源代码可在项目下的obj–>Debug–>net6.0–>Protos文件夹下查看,

文件名和proto文件的命名一致,这里会生成FirstTest.cs和FirstTestGrpc.cs文件

在这里插入图片描述

3、新建类FirstTestSvc继承FirstTest.FirstTestBase并重写方法

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;

namespace SimpleGrpc.Server.CoreSvc
{
    public class FirstTestSvc : FirstTest.FirstTestBase
    {
        private readonly ConcurrentQueue<GrpcResult> queue = new();

        #region 简单RPC——有参有返回值
        /// <summary>
        /// 简单RPC——有参有返回值
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<GrpcResult> SimpleRpc(GrpcParam request, ServerCallContext context)
        {
            TryTakeHost(context.Peer, out string host);
            Console.WriteLine($"[{host}]:请求SimpleRpc!");
            return await Task.FromResult(new GrpcResult
            {
                Success = true,
                Data = $"数据已被处理--{request.Id}--{request.Name}"
            });
        }
        #endregion

        #region 简单RPC——有参无返回值
        /// <summary>
        /// 简单RPC——有参无返回值
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<Empty> SimpleEmpty(GrpcParam request, ServerCallContext context)
        {
            TryTakeHost(context.Peer, out string host);
            Console.WriteLine($"[{host}]:请求SimpleEmpty!");
            return await Task.FromResult(new Empty());
        }
        #endregion

        #region 客户端流式RPC
        /// <summary>
        /// 客户端流式RPC
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<GrpcResult> ClientStream(IAsyncStreamReader<GrpcParam> requestStream, ServerCallContext context)
        {
            TryTakeHost(context.Peer, out string host);
            while (await requestStream.MoveNext())
            {
                var ret = requestStream.Current;
                Console.WriteLine($"[{host}]:请求ClientStream:id-{ret.Id}!");
                await Task.FromResult(new GrpcResult
                {
                    Success = true,
                    Data = $"数据已被处理--{ret.Id}--{ret.Name}"
                });
            }
            return new GrpcResult { Data = "已完成" };
        }
        #endregion

        #region 服务端流式RPC
        /// <summary>
        /// 服务端流式RPC
        /// </summary>
        /// <param name="request"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task ServerStream(GrpcParam request, IServerStreamWriter<GrpcResult> responseStream, ServerCallContext context)
        {
            TryTakeHost(context.Peer, out string host);
            Console.WriteLine($"[{host}]:客户端已连接ServerStream!");
            await Task.Run(async () =>
            {
                Task.Run(() => //模拟生产数据
                {
                    for (int i = 0; i < 40; i++)
                    {
                        queue.Enqueue(new GrpcResult { Success = true, Data = $"服务端响应数据:{i}" });
                        Thread.Sleep(700);
                    }
                });
                while (true)
                {
                    if (queue.TryDequeue(out GrpcResult? result) && result != null)
                    {
                        await responseStream.WriteAsync(result);
                    }
                    await Task.Delay(700);
                }
            });
            await Task.CompletedTask;
        }
        #endregion

        #region 双向流式RPC
        /// <summary>
        /// 双向流式RPC
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task TwoWayStream(IAsyncStreamReader<GrpcParam> requestStream, IServerStreamWriter<GrpcResult> responseStream, ServerCallContext context)
        {
            TryTakeHost(context.Peer, out string host);
            while (await requestStream.MoveNext())
            {
                var current = requestStream.Current;
                Console.WriteLine($"[{host}]:请求流TwoWayStream数据:{current.Id}!");
                await responseStream.WriteAsync(new GrpcResult
                {
                    Success = true,
                    Data = $"客户端id:{current.Id}--服务端id:{current.Id + new Random().Next(1, 10)}"
                });
            }
            await Task.CompletedTask;
        }
        #endregion

        #region 获取主机信息
        /// <summary>
        /// 获取主机信息
        /// </summary>
        /// <param name="peer"></param>
        /// <param name="host"></param>
        /// <returns></returns>
        private static bool TryTakeHost(string peer, out string host)
        {
            //peer地址示例:ipv4:127.0.0.1:52384、ipv6:[::1]:52384
            var ipv4Match = Regex.Match(peer, @"^ipv4:(.+)", RegexOptions.IgnoreCase);
            if (ipv4Match.Success)
            {
                host = ipv4Match.Groups[1].Value;
                return true;
            }
            var ipv6Match = Regex.Match(peer, @"^ipv6:(.+)", RegexOptions.IgnoreCase);
            if (ipv6Match.Success)
            {
                host = ipv6Match.Groups[1].Value;
                return true;
            }
            host = string.Empty;
            return false;
        }
        #endregion
    }
}

4、Program编码配置指定的主机端口和需监听的服务

using Grpc.Core;
using SimpleGrpc.Server;
using SimpleGrpc.Server.CoreSvc;

string host = "localhost";
int port = 7007;
Console.WriteLine($"**************正在创建Grpc服务监听-host:{host}--port:{port}**************");
Server server = new()
{
    Ports = { new ServerPort(host, port, ServerCredentials.Insecure) },
    Services = { FirstTest.BindService(new FirstTestSvc()) }
};
server.Start();

Console.WriteLine($"**************输入stop按回车停止Grpc服务**************");

while (true)
{
    string? input = Console.ReadLine();
    if (input != null && input.Trim().ToLower() == "stop")
    {
        server.ShutdownAsync().Wait();
        Console.WriteLine($"**************Grpc服务已成功停止**************");
    }
    Thread.Sleep(300);
}



三、SimpleGrpc.Client 客户端配置

1、拷贝服务端的proto文件(目前发现两端配置文件需完全一致才能调用成功)

在这里插入图片描述

3、很关键(踩坑找了很久),在项目文件(xxx…csproj)中添加xml元素:

<ItemGroup>
    <Protobuf Include="Protos\FirstTest.proto" GrpcServices="Client" />
</ItemGroup>

生成的源代码可在项目下的obj–>Debug–>net6.0–>Protos文件夹下查看,

文件名和proto文件的命名一致,这里会生成FirstTest.cs和FirstTestGrpc.cs文件

在这里插入图片描述

4、编码连接服务端

using Grpc.Core;
using SimpleGrpc.Server;

string host = "localhost";
int port = 7007;
Console.WriteLine($"**************正在连接Grpc服务端-host:{host}--port:{port}**************");
Channel channel = new(host, port, ChannelCredentials.Insecure);
var client = new FirstTest.FirstTestClient(channel);

Console.WriteLine($"************** 简单RPC——有参有返回值 **************");
var ret1 = client.SimpleRpcAsync(new GrpcParam { Id = 123, Name = "Darren1" });
Console.WriteLine($"简单RPC——有参有返回值响应数据:{ret1.ResponseAsync.Result.Data}");


Console.WriteLine($"\r\n************** 简单RPC——有参无返回值 **************");
var ret2 = client.SimpleEmptyAsync(new GrpcParam { Id = 123, Name = "Darren2" });
Console.WriteLine($"简单RPC——有参无返回值请求完成");

Console.WriteLine($"\r\n************** 客户端流式RPC **************");
Task.Run(() =>
{
    var cts = new CancellationTokenSource(); //任务取消令牌
    var ret3 = client.ClientStream(cancellationToken: cts.Token);
    cts.CancelAfter(15000); //15s后取消
    while (!cts.Token.IsCancellationRequested)
    {
        ret3.RequestStream.WriteAsync(new GrpcParam
        {
            Id = new Random().Next(1, 100),
            Name = "Darren3"
        });
        Thread.Sleep(700);
    }
    ret3.RequestStream.CompleteAsync(); //告知服务端发送完成

    Console.WriteLine($"************** 客户端流式RPC响应结果:{ret3.ResponseAsync.Result.Data} **************");
});

Console.WriteLine($"\r\n************** 服务端流式RPC **************");
Task.Run(() =>
{
    var ret4 = client.ServerStream(new GrpcParam { Id = 1, Name = "Darren4" });
    var stream = ret4.ResponseStream;
    while (stream.MoveNext().Result)
    {
        Console.WriteLine($"************** 服务端流式RPC响应结果:{stream.Current.Data} **************");
    }
});

Console.WriteLine($"\r\n************** 双向流式RPC **************");
Task.Run(() =>
{
    var ret5 = client.TwoWayStream();
    var reqStream = ret5.RequestStream.WriteAsync(new GrpcParam { Id = 123, Name = "Darren5" });
    Console.WriteLine($"************** 双向流式RPC请求完成 **************");

    var stream = ret5.ResponseStream;
    while (stream.MoveNext().Result)
    {
        Console.WriteLine($"************** 双向流式RPC响应结果:{stream.Current.Data} **************");
    }
});

Console.ReadKey();



四、基础版到此已完成、进阶版(多人聊天室)后续有时间再整理…



版权声明:本文为weixin_52437470原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。