Suricata6.0流表管理源码注释四:流的建立02

  • Post author:
  • Post category:其他


这篇文章接着上篇继续解释流的建立,其中最重要的一个函数FlowGetNew,主要目的是获取一个flow,获取过程中也是非常曲折的,求爷爷告奶奶才能求来一个flow。

1. FlowGetNew 这个函数也分几个方面理解:

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew

a。从线程自己的flow队列里获取flow,如果获取成功则返回flow,如果没有就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。

b。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存超过配置上限,则设置进入紧急模式。

c。接b,设置紧急模式之后,调用函数FlowGetUsedFlow获取flow,这个函数是从正在使用的全局flow_hash的bucket链表里获取一个,后续会注释这个函数。

d。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc。

static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, const Packet *p)
{
    //获取紧急模式标志
    const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

    //检查是否可以生成flow,icmp错误包不生成flow
    //如果是紧急模式,tcp非sync的包不生成flow,可以看出sync包优先生成flow
    if (FlowCreateCheck(p, emerg) == 0) {
        return NULL;
    }

    //从线程自己的flow队列里获取flow,如果获取成功则返回flow
    /* get a flow from the spare queue */
    Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue);
    if (f == NULL) {
        //如果获取flow失败,就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。
        //FlowSpareSync这个函数主要是从空闲的全局flow内存池获取一个flow队列,给自己用
        f = FlowSpareSync(tv, fls, p, emerg);
    }

    //全局flow内存池也没有可用的flow队列
    if (f == NULL) {
        /* If we reached the max memcap, we get a used flow */
        //判断如果flow内存超过配置上限,则进入紧急模式。
        if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
            /* declare state of emergency */
            if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
                //没有设置紧急模式标志才会设置,设置过就没必要再设置了
                SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
                //这个函数把当前的流老化的正常超时时间设置为紧急模式超时时间
                FlowTimeoutsEmergency();
            }
        
            //没有空闲flow,flow占用内存也超过配置上限,试着从正在使用的全局flow_hash
            //的bucket链表上获取一个,只有引用计数为0则拿出来,不管它是否满足超时条件
            f = FlowGetUsedFlow(tv, fls->dtv, &p->ts);
            if (f == NULL) {
                return NULL;
            }
#ifdef UNITTESTS
            if (tv != NULL && fls->dtv != NULL) {
#endif
                StatsIncr(tv, fls->dtv->counter_flow_get_used);
#ifdef UNITTESTS
            }
#endif
            /* flow is still locked from FlowGetUsedFlow() */
            FlowUpdateCounter(tv, fls->dtv, p->proto);
            return f;
        }

        //如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,
        //且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc
        /* now see if we can alloc a new flow */
        f = FlowAlloc();
        if (f == NULL) {
#ifdef UNITTESTS
            if (tv != NULL && fls->dtv != NULL) {
#endif
                StatsIncr(tv, fls->dtv->counter_flow_memcap);
#ifdef UNITTESTS
            }
#endif
            return NULL;
        }

        /* flow is initialized but *unlocked* */
        //FlowAlloc里会初始化flow,没人用它,也没人锁它,谁用谁上锁
    } else {
        /* flow has been recycled before it went into the spare queue */

        /* flow is initialized (recylced) but *unlocked* */
        //这的意思就是,获取到flow了,这个flow回收时会被初始化FlowInit(f),
        //的确如此,回收时会这么做,也会释放锁
    }

    FLOWLOCK_WRLOCK(f);
    FlowUpdateCounter(tv, fls->dtv, p->proto);
    return f;
}

2. FlowTimeoutsEmergency 函数

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowTimeoutsEmergency

这个函数功能单一,把老化超时时间设置为紧急模式的老化时间。

