C#/.NetCore/.Net6中使用Grpc服务—基础篇
一、Grpc概述
Grpc是由google开发的一个高性能、通用的开源RPC框架,主要基于HTTP2协议传输。
1、Grpc支持4种流
a、简单RPC
b、客户端流式RPC
c、服务端流式RPC
d、双向流式RPC
2、示例应用环境&dll
a、使用.net 6控制台(SimpleGrpc.Server 服务端、SimpleGrpc.Client 客户端)
b、所需的dll(NuGet程序包:Grpc.Core、Grpc.Tools、Google.Protobuf)
二、SimpleGrpc.Server 服务端配置
1、新建文件夹Protos,在该文件夹中添加proto文件(在一个文件内定义服务)
2、很关键(踩坑找了很久),在项目文件(xxx…csproj)中添加xml元素:
<ItemGroup>
<Protobuf Include="Protos\FirstTest.proto" GrpcServices="Server" />
</ItemGroup>
生成的源代码可在项目下的obj–>Debug–>net6.0–>Protos文件夹下查看,
文件名和proto文件的命名一致,这里会生成FirstTest.cs和FirstTestGrpc.cs文件
3、新建类FirstTestSvc继承FirstTest.FirstTestBase并重写方法
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
namespace SimpleGrpc.Server.CoreSvc
{
public class FirstTestSvc : FirstTest.FirstTestBase
{
private readonly ConcurrentQueue<GrpcResult> queue = new();
#region 简单RPC——有参有返回值
/// <summary>
/// 简单RPC——有参有返回值
/// </summary>
/// <param name="request"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<GrpcResult> SimpleRpc(GrpcParam request, ServerCallContext context)
{
TryTakeHost(context.Peer, out string host);
Console.WriteLine($"[{host}]:请求SimpleRpc!");
return await Task.FromResult(new GrpcResult
{
Success = true,
Data = $"数据已被处理--{request.Id}--{request.Name}"
});
}
#endregion
#region 简单RPC——有参无返回值
/// <summary>
/// 简单RPC——有参无返回值
/// </summary>
/// <param name="request"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<Empty> SimpleEmpty(GrpcParam request, ServerCallContext context)
{
TryTakeHost(context.Peer, out string host);
Console.WriteLine($"[{host}]:请求SimpleEmpty!");
return await Task.FromResult(new Empty());
}
#endregion
#region 客户端流式RPC
/// <summary>
/// 客户端流式RPC
/// </summary>
/// <param name="requestStream"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<GrpcResult> ClientStream(IAsyncStreamReader<GrpcParam> requestStream, ServerCallContext context)
{
TryTakeHost(context.Peer, out string host);
while (await requestStream.MoveNext())
{
var ret = requestStream.Current;
Console.WriteLine($"[{host}]:请求ClientStream:id-{ret.Id}!");
await Task.FromResult(new GrpcResult
{
Success = true,
Data = $"数据已被处理--{ret.Id}--{ret.Name}"
});
}
return new GrpcResult { Data = "已完成" };
}
#endregion
#region 服务端流式RPC
/// <summary>
/// 服务端流式RPC
/// </summary>
/// <param name="request"></param>
/// <param name="responseStream"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task ServerStream(GrpcParam request, IServerStreamWriter<GrpcResult> responseStream, ServerCallContext context)
{
TryTakeHost(context.Peer, out string host);
Console.WriteLine($"[{host}]:客户端已连接ServerStream!");
await Task.Run(async () =>
{
Task.Run(() => //模拟生产数据
{
for (int i = 0; i < 40; i++)
{
queue.Enqueue(new GrpcResult { Success = true, Data = $"服务端响应数据:{i}" });
Thread.Sleep(700);
}
});
while (true)
{
if (queue.TryDequeue(out GrpcResult? result) && result != null)
{
await responseStream.WriteAsync(result);
}
await Task.Delay(700);
}
});
await Task.CompletedTask;
}
#endregion
#region 双向流式RPC
/// <summary>
/// 双向流式RPC
/// </summary>
/// <param name="requestStream"></param>
/// <param name="responseStream"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task TwoWayStream(IAsyncStreamReader<GrpcParam> requestStream, IServerStreamWriter<GrpcResult> responseStream, ServerCallContext context)
{
TryTakeHost(context.Peer, out string host);
while (await requestStream.MoveNext())
{
var current = requestStream.Current;
Console.WriteLine($"[{host}]:请求流TwoWayStream数据:{current.Id}!");
await responseStream.WriteAsync(new GrpcResult
{
Success = true,
Data = $"客户端id:{current.Id}--服务端id:{current.Id + new Random().Next(1, 10)}"
});
}
await Task.CompletedTask;
}
#endregion
#region 获取主机信息
/// <summary>
/// 获取主机信息
/// </summary>
/// <param name="peer"></param>
/// <param name="host"></param>
/// <returns></returns>
private static bool TryTakeHost(string peer, out string host)
{
//peer地址示例:ipv4:127.0.0.1:52384、ipv6:[::1]:52384
var ipv4Match = Regex.Match(peer, @"^ipv4:(.+)", RegexOptions.IgnoreCase);
if (ipv4Match.Success)
{
host = ipv4Match.Groups[1].Value;
return true;
}
var ipv6Match = Regex.Match(peer, @"^ipv6:(.+)", RegexOptions.IgnoreCase);
if (ipv6Match.Success)
{
host = ipv6Match.Groups[1].Value;
return true;
}
host = string.Empty;
return false;
}
#endregion
}
}
4、Program编码配置指定的主机端口和需监听的服务
using Grpc.Core;
using SimpleGrpc.Server;
using SimpleGrpc.Server.CoreSvc;
string host = "localhost";
int port = 7007;
Console.WriteLine($"**************正在创建Grpc服务监听-host:{host}--port:{port}**************");
Server server = new()
{
Ports = { new ServerPort(host, port, ServerCredentials.Insecure) },
Services = { FirstTest.BindService(new FirstTestSvc()) }
};
server.Start();
Console.WriteLine($"**************输入stop按回车停止Grpc服务**************");
while (true)
{
string? input = Console.ReadLine();
if (input != null && input.Trim().ToLower() == "stop")
{
server.ShutdownAsync().Wait();
Console.WriteLine($"**************Grpc服务已成功停止**************");
}
Thread.Sleep(300);
}
三、SimpleGrpc.Client 客户端配置
1、拷贝服务端的proto文件(目前发现两端配置文件需完全一致才能调用成功)
3、很关键(踩坑找了很久),在项目文件(xxx…csproj)中添加xml元素:
<ItemGroup>
<Protobuf Include="Protos\FirstTest.proto" GrpcServices="Client" />
</ItemGroup>
生成的源代码可在项目下的obj–>Debug–>net6.0–>Protos文件夹下查看,
文件名和proto文件的命名一致,这里会生成FirstTest.cs和FirstTestGrpc.cs文件
4、编码连接服务端
using Grpc.Core;
using SimpleGrpc.Server;
string host = "localhost";
int port = 7007;
Console.WriteLine($"**************正在连接Grpc服务端-host:{host}--port:{port}**************");
Channel channel = new(host, port, ChannelCredentials.Insecure);
var client = new FirstTest.FirstTestClient(channel);
Console.WriteLine($"************** 简单RPC——有参有返回值 **************");
var ret1 = client.SimpleRpcAsync(new GrpcParam { Id = 123, Name = "Darren1" });
Console.WriteLine($"简单RPC——有参有返回值响应数据:{ret1.ResponseAsync.Result.Data}");
Console.WriteLine($"\r\n************** 简单RPC——有参无返回值 **************");
var ret2 = client.SimpleEmptyAsync(new GrpcParam { Id = 123, Name = "Darren2" });
Console.WriteLine($"简单RPC——有参无返回值请求完成");
Console.WriteLine($"\r\n************** 客户端流式RPC **************");
Task.Run(() =>
{
var cts = new CancellationTokenSource(); //任务取消令牌
var ret3 = client.ClientStream(cancellationToken: cts.Token);
cts.CancelAfter(15000); //15s后取消
while (!cts.Token.IsCancellationRequested)
{
ret3.RequestStream.WriteAsync(new GrpcParam
{
Id = new Random().Next(1, 100),
Name = "Darren3"
});
Thread.Sleep(700);
}
ret3.RequestStream.CompleteAsync(); //告知服务端发送完成
Console.WriteLine($"************** 客户端流式RPC响应结果:{ret3.ResponseAsync.Result.Data} **************");
});
Console.WriteLine($"\r\n************** 服务端流式RPC **************");
Task.Run(() =>
{
var ret4 = client.ServerStream(new GrpcParam { Id = 1, Name = "Darren4" });
var stream = ret4.ResponseStream;
while (stream.MoveNext().Result)
{
Console.WriteLine($"************** 服务端流式RPC响应结果:{stream.Current.Data} **************");
}
});
Console.WriteLine($"\r\n************** 双向流式RPC **************");
Task.Run(() =>
{
var ret5 = client.TwoWayStream();
var reqStream = ret5.RequestStream.WriteAsync(new GrpcParam { Id = 123, Name = "Darren5" });
Console.WriteLine($"************** 双向流式RPC请求完成 **************");
var stream = ret5.ResponseStream;
while (stream.MoveNext().Result)
{
Console.WriteLine($"************** 双向流式RPC响应结果:{stream.Current.Data} **************");
}
});
Console.ReadKey();
四、基础版到此已完成、进阶版(多人聊天室)后续有时间再整理…