redis之启动服务监听

  • Post author:
  • Post category:其他


redis是用c语言实现的一个内存数据库,先从

server.c

文件中的

main()

方法看起:

int main(int argc, char **argv) {
    ...
    initServerConfig();
    ...
    initServer();
    ...
}


main()

方法干的事很多,这里只研究启动服务以及监听这块,主要就是上面两个方法。

initServerConfig()

方法主要就是给server结构体赋初始值,部分代码如下:

void initServerConfig(void) {
    ...
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.bindaddr_count = 0;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    server.tlsfd_count = 0;
    server.sofd = -1;
    ...
}

其实这方法也只是给一部分元素赋值,还有部分值是为null的。可以看到

server.ipfd_count

初始值为0,这个后面会提到。启动服务的主要工作都是在

initServer()

方法中:

void initServer(void) {
    ...
    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }
    ...
    /* Create an event handler for accepting new connections in TCP and Unix domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    ...
}

这里我们只研究不加密的tcp服务,所以关于

tls

部分和

unix

域套接字部分都省去了,当然还有

server

结构体的部分字段的初始化。在

initServer()

方法中是调用了

listenToPort()

方法启动监听的,第三个参数就是前面提到的

server.ipfd_count

,并且是引用。

/*
 * On success the function returns C_OK.
 * On error the function returns C_ERR. 
 */
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Could not create server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
                if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
                    errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
                    errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
                    continue;
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

该方法先启动ipv6进行监听,如果绑定成功或者

errno



EAFNOSUPPORT

时,就再尝试启动ipv4进行监听。因为

*count



unsupported

初始值都为0,若绑定成功那么

*count

值为1;如果

errno==EAFNOSUPPORT



unsupported

值为1,这样下面的if判断条件就必然为真从而执行。

这里解释下

EAFNOSUPPORT

的含义,也就是

address family not supported

的意思。目前

socket()

函数只支持常见的五种协议族:

family 说明
AF_INET ipv4协议
AF_INET6 ipv6协议
AF_LOCAL unix域协议
AF_ROUTE 路由套接字
AF_KEY 密钥套接字

只要ip地址绑定成功,暂且不管协议支持与否,函数都会返回成功。

再回到

initServer()

方法中,第二个

if()

语句是根据

server.ipfd_count

的值判断是否有绑定成功,如果成功,则程序继续往下执行,调用

aeCreateFileEvent()

方法,加入到

event handler

中,并给一个回调函数

acceptTcpHandler()

,当服务监听到请求时便调用该函数进行接收。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

上面这三个方法其实来自不同的文件,这里图方便我就放在一块了。从上往下依次调用,可见到最后还是无限循环调用我们熟悉的

accept()

方法接收客户端请求。

至此,redis服务的启动到监听就理得差不多了,中间省略了很多地方,以及事件处理器也没有介绍,以后再抽时间介绍吧。

其实关于

listenToPort()

方法中

*count+unsupported==2

这块我想了很久之前一直不理解这块意思。总觉得如果if判断后,

*count=0,unsupported=2

,那么就说明

fds[*count]

是等于

ANET_ERR

的,也是属于失败的一种,应该返回失败才对,而这里也是直接跳出循环返回成功。后来想到可能是设计者考虑到以后可能会增加新的协议族,在后续流程中处理即可。譬如目前如果是其他的不支持的协议族,

accept()

方法也会接收失败的。

一点浅见,如有问题,欢迎指正!



版权声明:本文为yang1018679原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。