grpc实现c++异步非阻塞stream
参考文章
-
Non-blocking single-threaded streaming C++ server
-
gRPC C++ async api doc and sample code
-
grpc异步stream server端demo
序言
原来一直是用着同步阻塞的grpc stream。由于不想再创建新的线程来监听grpc stream的新消息了,所以就想着采用异步非阻塞的方式来实现grpc的stream。但是google了一下,发现关于grpc异步非阻塞的文章很少,大部分都是grpc官方的阻塞的demo,所以特此写一篇文章来介绍grpc异步非阻塞的逻辑和实现的方法。
首先先给出一个结论,grpc stream采用的模型跟普通的异步非阻塞模型类似,所以逻辑相对复杂,如果没有较好的抽象思维以及一定的编码能力,最好还是采用同步阻塞的方式接收消息。
如参考文章1里面所说的,grpc的异步并不是真正的异步,仍然会有后台线程:
-
Note grpc library needs to use the thread donated via Next or AsyncNext to do some background work and thus only AsyncNext infrequently with very short deadline may not be a good idea.
( 注意grpc库需要使用通过Next或AsyncNext来做一些后台工作,因此使用一个到期期限很短的AsyncNext可能不是一个好主意 ) -
Also, regarding the single-threadedness, the current grpc implementation creates internal threads to do background work such as timer handling and others. As a result, you will not have a truly single-threaded server even if you only use one thread for the server.
( 此外,关于单线程,当前的grpc的实现会创建了内部线程来完成后台工作,如计时器处理等。因此,即使只为服务器使用一个线程,你也不会有真正的单线程服务器。)
grpc的异步模型
grpc的异步模型以completion queue和tag为核心,completion queue中存放的是触发的事件对应tag,通过completion queue的AsyncNext函数或者Next函数来取出tag,用户再根据tag去调用具体的事件的自定义处理函数,可以看出tag就是识别触发事件(如新链接接入,收到新消息等)的唯一标识符。tag在grpc中是由用户自己去绑定,它的类型是void*,即无类型指针,相对于某些框架来说,grpc的异步框架更自由一点。框架如下图所示:
在grpc中使用双向stream流,一个链接就是一个stream,那么,每当我们在completion queue中取到一个tag,首先,我们要通过tag去判断是哪个stream触发了事件,然后我们要根据tag去判断,到底是什么事件触发了,常用的事件有新链接接入(connect),收到新信息(read),消息发送成功(send),链接断开(disconnect)。tag可以使用一个id,这个id对应一个stream,同时对应一个具体的事件(如收到新信息)。为了方便本人所写的例子中,使用的是 std::function<void(bool)>作为tag,就是绑定了对象的类成员函数指针,该类成员函数为void xxx(bool) ;由于该函数指针绑定了对象,而对象和stream是绑定的, 所以很容易从tag中知道是哪个stream,然后不同的事件绑定不同的类成员函数,通过类成员函数的不同就可以实现不同事件的处理。
注意,在stream断开的时候,会触发读完成事件(read)和写完成事件(send),通过completion queue的AsyncNext函数或者Next函数返回的ok标识符就可以区分是正常的读写完成事件,还是因为stream断开触发的。
code下载地址
grpc async stream server
https://gitee.com/evilskyman/grpc-demo.git
grpc async stream client
https://gitee.com/evilskyman/grpc-async-client-stream-demo.git
server code
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <queue>
#include <unordered_set>
#include <mutex>
#include <time.h>
#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "hello.pb.h"
#include "hello.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerWriter;
using grpc::Status;
using hello::HelloService;
using hello::HelloMsg;
using TagType = std::function<void(bool)>;//本人使用的tag是函数指针,函数指针使用的是绑定了类的类对象函数.
using namespace std;
class GrpcStreamServerInstance;
const static std::string server_address("0.0.0.0:8860");
static unordered_set<GrpcStreamServerInstance *> GrpcStreamServerInstanceSet;//存放着所有的已经连接上的stream对应的GrpcStreamServerInstance
class GrpcStreamServerInterface {
public:
virtual void connected(bool ok) = 0; //新连接接入服务器
virtual void readDone(bool ok) = 0; //读到一帧新消息
virtual void writeDone(bool ok) = 0; //写入完成一帧消息到客户端
virtual void disconnect(bool ok) = 0; //服务器被动断开,不管谁发送的断开指令
};
typedef hello::HelloService::AsyncService ServeiceType;
class GrpcStreamServerInstance : public GrpcStreamServerInterface{
private:
public:
GrpcStreamServerInstance(ServeiceType* service,grpc::ServerCompletionQueue* inputCq);
virtual ~GrpcStreamServerInstance(){};
void connected(bool ok) override;
void readDone(bool ok) override;
void writeDone(bool ok) override;
void disconnect(bool ok) override;
bool asycSendMsg(HelloMsg& msg);
private:
ServeiceType* service;
grpc::ServerCompletionQueue* cq;//Completion Queue
ServerContext serverContext;//每一个stream都有自己的serverContext
ServerAsyncReaderWriter<HelloMsg, HelloMsg> stream;
//函数指针
TagType connectedFunc;//新链接接入时触发
TagType readDoneFunc;//读到新消息时触发
TagType writeDoneFunc;//发送一帧消息成功后触发
TagType disconnectFunc;//stream断开时触发
//inputMsg用来接收消息,用stream.Read()中绑定
HelloMsg inputMsg;
//onWrite用来区分stream有没有在发送消息,如果stream在发送,则只需要将消息写入writeBuffer,
//否则要使用stream.Write()触发stream的发送;
bool onWrite;
//写缓存
queue<HelloMsg> writeBuffer;
};
//GrpcStreamServerInstance不需要其它线程交互,故不需要互斥锁
GrpcStreamServerInstance::GrpcStreamServerInstance(ServeiceType* inputService,grpc::ServerCompletionQueue* inputCq):\
service(inputService),cq(inputCq),stream(&serverContext)
{
//使用std::bind绑定对象和类对象函数得到一个函数指针
connectedFunc = std::bind(&GrpcStreamServerInstance::connected, this, std::placeholders::_1);
readDoneFunc = std::bind(&GrpcStreamServerInstance::readDone, this, std::placeholders::_1);
writeDoneFunc = std::bind(&GrpcStreamServerInstance::writeDone, this, std::placeholders::_1);
disconnectFunc = std::bind(&GrpcStreamServerInstance::disconnect, this, std::placeholders::_1);
//设置serverContext,stream断开时,CompletionQueue会返回一个tag,这个tag就是输入的disconnectFunc这个函数指针
serverContext.AsyncNotifyWhenDone(&disconnectFunc);
//设置当新新链接connect的时候,cq返回connectedFunc作为tag
service->Requesthello(&serverContext,&stream, cq,cq,&connectedFunc);
onWrite = false;
}
void GrpcStreamServerInstance::connected(bool ok){
//新建一个GrpcStreamServerInstance,一个client的grpc链接就对应一个GrpcStreamServerInstance实例
stream.Read(&inputMsg,&readDoneFunc);
//新的GrpcStreamServerInstance,会在构造函数中调用service->Requesthello()来绑定新链接
new GrpcStreamServerInstance(service,cq);
//新加入的stream对应的GrpcStreamServerInstance会被加入到GrpcStreamServerInstanceSet中,用于发送消息或者统计stream链接。
GrpcStreamServerInstanceSet.insert(this);
cout << "当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl;
}
void GrpcStreamServerInstance::readDone(bool ok){
try{
if(!ok){
//当ok == false,说明stream已经断开
return;
}
cout << "收到消息,id为"<< inputMsg.id() <<",msg为" << inputMsg.msg() << endl;
stream.Read(&inputMsg,&readDoneFunc);
}catch(const std::exception& e){
cout << e.what() << endl;
}
}
void GrpcStreamServerInstance::writeDone(bool ok){
if(!ok){
//当ok == false,说明stream已经断开
return;
}
onWrite = false;
if(writeBuffer.empty())return;
//当grpc写完时,会触发writeDone,我们只需要从自定义的writeBuffer中取一帧继续写即可
stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
writeBuffer.pop();
onWrite = true;
}
void GrpcStreamServerInstance::disconnect(bool ok){
GrpcStreamServerInstanceSet.erase(this);
cout << "链接断开,当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl;
delete this;
}
bool GrpcStreamServerInstance::asycSendMsg(HelloMsg& msg){
writeBuffer.push(msg);
if(!onWrite){
//没有任何写操作在执行
onWrite = true;
stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
writeBuffer.pop();
}
return true;
}
class GrpcStreamServerThread {
public:
GrpcStreamServerThread(): msgNum(0){};
~GrpcStreamServerThread();
void run();
bool sendMsg(const HelloMsg& msg);
bool isTimeElapsed(struct timeval now,struct timeval last,int64_t ms);
private:
queue<HelloMsg> msgQueue;
std::atomic_int msgNum;
timed_mutex writeLock;
};
bool GrpcStreamServerThread::isTimeElapsed(struct timeval now,struct timeval last,int64_t ms){
int64_t sub = (now.tv_sec - last.tv_sec)*1000;
sub = sub + (now.tv_usec - last.tv_usec)/1000;
return sub > ms ? true :false;
}
void GrpcStreamServerThread::run(){
try{
std::unique_ptr<grpc::ServerCompletionQueue> cq;
ServeiceType service;
ServerBuilder builder;
// builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
cq = builder.AddCompletionQueue();
std::unique_ptr<Server> server_= builder.BuildAndStart();
new GrpcStreamServerInstance(&service,cq.get());
struct timeval lastTime = {0,0};//用来记录时间,保证下面定时消息的发送
while (true) {
void * tag;
bool ok;
//阻塞100毫秒,gpr_time_from_millis()函数的单位是毫秒,输入的是tag和ok的地址,cq->AsyncNext()会把结果写到地址对应的内存上
grpc::ServerCompletionQueue::NextStatus status = cq->AsyncNext(&tag, &ok,\
gpr_time_from_millis(100,GPR_TIMESPAN));
if(status == grpc::ServerCompletionQueue::NextStatus::GOT_EVENT){
//grpc服务器有新的事件,强制转换tag从void * 到 std::function<void(bool)> *,即void *(bool) 函数指针
TagType* functionPointer = reinterpret_cast<TagType*>(tag);
//通过函数指针functionPointer调用函数GrpcStreamServerInstance::xxx
(*functionPointer)(ok);
}
//从msgQueue中取出新的消息,发送各个client
if( msgNum != 0){
std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
chrono::milliseconds tryTime(500);
if(lock.try_lock_for(tryTime)){
//获取到了锁
while(!msgQueue.empty()){
HelloMsg msg = msgQueue.front();
for(auto temp : GrpcStreamServerInstanceSet){
temp->asycSendMsg(msg);
}
msgQueue.pop();
}
}else{
cout << "500ms内没抢到锁" << endl;
continue;
}
}
//定时发送消息
struct timeval now;
gettimeofday(&now, NULL);
if(isTimeElapsed(now,lastTime,10*1000)){//判断是否已经过了10秒
lastTime = now;
HelloMsg msg;
msg.set_id(2);
msg.set_msg("hello world");
for(auto temp : GrpcStreamServerInstanceSet){
temp->asycSendMsg(msg);
}
}
//此处可以加入一些自定义的处理函数,比如记录时间等等,但是不应该阻塞太久。
}
}catch(const std::exception& e){
cout << e.what() << endl;
}
}
GrpcStreamServerThread::~GrpcStreamServerThread(){
}
bool GrpcStreamServerThread::sendMsg(const HelloMsg& msg){
{
std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
chrono::milliseconds tryTime(500);
//main函数与GrpcStreamServerThread::run()处于不同的线程,需要加锁保证线程安全
if(lock.try_lock_for(tryTime)){
//获取到了锁
msgQueue.push(msg);
msgNum++;
}else{
cout << "500ms内没抢到锁" << endl;
return false;
}
}
return true;
}
int main(){
GrpcStreamServerThread* grpcStreamServerThread = new GrpcStreamServerThread();
thread myThread(std::bind(&GrpcStreamServerThread::run, grpcStreamServerThread));
HelloMsg msg;
msg.set_id(1);
while(1){
std::string input;
cin >> input;
//输入exit,跳出循环结束程序。
if(input == "exit")break;
msg.set_msg(input);
grpcStreamServerThread->sendMsg(msg);
}
return 0;
}