ngx_event

The previous posts covered some of the basic data structures; now let's look at how nginx handles the networking side.

ngx_event is tied to the module mechanism, but I'll hold off on that until a later post where everything is put together; for now we only look at the data structure itself and what it does.

Let's look at the struct:

typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
typedef void (*ngx_connection_handler_pt)(ngx_connection_t *c);

struct ngx_event_s {
    void            *data;//context data (typically the ngx_connection_t)

    unsigned         write:1;//whether this is a write event

    unsigned         accept:1;//accept event on a listening socket

    /* used to detect the stale events in kqueue, rtsig, and epoll */
    unsigned         instance:1;//instance bit, flipped on reuse to spot stale events

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     */
    unsigned         active:1;//whether the event has been added to the kernel

    unsigned         disabled:1;//whether the event has been disabled

    /* the ready event; in aio mode 0 means that no operation can be posted */
    unsigned         ready:1;

    unsigned         oneshot:1;

    /* aio operation is complete */
    unsigned         complete:1;

    unsigned         eof:1;
    unsigned         error:1;

    unsigned         timedout:1;
    unsigned         timer_set:1;

    unsigned         delayed:1;

    unsigned         deferred_accept:1;

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    unsigned         pending_eof:1;

#if !(NGX_THREADS)
    unsigned         posted_ready:1;
#endif

#if (NGX_WIN32)
    /* setsockopt(SO_UPDATE_ACCEPT_CONTEXT) was successful */
    unsigned         accept_context_updated:1;
#endif

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    unsigned         available:1;
#endif

    ngx_event_handler_pt  handler;//event handler callback


#if (NGX_HAVE_AIO)

#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#else
    struct aiocb     aiocb;
#endif

#endif

    ngx_uint_t       index;

    ngx_log_t       *log;

    ngx_rbtree_node_t   timer;//node in the timer red-black tree

    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

#if (NGX_THREADS)

    unsigned         locked:1;

    unsigned         posted_ready:1;
    unsigned         posted_timedout:1;
    unsigned         posted_eof:1;

#if (NGX_HAVE_KQUEUE)
    /* the pending errno reported by kqueue */
    int              posted_errno;
#endif

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              posted_available;
#else
    unsigned         posted_available:1;
#endif

    ngx_atomic_t    *lock;
    ngx_atomic_t    *own_lock;

#endif

    /* the links of the posted queue */
    ngx_event_t     *next;//links for the posted event queue
    ngx_event_t    **prev;


#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */

    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
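
To make the fields above concrete, here is a minimal sketch (my own, not taken from nginx) of how an event is normally wired up: data carries the context the callback needs, handler is the function invoked when the event fires, and the event is then registered with the platform mechanism. setup_read_event and my_read_handler are hypothetical names; ngx_add_event and NGX_READ_EVENT are nginx's generic names for event registration (explained right after the next struct).

static void
my_read_handler(ngx_event_t *rev)
{
    ngx_connection_t  *c = rev->data;   /* recover the context */

    /* read from c->fd, parse the data, re-arm timers, ... */
}

static void
setup_read_event(ngx_connection_t *c)
{
    ngx_event_t  *rev = c->read;

    rev->data = c;                      /* handed back to the handler above */
    rev->handler = my_read_handler;     /* called when the socket is readable */

    /* register with whatever event mechanism is compiled in */
    ngx_add_event(rev, NGX_READ_EVENT, 0);
}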

ngx_event is mainly a wrapper around an I/O event; the actual I/O event handling goes through the ngx_event_actions_t structure.

Here is that struct:

typedef struct {
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t  (*add_conn)(ngx_connection_t *c);
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t  (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                   ngx_uint_t flags);

    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void       (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;

ngx_event_actions_t is a generic processing interface; the concrete implementation depends on the platform, for example epoll on Linux or kqueue on macOS and the BSDs. I'm not going to explain each platform's I/O handling in detail here, just walk through the flow.

What each function does should be clear enough from its name.
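
Concretely, the rest of nginx does not call ngx_kqueue_add_event and friends directly: each event module installs its own table into the global ngx_event_actions at init time (ngx_kqueue_init() below does exactly that), and generic code dispatches through thin wrapper macros in ngx_event.h. Quoted from memory, so the exact list may differ between versions, the mapping looks roughly like this:

#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done

#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn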

I'll walk through kqueue as the example.

typedef struct {
    ngx_str_t              *name;

    void                 *(*create_conf)(ngx_cycle_t *cycle);
    char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);

    ngx_event_actions_t     actions;
} ngx_event_module_t;

static ngx_int_t ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer);
static void ngx_kqueue_done(ngx_cycle_t *cycle);
static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, ngx_int_t event,
    ngx_uint_t flags);
static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, ngx_int_t event,
    ngx_uint_t flags);
static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter,
    ngx_uint_t flags);
static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try);
static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
    ngx_uint_t flags);
static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev);

ngx_event_module_t  ngx_kqueue_module_ctx = {
    &kqueue_name,
    ngx_kqueue_create_conf,                /* create configuration */
    ngx_kqueue_init_conf,                  /* init configuration */

    {
        ngx_kqueue_add_event,              /* add an event */
        ngx_kqueue_del_event,              /* delete an event */
        ngx_kqueue_add_event,              /* enable an event */
        ngx_kqueue_del_event,              /* disable an event */
        NULL,                              /* add an connection */
        NULL,                              /* delete an connection */
        ngx_kqueue_process_changes,        /* process the changes */
        ngx_kqueue_process_events,         /* process the events */
        ngx_kqueue_init,                   /* init the events */
        ngx_kqueue_done                    /* done the events */
    }

};

/*
 * The "change_list" should be declared as ngx_thread_volatile.
 * However, the use of the change_list is localized in kqueue functions and
 * is protected by the mutex so even the "icc -ipo" should not build the code
 * with the race condition.  Thus we avoid the declaration to make a more
 * readable code.
 */

static struct kevent  *change_list, *change_list0, *change_list1;
static struct kevent  *event_list;
static ngx_uint_t      max_changes, nchanges, nevents;

//initialization
static ngx_int_t
ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_kqueue_conf_t  *kcf;
    struct timespec     ts;
#if (NGX_HAVE_TIMER_EVENT)
    struct kevent       kev;
#endif

    kcf = ngx_event_get_conf(cycle->conf_ctx, ngx_kqueue_module);

    if (ngx_kqueue == -1) {//has the kqueue been created yet?
        ngx_kqueue = kqueue();

        if (ngx_kqueue == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "kqueue() failed");
            return NGX_ERROR;
        }

#if (NGX_THREADS)

        list_mutex = ngx_mutex_init(cycle->log, 0);
        if (list_mutex == NULL) {
            return NGX_ERROR;
        }

        kevent_mutex = ngx_mutex_init(cycle->log, 0);
        if (kevent_mutex == NULL) {
            return NGX_ERROR;
        }

#endif
    }

    if (max_changes < kcf->changes) {//grow the change list to the configured size
        if (nchanges) {
            ts.tv_sec = 0;
            ts.tv_nsec = 0;

            if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)
                == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                              "kevent() failed");
                return NGX_ERROR;
            }
            nchanges = 0;
        }

        if (change_list0) {
            ngx_free(change_list0);
        }

        change_list0 = ngx_alloc(kcf->changes * sizeof(struct kevent),
                                 cycle->log);//allocate the change list
        if (change_list0 == NULL) {
            return NGX_ERROR;
        }

        if (change_list1) {
            ngx_free(change_list1);
        }

        change_list1 = ngx_alloc(kcf->changes * sizeof(struct kevent),
                                 cycle->log);
        if (change_list1 == NULL) {
            return NGX_ERROR;
        }

        change_list = change_list0;
    }

    max_changes = kcf->changes;

    if (nevents < kcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(kcf->events * sizeof(struct kevent), cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    ngx_event_flags = NGX_USE_ONESHOT_EVENT
                      |NGX_USE_KQUEUE_EVENT
                      |NGX_USE_VNODE_EVENT;

#if (NGX_HAVE_TIMER_EVENT)

    if (timer) {
        kev.ident = 0;
        kev.filter = EVFILT_TIMER;
        kev.flags = EV_ADD|EV_ENABLE;
        kev.fflags = 0;
        kev.data = timer;
        kev.udata = 0;

        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        if (kevent(ngx_kqueue, &kev, 1, NULL, 0, &ts) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "kevent(EVFILT_TIMER) failed");
            return NGX_ERROR;
        }

        ngx_event_flags |= NGX_USE_TIMER_EVENT;
    }

#endif

#if (NGX_HAVE_CLEAR_EVENT)
    ngx_event_flags |= NGX_USE_CLEAR_EVENT;
#else
    ngx_event_flags |= NGX_USE_LEVEL_EVENT;
#endif

#if (NGX_HAVE_LOWAT_EVENT)
    ngx_event_flags |= NGX_USE_LOWAT_EVENT;
#endif

    nevents = kcf->events;

    ngx_io = ngx_os_io;

    ngx_event_actions = ngx_kqueue_module_ctx.actions;

    return NGX_OK;
}

Nothing special here: standard initialization and buffer allocation.

Adding an event:

static ngx_int_t
ngx_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter, ngx_uint_t flags)
{
    struct kevent     *kev;
    struct timespec    ts;
    ngx_connection_t  *c;

    c = ev->data;

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "kevent set event: %d: ft:%i fl:%04Xi",
                   c->fd, filter, flags);

    if (nchanges >= max_changes) {//the change list is full
        ngx_log_error(NGX_LOG_WARN, ev->log, 0,
                      "kqueue change list is filled up");

        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)//flush the pending changes into the kernel
            == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
            return NGX_ERROR;
        }

        nchanges = 0;
    }

    kev = &change_list[nchanges];//grab the next free kevent slot

    kev->ident = c->fd;//ident is the file descriptor
    kev->filter = (short) filter;
    kev->flags = (u_short) flags;
    kev->udata = NGX_KQUEUE_UDATA_T ((uintptr_t) ev | ev->instance);

    if (filter == EVFILT_VNODE) {
        kev->fflags = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND
                                 |NOTE_ATTRIB|NOTE_RENAME
#if (__FreeBSD__ == 4 && __FreeBSD_version >= 430000) \
    || __FreeBSD_version >= 500018
                                 |NOTE_REVOKE
#endif
                      ;
        kev->data = 0;

    } else {
#if (NGX_HAVE_LOWAT_EVENT)
        if (flags & NGX_LOWAT_EVENT) {
            kev->fflags = NOTE_LOWAT;
            kev->data = ev->available;

        } else {
            kev->fflags = 0;
            kev->data = 0;
        }
#else
        kev->fflags = 0;
        kev->data = 0;
#endif
    }

    ev->index = nchanges;//remember which slot this event occupies
    nchanges++;

    if (flags & NGX_FLUSH_EVENT) {//kevent flush
        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "kevent flush");

        if (kevent(ngx_kqueue, change_list, (int) nchanges, NULL, 0, &ts)
            == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
            return NGX_ERROR;
        }

        nchanges = 0;
    }

    return NGX_OK;
}

static ngx_int_t
ngx_kqueue_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    ngx_int_t          rc;
#if 0
    ngx_event_t       *e;
    ngx_connection_t  *c;
#endif

    ev->active = 1;//mark the event as in use
    ev->disabled = 0;
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;

    ngx_mutex_lock(list_mutex);


    rc = ngx_kqueue_set_event(ev, event, EV_ADD|EV_ENABLE|flags);//queue it with EV_ADD|EV_ENABLE

    ngx_mutex_unlock(list_mutex);

    return rc;
}

Deleting an event:

static ngx_int_t
ngx_kqueue_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    ngx_int_t     rc;
    ngx_event_t  *e;

    ev->active = 0;//mark as inactive
    ev->disabled = 0;

    ngx_mutex_lock(list_mutex);

    if (ev->index < nchanges
        && ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
            == (uintptr_t) ev)//lucky: not yet pushed to the kernel
    {
        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                       "kevent deleted: %d: ft:%i",
                       ngx_event_ident(ev->data), event);

        /* if the event is still not passed to a kernel we will not pass it */

        nchanges--;

        if (ev->index < nchanges) {//it was not the last entry
            e = (ngx_event_t *)
                    ((uintptr_t) change_list[nchanges].udata & (uintptr_t) ~1);
            change_list[ev->index] = change_list[nchanges];
            e->index = ev->index;//move the last entry into the freed slot
        }

        ngx_mutex_unlock(list_mutex);

        return NGX_OK;
    }

    /*
     * when the file descriptor is closed the kqueue automatically deletes
     * its filters so we do not need to delete explicitly the event
     * before the closing the file descriptor.
     */

    if (flags & NGX_CLOSE_EVENT) {
        ngx_mutex_unlock(list_mutex);
        return NGX_OK;
    }

    if (flags & NGX_DISABLE_EVENT) {
        ev->disabled = 1;

    } else {
        flags |= EV_DELETE;
    }

    rc = ngx_kqueue_set_event(ev, event, flags);

    ngx_mutex_unlock(list_mutex);

    return rc;
}

Deletion first checks whether the change has already been pushed to the kernel; if it is still sitting in the change list it is simply removed from the list, otherwise an EV_DELETE change is queued so the kernel drops it.
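
That check against the change list relies on the udata tagging done in ngx_kqueue_set_event(): kev->udata stores (uintptr_t) ev | ev->instance, exploiting the fact that an ngx_event_t is always at least 2-byte aligned, so the lowest bit of its address is free to carry the instance bit. A small sketch of the idea, with hypothetical helper names (tag_event / untag_event do not exist in nginx):

static void *
tag_event(ngx_event_t *ev)
{
    /* the low bit of an aligned pointer is always 0; stash the instance bit there */
    return (void *) ((uintptr_t) ev | ev->instance);
}

static ngx_event_t *
untag_event(void *udata, unsigned *instance)
{
    *instance = (unsigned) ((uintptr_t) udata & 1);
    return (ngx_event_t *) ((uintptr_t) udata & (uintptr_t) ~1);
}

ngx_kqueue_process_events() below does the same unpacking and compares the recovered instance bit against ev->instance to discard stale events whose connection was closed and reused within the same iteration.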

process_changes:

static ngx_int_t
ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try)
{
    int               n;
    ngx_int_t         rc;
    ngx_err_t         err;
    struct timespec   ts;
    struct kevent    *changes;

    ngx_mutex_lock(kevent_mutex);

    ngx_mutex_lock(list_mutex);

    if (nchanges == 0) {
        ngx_mutex_unlock(list_mutex);
        ngx_mutex_unlock(kevent_mutex);
        return NGX_OK;
    }

    changes = change_list;
    if (change_list == change_list0) {
        change_list = change_list1;
    } else {
        change_list = change_list0;
    }

    n = (int) nchanges;
    nchanges = 0;

    ngx_mutex_unlock(list_mutex);

    ts.tv_sec = 0;
    ts.tv_nsec = 0;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "kevent changes: %d", n);

    if (kevent(ngx_kqueue, changes, n, NULL, 0, &ts) == -1) {
        err = ngx_errno;
        ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
                      cycle->log, err, "kevent() failed");
        rc = NGX_ERROR;

    } else {
        rc = NGX_OK;
    }

    ngx_mutex_unlock(kevent_mutex);

    return rc;
}

The pending changes are pushed to the kernel after swapping in the other, empty change-list array, so new changes can keep accumulating while this batch is submitted.
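
The reason there are two change-list arrays is that the kevent() call itself only needs kevent_mutex: once the pointers are swapped under list_mutex, other threads can keep queueing new changes into the fresh list while the snapshot is handed to the kernel. The pattern in isolation, with hypothetical names, simplified from the function above:

static struct kevent  *current_list, *buf_a, *buf_b;
static ngx_uint_t      current_n;

static void
swap_and_submit(int kq)
{
    struct kevent    *snapshot;
    ngx_uint_t        n;
    struct timespec   ts = { 0, 0 };

    /* done while holding list_mutex in the real code */
    snapshot = current_list;
    n = current_n;
    current_list = (current_list == buf_a) ? buf_b : buf_a;
    current_n = 0;
    /* list_mutex can be released here; only the submission lock is still needed */

    (void) kevent(kq, snapshot, (int) n, NULL, 0, &ts);
}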

Now for the actual processing of events:

ngx_kqueue_process_events

static ngx_int_t
ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
    ngx_uint_t flags)
{
    int               events, n;
    ngx_int_t         i, instance;
    ngx_uint_t        level;
    ngx_err_t         err;
    ngx_event_t      *ev, **queue;
    struct timespec   ts, *tp;

    if (ngx_threaded) {//threaded worker mode?
        if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        n = 0;

    } else {
        n = (int) nchanges;//take the pending changes
        nchanges = 0;
    }

    if (timer == NGX_TIMER_INFINITE) {//is there a timeout?
        tp = NULL;

    } else {

        ts.tv_sec = timer / 1000;
        ts.tv_nsec = (timer % 1000) * 1000000;

        /*
         * 64-bit Darwin kernel has the bug: kernel level ts.tv_nsec is
         * the int32_t while user level ts.tv_nsec is the long (64-bit),
         * so on the big endian PowerPC all nanoseconds are lost.
         */

#if (NGX_DARWIN_KEVENT_BUG)
        ts.tv_nsec <<= 32;
#endif

        tp = &ts;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "kevent timer: %M, changes: %d", timer, n);

    events = kevent(ngx_kqueue, change_list, n, event_list, (int) nevents, tp);

    err = (events == -1) ? ngx_errno : 0;//record the error, if any

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {//update the cached time if needed
        ngx_time_update();
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "kevent events: %d", events);

    if (err) {
        if (err == NGX_EINTR) {//interrupted by a signal

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {//a real error
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "kevent() failed");
        return NGX_ERROR;
    }

    if (events == 0) {//no events: the wait timed out
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "kevent() returned no events without timeout");
        return NGX_ERROR;
    }

    ngx_mutex_lock(ngx_posted_events_mutex);

    for (i = 0; i < events; i++) {//iterate over the returned events

        ngx_kqueue_dump_event(cycle->log, &event_list[i]);

        if (event_list[i].flags & EV_ERROR) {//this event reported an error
            ngx_log_error(NGX_LOG_ALERT, cycle->log, event_list[i].data,
                          "kevent() error on %d filter:%d flags:%04Xd",
                          event_list[i].ident, event_list[i].filter,
                          event_list[i].flags);
            continue;
        }

#if (NGX_HAVE_TIMER_EVENT)

        if (event_list[i].filter == EVFILT_TIMER) {//timer event
            ngx_time_update();
            continue;
        }

#endif

        ev = (ngx_event_t *) event_list[i].udata;

        switch (event_list[i].filter) {//dispatch on the event type

        case EVFILT_READ:
        case EVFILT_WRITE:

            instance = (uintptr_t) ev & 1;
            ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);

            if (ev->closed || ev->instance != instance) {//closed, or instance mismatch: stale event

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "kevent: stale event %p", ev);
                continue;
            }

            if (ev->log && (ev->log->log_level & NGX_LOG_DEBUG_CONNECTION)) {
                ngx_kqueue_dump_event(ev->log, &event_list[i]);
            }

            if (ev->oneshot) {//one-shot event, deactivate after firing
                ev->active = 0;
            }

#if (NGX_THREADS)//thread-related handling, skipped for now

            if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) {
                ev->posted_ready = 1;
                ev->posted_available = event_list[i].data;

                if (event_list[i].flags & EV_EOF) {//eof
                    ev->posted_eof = 1;
                    ev->posted_errno = event_list[i].fflags;
                }

                ngx_locked_post_event(ev, &ngx_posted_events);

                continue;
            }

#endif

            ev->available = event_list[i].data;

            if (event_list[i].flags & EV_EOF) {//peer closed the connection
                ev->pending_eof = 1;
                ev->kq_errno = event_list[i].fflags;
            }

            ev->ready = 1;//mark the event as ready

            break;

        case EVFILT_VNODE:
            ev->kq_vnode = 1;

            break;

        case EVFILT_AIO:
            ev->complete = 1;
            ev->ready = 1;

            break;

        default:
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "unexpected kevent() filter %d",
                          event_list[i].filter);
            continue;
        }

        if (flags & NGX_POST_EVENTS) {//post the event to a queue instead of handling it now
            queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events:
                                                   &ngx_posted_events);
            ngx_locked_post_event(ev, queue);

            continue;
        }

        ev->handler(ev);//invoke the event handler
    }

    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}

Overall, the event-handling logic is quite clear; anyone with some network programming experience should be able to follow it easily, so I've only added comments and a rough description of each function. In later posts I'll go over ngx_connection and ngx_listening, then ngx_request, and finally tie all of them together when covering the module mechanism.


