本章内容解读SRS开源代码框架,无二次开发,以学习交流为目的。
SRS是国人开发的开源流媒体服务器,C++语言开发,本章使用版本:
https://github.com/ossrs/srs/tree/5.0release
。
SRS信号(signal)的使用
SRS封装了SrsSignalManager类,用于注册信号处理(回调)函数。它使用Linux无名管道:信号处理函数在收到信号时把信号编号写入管道,协程再循环读取管道中的信号编号,并做相应处理。(注:这里指的是POSIX信号signal,而不是用于同步的信号量semaphore。)
信号为用户进程(在命令行中使用kill/killall即可)提供了一种与正在运行的SRS程序通信的方式。SRS通过信号实现的功能包括:强制退出程序、优雅地退出程序(执行一系列析构、停止监听等)、重新加载配置文件、日志文件切割等。
源码
SRS使用的是协程轮询,这里使用传统线程轮询,用法类似。
srs_app_server.hpp
#ifndef SRS_APP_SERVER_HPP
#define SRS_APP_SERVER_HPP
#include <vector>
#include <string>
// Simplified error type for this demo: a plain int, where srs_success (0)
// means "no error".
typedef int srs_error_t;
#define srs_success 0
// Macro definitions mapping SRS actions to OS signals.
// The signal defines.
// To reload the config file and apply new config.
#define SRS_SIGNAL_RELOAD SIGHUP
// Reopen the log file.
#define SRS_SIGNAL_REOPEN_LOG SIGUSR1
// For gracefully upgrade, start new SRS and gracefully quit old one.
// @see https://github.com/ossrs/srs/issues/1579
// TODO: Not implemented.
#define SRS_SIGNAL_UPGRADE SIGUSR2
// The signal for srs to fast quit, do essential dispose then exit.
#define SRS_SIGNAL_FAST_QUIT SIGTERM
// The signal for srs to gracefully quit, do carefully dispose then exit.
// @see https://github.com/ossrs/srs/issues/1579
#define SRS_SIGNAL_GRACEFULLY_QUIT SIGQUIT
// The signal for SRS to abort by assert(false).
#define SRS_SIGNAL_ASSERT_ABORT SIGABRT
// The application level signals.
// Persistence the config in memory to config file.
// @see https://github.com/ossrs/srs/issues/319#issuecomment-134993922
// @remark we actually don't handle the signal for it's not a valid os signal.
#define SRS_SIGNAL_PERSISTENCE_CONFIG 1000
// Convert signal to io,
// @see: st-1.9/docs/notes.html
// Converts asynchronous OS signals into pipe I/O: the static signal handler
// writes the signal number into a pipe, and a polling thread reads it back
// and dispatches it to on_signal() outside of signal-handler context.
//
// Usage: construct one instance, call initialize() to create the pipe, then
// start() to install the handlers and launch the polling thread.
class SrsSignalManager /*: public ISrsCoroutineHandler*/
{
private:
    // Per-process pipe which is used as a signal queue.
    // Up to PIPE_BUF/sizeof(int) signals can be queued up.
    // sig_pipe[0] is the read end, sig_pipe[1] is the write end.
    int sig_pipe[2];
    // Placeholder for the State-Threads read descriptor used by upstream SRS;
    // not used by this thread-based port.
    /*srs_netfd_t*/int signal_read_stfd;
private:
    // SrsServer* server;
    // SrsCoroutine* trd;
public:
    SrsSignalManager(/*SrsServer* s*/);
    virtual ~SrsSignalManager();
    // The object owns the pipe descriptors and closes them in its destructor,
    // so copying it would make two objects close the same descriptors.
    SrsSignalManager(const SrsSignalManager&) = delete;
    SrsSignalManager& operator=(const SrsSignalManager&) = delete;
public:
    // Create the signal pipe. Returns srs_success on success.
    virtual srs_error_t initialize();
    // Install the signal handlers and start the polling thread.
    virtual srs_error_t start();
// Interface ISrsEndlessThreadHandler.
public:
    // Thread entry point; obj is the SrsSignalManager to run cycle() on.
    // Upstream SRS polls in a coroutine; this port uses a plain thread.
    static void* startThread(void* obj);
    // Loop reading signal numbers from the pipe and dispatching them.
    virtual void *cycle();
    // Handle one signal number read from the pipe.
    void on_signal(int signo);
private:
    // Global singleton instance
    static SrsSignalManager* instance;
    // Signal catching function.
    // Converts signal event to I/O event.
    static void sig_catcher(int signo);
};
#endif
srs_app_server.cpp
#include <srs_app_server.hpp>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#if !defined(SRS_OSX) && !defined(SRS_CYGWIN64)
#include <sys/inotify.h>
#endif
using namespace std;
#ifdef SRS_RTC
#include <srs_app_rtc_network.hpp>
#endif
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
// Storage for the singleton pointer read by the static signal handler.
SrsSignalManager* SrsSignalManager::instance = nullptr;

// Registers this object as the singleton and marks both pipe ends, and the
// unused read descriptor, as "not open" (-1).
SrsSignalManager::SrsSignalManager()
    : signal_read_stfd(-1)
{
    SrsSignalManager::instance = this;
    // server = s;
    sig_pipe[0] = sig_pipe[1] = -1;
    // trd = new SrsSTCoroutine("signal", this, _srs_context->get_id());
    // signal_read_stfd = NULL;
}
// Closes whichever pipe ends are open.
SrsSignalManager::~SrsSignalManager()
{
    // srs_freep(trd);
    // srs_close_stfd(signal_read_stfd);

    // -1 is the "not open" sentinel set by the constructor; 0 is a valid
    // descriptor, so the test must be >= 0 rather than > 0.
    for (int i = 0; i < 2; ++i) {
        if (sig_pipe[i] >= 0) {
            ::close(sig_pipe[i]);
            sig_pipe[i] = -1;
        }
    }
}
// Creates the unnamed pipe used as the signal queue.
// Returns srs_success when the pipe was created, -1 (after printing a
// message) when pipe() failed.
srs_error_t SrsSignalManager::initialize()
{
    /* Create signal pipe */
    const int rc = pipe(sig_pipe);
    if (rc >= 0) {
        // if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
        //     return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe");
        // }
        return srs_success;
    }

    printf( "create pipe,ERROR_SYSTEM_CREATE_PIPE\n");
    return -1;
}
// Installs sig_catcher() for every signal this manager handles, then starts
// a detached thread that runs cycle() on this object.
//
// Returns srs_success on success, or -1 (after printing a message) when the
// pipe has not been created, a handler cannot be installed, or the thread
// cannot be created.
//
// The polling thread keeps using this object, so the object must stay alive
// for as long as the thread runs.
srs_error_t SrsSignalManager::start()
{
    /**
     * Note that if multiple processes are used (see below),
     * the signal pipe should be initialized after the fork(2) call
     * so that each process has its own private pipe.
     */

    // Without the pipe, the handler has nowhere to write and the polling
    // thread has nothing to read.
    if (sig_pipe[0] < 0 || sig_pipe[1] < 0) {
        printf("signal pipe not created, call initialize() first\n");
        return -1;
    }

    static const int handled_signals[] = {
        SRS_SIGNAL_RELOAD,
        SRS_SIGNAL_FAST_QUIT,
        SRS_SIGNAL_GRACEFULLY_QUIT,
        SRS_SIGNAL_ASSERT_ABORT,
        SIGINT,
        SRS_SIGNAL_REOPEN_LOG,
    };

    /* Install sig_catcher() as a signal handler */
    for (int signo : handled_signals) {
        struct sigaction sa = {};
        sa.sa_handler = SrsSignalManager::sig_catcher;
        sigemptyset(&sa.sa_mask);  // block no additional signals in the handler
        sa.sa_flags = 0;
        if (sigaction(signo, &sa, NULL) < 0) {
            printf("install signal handler failed, signo=%d\n", signo);
            return -1;
        }
    }

    printf("signal installed, reload=%d, reopen=%d, fast_quit=%d, grace_quit=%d\n",
        SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_FAST_QUIT, SRS_SIGNAL_GRACEFULLY_QUIT);

    // if ((err = trd->start()) != srs_success) {
    //     return srs_error_wrap(err, "signal manager");
    // }

    pthread_t tid;
    // pthread_create returns an error number directly (it does not set errno).
    const int rc = pthread_create(&tid, NULL, SrsSignalManager::startThread, this);
    if (rc != 0) {
        printf("create signal thread failed, err=%d\n", rc);
        return -1;
    }
    // The thread is never joined, so detach it to let its resources be
    // released when it exits.
    pthread_detach(tid);

    return srs_success;
}
void *SrsSignalManager::startThread(void *obj)
{
SrsSignalManager* pSrsSignalManager = (SrsSignalManager*)obj;
pSrsSignalManager->cycle();
return nullptr;
}
// Polling loop: reads signal numbers from the read end of the pipe and
// dispatches each one to on_signal().
//
// The loop ends (returning nullptr) when the pipe reports end-of-file or a
// read error other than EINTR. The read blocks until data is available, so
// no extra sleep is needed between iterations.
void* SrsSignalManager::cycle()
{
    while (true) {
        int signo = 0;

        /* Read the next signal from the pipe */
        const ssize_t nread = /*srs_read*/read(sig_pipe[0], &signo, sizeof(signo)/*, SRS_UTIME_NO_TIMEOUT*/);

        if (nread < 0) {
            // The handlers are installed without SA_RESTART, so a signal
            // delivered to this thread interrupts the read; just retry.
            if (errno == EINTR) {
                continue;
            }
            printf("read signal pipe failed, errno=%d\n", errno);
            break;
        }

        // End-of-file: every write end of the pipe has been closed.
        if (nread == 0) {
            break;
        }

        // Never dispatch a partially read value.
        if (nread != static_cast<ssize_t>(sizeof(signo))) {
            continue;
        }

        /* Process signal synchronously */
        on_signal(signo);
    }

    return nullptr;
}
void SrsSignalManager::sig_catcher(int signo)
{
int err;
/* Save errno to restore it after the write() */
err = errno;
/* write() is reentrant/async-safe */
int fd = SrsSignalManager::instance->sig_pipe[1];//往管道里写数据
write(fd, &signo, sizeof(int));
errno = err;
}
// Dispatches one signal number read from the pipe. In this demo each branch
// only prints what it would do; nothing is reloaded, reopened or terminated
// (apart from the assert on SRS_SIGNAL_ASSERT_ABORT).
void SrsSignalManager::on_signal(int signo)
{
    // For signal to quit with coredump.
    if (signo == SRS_SIGNAL_ASSERT_ABORT) {
        printf("abort with coredump, signo=%d\n", signo);
        assert(false);
        return;
    }

    if (signo == SRS_SIGNAL_RELOAD) {
        printf("reload config, signo=%d\n", signo);
        return;
    }

#ifndef SRS_GPERF_MC
    if (signo == SRS_SIGNAL_REOPEN_LOG) {
        printf("reopen log file, signo=%d\n", signo);
        return;
    }
#endif

#ifdef SRS_GPERF_MC
    if (signo == SRS_SIGNAL_REOPEN_LOG) {
        signal_gmc_stop = true;
        srs_warn("for gmc, the SIGUSR1 used as SIGINT, signo=%d", signo);
        return;
    }
#endif

    // Application-level pseudo signal; nothing to do for it here.
    if (signo == SRS_SIGNAL_PERSISTENCE_CONFIG) {
        return;
    }

    if (signo == SIGINT) {
#ifdef SRS_GPERF_MC
        srs_trace("gmc is on, main cycle will terminate normally, signo=%d", signo);
        signal_gmc_stop = true;
#endif
    }

    // For K8S, force to gracefully quit for gray release or canary.
    // @see https://github.com/ossrs/srs/issues/1595#issuecomment-587473037
    if (signo == SRS_SIGNAL_FAST_QUIT) {
        printf("force gracefully quit, signo=%d\n", signo);
        signo = SRS_SIGNAL_GRACEFULLY_QUIT;
    }

    // SRS_SIGNAL_FAST_QUIT was unconditionally remapped just above, so only
    // SIGINT can match here; the second comparison is kept to mirror the
    // shape of the upstream code.
    if (signo == SIGINT || signo == SRS_SIGNAL_FAST_QUIT) {
        printf("sig=%d, user terminate program, fast quit\n", signo);
        return;
    }

    if (signo == SRS_SIGNAL_GRACEFULLY_QUIT) {
        printf("sig=%d, user start gracefully quit\n", signo);
        return;
    }
}
源码测试
程序运行后,在命令行执行:
killall -s SIGUSR1 srs_Signal #srs_Signal是进程名
srs_Signal进程打印
signal installed, reload=1, reopen=10, fast_quit=15, grace_quit=3
reopen log file, signo=10 #发送SIGUSR1 信号时打印
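force gracefully quit, signo=15 #发送SIGTERM信号(例如 killall srs_Signal)时打印
sig=3, user start gracefully quit #同上:SIGTERM被转换为SIGQUIT(优雅退出)后打印;注意Ctrl+c发送的是SIGINT(2),此时打印的是 sig=2, user terminate program, fast quit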
版权声明:本文为weixin_40355471原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。