void FlowTimeoutsEmergency(void)
{
    //两个参数都是全局变量,
    //flow_timeouts 是正在使用的超时时间
    //flow_timeouts_emerg 是初始化时设置的紧急超时时间

    //把老化超时时间设置为紧急模式的老化时间,设置后在流老化线程中,
    //检查超时时获取到的时间就是这个紧急模式的时间
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

3. FlowGetUsedFlow 函数

FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowGetUsedFlow

这个函数是在FlowGetNew中获取flow时,既没有空闲flow可用,flow内存也到达了配置的内存上限的时候调用这个函数。

函数主要是从全局变量flow_hash已经使用的flow中获取一个flow,这个flow的引用计数必须为0,不检查是否超时,只要引用计数为0就拿出来。

获取flow时,选取bucket时,有个小算法,如果每次都从flow_hash前边的bucket中拿flow出来,那么前边的势必很快取完,取完后,每次就要从前往后遍历,每次遍历都是无用功,耗费时间,因为在前边的bucket中的符合条件的flow先被取走,就会依次遍历每个bucket。

所以,设置一个原子变量flow_prune_idx,用它控制每次从哪个bucket开始获取符合条件的flow,这里是每次加5,即每次取flow的bucket的索引间隔为5,如果这个bucket上没有符合条件的flow,则索引加1,检查下一个bucket,直到找到flow或者查找计数为5(全局变量固定值),如果遍历到达最后一个bucket,则再从第一个bucket开始搜索。

目的是为每个bucket保留符合条件的flow,每次取时可以较快的找到flow,不需要遍历所有bucket.

/** \internal
 *  \brief Get a flow from the hash directly.
 *
 *  Called in conditions where the spare queue is empty and memcap is reached.
 *
 *  Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
 *  is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
 *  top each time since that would clear the top of the hash leading to longer
 *  and longer search times under high pressure (observed).
 *
 *  \param tv thread vars
 *  \param dtv decode thread vars (for flow log api thread data)
 *
 *  \retval f flow or NULL
 */
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const struct timeval *ts)
{
    //这个函数对控制bucket索引的全局原子变量加上FLOW_GET_NEW_TRIES,值是5,
    //就是每次搜索的bucket索引是上次的值加5,不会每次都从开始搜索,这样可以最大概率的给每个bucket
    //保留符合条件的flow,以后再搜索这些bucket,成功概率较大,不需要遍历所有bucket.
    //这是个人理解,为什么是最大概率呢,因为不是所有bucket都有符合条件的flow,你不搜索它,它也没有,
    //搜索它,它也没有,也谈不上给那个bucket保留符合条件的flow,
    uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size;
    uint32_t tried = 0;

    while (1) {
        //查找次数其实也是查找的bucket个数,如果到达5次,则更新计数器,返回.
        if (tried++ > FLOW_GET_NEW_TRIES) {
            STATSADDUI64(counter_flow_get_used_eval, tried);
            break;
        }

        //bucket索引到最大时,从0开始
        if (++idx >= flow_config.hash_size)
            idx = 0;

        FlowBucket *fb = &flow_hash[idx];

        //next_ts超时变量设置INT_MAX时,说明bucket上没有flow在未来next_ts要超时
        //就是bucket是空的,FlowManager老化线程设置的INT_MAX
        if (SC_ATOMIC_GET(fb->next_ts) == INT_MAX)
            continue;

        if (GetUsedTryLockBucket(fb) != 0) {
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            continue;
        }

        Flow *f = fb->head;
        if (f == NULL) {
            FBLOCK_UNLOCK(fb);
            continue;
        }

        if (GetUsedTryLockFlow(f) != 0) {
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            FBLOCK_UNLOCK(fb);
            continue;
        }

        /** never prune a flow that is used by a packet or stream msg
         *  we are currently processing in one of the threads */
        if (f->use_cnt > 0) {
            //计数器大于0,说明有packe引用这个flow,不能抢别人的flow
            STATSADDUI64(counter_flow_get_used_eval_busy, 1);
            FBLOCK_UNLOCK(fb);
            FLOWLOCK_UNLOCK(f);
            continue;
        }

        //这个函数判断这个flow,最近是否使用过,
        //0判断的依据就是最新时间和这个flow的时间差,小于一定秒数则最近使用过,不能拿走
        if (StillAlive(f, ts)) {
            STATSADDUI64(counter_flow_get_used_eval_reject, 1);
            FBLOCK_UNLOCK(fb);
            FLOWLOCK_UNLOCK(f);
            continue;
        }

        //好了,可喜可贺,这个flow计数为0,最近没被更新,就它了
        //把它从bucket上拿走
        /* remove from the hash */
        fb->head = f->next;
        f->next = NULL;
        f->fb = NULL;
        FBLOCK_UNLOCK(fb);

        //这flow的终止标志,被强行拿走标志FLOW_END_FLAG_FORCED,
        //flow终止时是在紧急模式,设置标志FLOW_END_FLAG_EMERGENCY
        /* rest of the flags is updated on-demand in output */
        f->flow_end_flags |= FLOW_END_FLAG_FORCED;
        if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
            f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;

        /* invoke flow log api */
#ifdef UNITTESTS
        if (dtv) {
#endif
            //输出flow日志
            if (dtv->output_flow_thread_data) {
                (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
            }
#ifdef UNITTESTS
        }
#endif
        //清除flow中的一些资源,主要是是否flow关联的tcp会话的内存即f->protoctx
        //flow结构体后的storage空间,void *数组,重新初始化这个flow
        FlowClearMemory(f, f->protomap);

        /* leave locked */

        STATSADDUI64(counter_flow_get_used_eval, tried);
        return f;
    }

    STATSADDUI64(counter_flow_get_used_failed, 1);
    return NULL;
}



版权声明:本文为ljq32原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。