linux rcu机制,Linux RCU机制详解 (透彻)

  • Post author:
  • Post category:linux


一:前言

RCU机制出现的比较早,只是在linux kernel中一直到2.5版本的时候才被采用.关于RCU机制,这里就不做过多的介绍了,网上有很多有关RCU介绍和使用的文档.请自行查阅.本文主要是从linux kernel源代码的角度.来分析RCU的实现.

在讨论RCU的实现之前.有必要重申以下几点:

1:RCU使用在读者多而写者少的情况.RCU和读写锁相似.但RCU的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源.

2:RCU保护的是指针.这一点尤其重要.因为指针赋值是一条单指令.也就是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑cache的影响.

3:读者是可以嵌套的.也就是说rcu_read_lock()可以嵌套调用.

4:读者在持有rcu_read_lock()的时候,不能发生进程上下文切换.否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞.

以下的代码是基于linux kernel 2.6.26

二:使用RCU的实例

Linux kernel中自己附带有详细的文档来介绍RCU,这些文档位于linux-2.6.26.3/Documentation/RCU. 这些文档值得多花点时间去仔细研读一下.

下面以whatisRCU.txt中的例子作为今天分析的起点:

struct foo {

int a;

char b;

long c;

};

DEFINE_SPINLOCK(foo_mutex);

struct foo *gbl_foo;

void foo_update_a(int new_a)

{

struct foo *new_fp;

struct foo *old_fp;

new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);

spin_lock(&foo_mutex);

old_fp = gbl_foo;

*new_fp = *old_fp;

new_fp->a = new_a;

rcu_assign_pointer(gbl_foo, new_fp);

spin_unlock(&foo_mutex);

synchronize_rcu();

kfree(old_fp);

}

int foo_get_a(void)

{

int retval;

rcu_read_lock();

retval = rcu_dereference(gbl_foo)->a;

rcu_read_unlock();

return retval;

}

如上代码所示,RCU被用来保护全局指针struct foo *gbl_foo. foo_get_a()用来从RCU保护的结构中取得gbl_foo的值.而foo_update_a()用来更新被RCU保护的gbl_foo的值.

另外,我们思考一下,为什么要在foo_update_a()中使用自旋锁foo_mutex呢?

假设中间没有使用自旋锁.那foo_update_a()的代码如下:

void foo_update_a(int new_a)

{

struct foo *new_fp;

struct foo *old_fp;

new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);

old_fp = gbl_foo;

1:————————-

*new_fp = *old_fp;

new_fp->a = new_a;

rcu_assign_pointer(gbl_foo, new_fp);

synchronize_rcu();

kfree(old_fp);

}

假设A进程在上图—-标识处被B进程抢点.B进程也执行了goo_ipdate_a().等B执行完后,再切换回A进程.此时,A进程所持的old_fd实际上已经被B进程给释放掉了.此后A进程对old_fd的操作都是非法的.

另外,我们在上面也看到了几个有关RCU的核心API.它们为别是:

rcu_read_lock()

rcu_read_unlock()

synchronize_rcu()

rcu_assign_pointer()

rcu_dereference()

其中,rcu_read_lock()和rcu_read_unlock()用来保持一个读者的RCU临界区.在该临界区内不允许发生上下文切换.

rcu_dereference():读者调用它来获得一个被RCU保护的指针.

Rcu_assign_pointer():写者使用该函数来为被RCU保护的指针分配一个新的值.这样是为了安全从写者到读者更改其值.这个函数会返回一个新值

三:RCU API实现分析

Rcu_read_lock()和rcu_read_unlock()的实现如下:

#define rcu_read_lock() __rcu_read_lock()

#define rcu_read_unlock() __rcu_read_unlock()

#define __rcu_read_lock()

do {

preempt_disable();

__acquire(RCU);

rcu_read_acquire();

} while (0)

#define __rcu_read_unlock()

do {

rcu_read_release();

__release(RCU);

preempt_enable();

} while (0)

其中__acquire(),rcu_read_read_acquire(),rcu_read_release(),rcu_read_release()都是一些选择编译函数,可以忽略不可看.因此可以得知.rcu_read_lock(),rcu_read_unlock()只是禁止和启用抢占.因为在读者临界区,不允许发生上下文切换.

rcu_dereference()和rcu_assign_pointer()的实现如下:

#define rcu_dereference(p) ({

typeof(p) _________p1 = ACCESS_ONCE(p);

smp_read_barrier_depends();

(_________p1);

})

#define rcu_assign_pointer(p, v)

({

if (!__builtin_constant_p(v) ||

((v) != NULL))

smp_wmb();

(p) = (v);

})

它们的实现也很简单.因为它们本身都是原子操作.因为只是为了cache一致性,插上了内存屏障.可以让其它的读者/写者可以看到保护指针的最新值.

synchronize_rcu()在RCU中是一个最核心的函数,它用来等待之前的读者全部退出.我们后面的大部份分析也是围绕着它而进行.实现如下:

void synchronize_rcu(void)

{

struct rcu_synchronize rcu;

init_completion(&rcu.completion);

/* Will wake me after RCU finished */

call_rcu(&rcu.head, wakeme_after_rcu);

/* Wait for it */

wait_for_completion(&rcu.completion);

}

我们可以看到,它初始化了一个本地变量,它的类型为struct rcu_synchronize.调用call_rcu()之后.一直等待条件变量rcu.competion的满足.

在这里看到了RCU的另一个核心API,它就是call_run().它的定义如下:

void call_rcu(struct rcu_head *head,

void (*func)(struct rcu_head *rcu))

它用来等待之前的读者操作完成之后,就会调用函数func.

我们也可以看到,在synchronize_rcu()中,读者操作完了要调用的函数就是wakeme_after_rcu().

另外,call_rcu()用在不可睡眠的条件中,如果中断环境,禁止抢