这篇文章接着上篇继续解释流的建立,其中最重要的一个函数FlowGetNew,主要目的是获取一个flow,获取过程中也是非常曲折的,求爷爷告奶奶才能求来一个flow。
1. FlowGetNew 这个函数也分几个方面理解:
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew
a。从线程自己的flow队列里获取flow,如果获取成功则返回flow,如果没有就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。
b。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存超过配置上限,则设置进入紧急模式。
c。接b,设置紧急模式之后,调用函数FlowGetUsedFlow获取flow,这个函数是从正在使用的全局flow_hash的bucket链表里获取一个,后续会注释这个函数。
d。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc。
static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, const Packet *p)
{
//获取紧急模式标志
const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
//检查是否可以生成flow,icmp错误包不生成flow
//如果是紧急模式,tcp非sync的包不生成flow,可以看出sync包优先生成flow
if (FlowCreateCheck(p, emerg) == 0) {
return NULL;
}
//从线程自己的flow队列里获取flow,如果获取成功则返回flow
/* get a flow from the spare queue */
Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue);
if (f == NULL) {
//如果获取flow失败,就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。
//FlowSpareSync这个函数主要是从空闲的全局flow内存池获取一个flow队列,给自己用
f = FlowSpareSync(tv, fls, p, emerg);
}
//全局flow内存池也没有可用的flow队列
if (f == NULL) {
/* If we reached the max memcap, we get a used flow */
//判断如果flow内存超过配置上限,则进入紧急模式。
if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
/* declare state of emergency */
if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
//没有设置紧急模式标志才会设置,设置过就没必要再设置了
SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
//这个函数把当前的流老化的正常超时时间设置为紧急模式超时时间
FlowTimeoutsEmergency();
}
//没有空闲flow,flow占用内存也超过配置上限,试着从正在使用的全局flow_hash
//的bucket链表上获取一个,只有引用计数为0则拿出来,不管它是否满足超时条件
f = FlowGetUsedFlow(tv, fls->dtv, &p->ts);
if (f == NULL) {
return NULL;
}
#ifdef UNITTESTS
if (tv != NULL && fls->dtv != NULL) {
#endif
StatsIncr(tv, fls->dtv->counter_flow_get_used);
#ifdef UNITTESTS
}
#endif
/* flow is still locked from FlowGetUsedFlow() */
FlowUpdateCounter(tv, fls->dtv, p->proto);
return f;
}
//如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,
//且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc
/* now see if we can alloc a new flow */
f = FlowAlloc();
if (f == NULL) {
#ifdef UNITTESTS
if (tv != NULL && fls->dtv != NULL) {
#endif
StatsIncr(tv, fls->dtv->counter_flow_memcap);
#ifdef UNITTESTS
}
#endif
return NULL;
}
/* flow is initialized but *unlocked* */
//FlowAlloc里会初始化flow,没人用它,也没人锁它,谁用谁上锁
} else {
/* flow has been recycled before it went into the spare queue */
/* flow is initialized (recylced) but *unlocked* */
//这的意思就是,获取到flow了,这个flow回收时会被初始化FlowInit(f),
//的确如此,回收时会这么做,也会释放锁
}
FLOWLOCK_WRLOCK(f);
FlowUpdateCounter(tv, fls->dtv, p->proto);
return f;
}
2. FlowTimeoutsEmergency 函数
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowTimeoutsEmergency
这个函数功能单一,把老化超时时间设置为紧急模式的老化时间。
void FlowTimeoutsEmergency(void)
{
//两个参数都是全局变量,
//flow_timeouts 是正在使用的超时时间
//flow_timeouts_emerg 是初始化时设置的紧急超时时间
//把老化超时时间设置为紧急模式的老化时间,设置后在流老化线程中,
//检查超时时获取到的时间就是这个紧急模式的时间
SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}
3. FlowGetUsedFlow 函数
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowGetUsedFlow
这个函数是在FlowGetNew中获取flow时,既没有空闲flow可用,flow内存也到达了配置的内存上限的时候调用这个函数。
函数主要是从全局变量flow_hash已经使用的flow中获取一个flow,这个flow的引用计数必须为0,不检查是否超时,只要引用计数为0就拿出来。
获取flow时,选取bucket时,有个小算法,如果每次都从flow_hash前边的bucket中拿flow出来,那么前边的势必很快取完,取完后,每次就要从前往后遍历,每次遍历都是无用功,耗费时间,因为在前边的bucket中的符合条件的flow先被取走,就会依次遍历每个bucket。
所以,设置一个原子变量flow_prune_idx,用它控制每次从哪个bucket开始获取符合条件的flow,这里是每次加5,即每次取flow的bucket的索引间隔为5,如果这个bucket上没有符合条件的flow,则索引加1,检查下一个bucket,直到找到flow或者查找计数为5(全局变量固定值),如果遍历到达最后一个bucket,则再从第一个bucket开始搜索。
目的是为每个bucket保留符合条件的flow,每次取时可以较快的找到flow,不需要遍历所有bucket.
/** \internal
* \brief Get a flow from the hash directly.
*
* Called in conditions where the spare queue is empty and memcap is reached.
*
* Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
* is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
* top each time since that would clear the top of the hash leading to longer
* and longer search times under high pressure (observed).
*
* \param tv thread vars
* \param dtv decode thread vars (for flow log api thread data)
*
* \retval f flow or NULL
*/
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const struct timeval *ts)
{
//这个函数对控制bucket索引的全局原子变量加上FLOW_GET_NEW_TRIES,值是5,
//就是每次搜索的bucket索引是上次的值加5,不会每次都从开始搜索,这样可以最大概率的给每个bucket
//保留符合条件的flow,以后再搜索这些bucket,成功概率较大,不需要遍历所有bucket.
//这是个人理解,为什么是最大概率呢,因为不是所有bucket都有符合条件的flow,你不搜索它,它也没有,
//搜索它,它也没有,也谈不上给那个bucket保留符合条件的flow,
uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size;
uint32_t tried = 0;
while (1) {
//查找次数其实也是查找的bucket个数,如果到达5次,则更新计数器,返回.
if (tried++ > FLOW_GET_NEW_TRIES) {
STATSADDUI64(counter_flow_get_used_eval, tried);
break;
}
//bucket索引到最大时,从0开始
if (++idx >= flow_config.hash_size)
idx = 0;
FlowBucket *fb = &flow_hash[idx];
//next_ts超时变量设置INT_MAX时,说明bucket上没有flow在未来next_ts要超时
//就是bucket是空的,FlowManager老化线程设置的INT_MAX
if (SC_ATOMIC_GET(fb->next_ts) == INT_MAX)
continue;
if (GetUsedTryLockBucket(fb) != 0) {
STATSADDUI64(counter_flow_get_used_eval_busy, 1);
continue;
}
Flow *f = fb->head;
if (f == NULL) {
FBLOCK_UNLOCK(fb);
continue;
}
if (GetUsedTryLockFlow(f) != 0) {
STATSADDUI64(counter_flow_get_used_eval_busy, 1);
FBLOCK_UNLOCK(fb);
continue;
}
/** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */
if (f->use_cnt > 0) {
//计数器大于0,说明有packe引用这个flow,不能抢别人的flow
STATSADDUI64(counter_flow_get_used_eval_busy, 1);
FBLOCK_UNLOCK(fb);
FLOWLOCK_UNLOCK(f);
continue;
}
//这个函数判断这个flow,最近是否使用过,
//0判断的依据就是最新时间和这个flow的时间差,小于一定秒数则最近使用过,不能拿走
if (StillAlive(f, ts)) {
STATSADDUI64(counter_flow_get_used_eval_reject, 1);
FBLOCK_UNLOCK(fb);
FLOWLOCK_UNLOCK(f);
continue;
}
//好了,可喜可贺,这个flow计数为0,最近没被更新,就它了
//把它从bucket上拿走
/* remove from the hash */
fb->head = f->next;
f->next = NULL;
f->fb = NULL;
FBLOCK_UNLOCK(fb);
//这flow的终止标志,被强行拿走标志FLOW_END_FLAG_FORCED,
//flow终止时是在紧急模式,设置标志FLOW_END_FLAG_EMERGENCY
/* rest of the flags is updated on-demand in output */
f->flow_end_flags |= FLOW_END_FLAG_FORCED;
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;
/* invoke flow log api */
#ifdef UNITTESTS
if (dtv) {
#endif
//输出flow日志
if (dtv->output_flow_thread_data) {
(void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
}
#ifdef UNITTESTS
}
#endif
//清除flow中的一些资源,主要是是否flow关联的tcp会话的内存即f->protoctx
//flow结构体后的storage空间,void *数组,重新初始化这个flow
FlowClearMemory(f, f->protomap);
/* leave locked */
STATSADDUI64(counter_flow_get_used_eval, tried);
return f;
}
STATSADDUI64(counter_flow_get_used_failed, 1);
return NULL;
}