一:前言
RCU机制出现的比较早,只是在linux kernel中一直到2.5版本的时候才被采用.关于RCU机制,这里就不做过多的介绍了,网上有很多有关RCU介绍和使用的文档.请自行查阅.本文主要是从linux kernel源代码的角度.来分析RCU的实现.
在讨论RCU的实现之前.有必要重申以下几点:
1:RCU使用在读者多而写者少的情况.RCU和读写锁相似.但RCU的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源.
2:RCU保护的是指针.这一点尤其重要.因为指针赋值是一条单指令.也就是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑cache的影响.
3:读者是可以嵌套的.也就是说rcu_read_lock()可以嵌套调用.
4:读者在持有rcu_read_lock()的时候,不能发生进程上下文切换.否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞.
以下的代码是基于linux kernel 2.6.26
二:使用RCU的实例
Linux kernel中自己附带有详细的文档来介绍RCU,这些文档位于linux-2.6.26.3/Documentation/RCU. 这些文档值得多花点时间去仔细研读一下.
下面以whatisRCU.txt中的例子作为今天分析的起点:
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
}
int foo_get_a(void)
{
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
如上代码所示,RCU被用来保护全局指针struct foo *gbl_foo. foo_get_a()用来从RCU保护的结构中取得gbl_foo的值.而foo_update_a()用来更新被RCU保护的gbl_foo的值.
另外,我们思考一下,为什么要在foo_update_a()中使用自旋锁foo_mutex呢?
假设中间没有使用自旋锁.那foo_update_a()的代码如下:
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
old_fp = gbl_foo;
1:————————-
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
synchronize_rcu();
kfree(old_fp);
}
假设A进程在上图—-标识处被B进程抢点.B进程也执行了goo_ipdate_a().等B执行完后,再切换回A进程.此时,A进程所持的old_fd实际上已经被B进程给释放掉了.此后A进程对old_fd的操作都是非法的.
另外,我们在上面也看到了几个有关RCU的核心API.它们为别是:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
其中,rcu_read_lock()和rcu_read_unlock()用来保持一个读者的RCU临界区.在该临界区内不允许发生上下文切换.
rcu_dereference():读者调用它来获得一个被RCU保护的指针.
Rcu_assign_pointer():写者使用该函数来为被RCU保护的指针分配一个新的值.这样是为了安全从写者到读者更改其值.这个函数会返回一个新值
三:RCU API实现分析
Rcu_read_lock()和rcu_read_unlock()的实现如下:
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
#define __rcu_read_lock()
do {
preempt_disable();
__acquire(RCU);
rcu_read_acquire();
} while (0)
#define __rcu_read_unlock()
do {
rcu_read_release();
__release(RCU);
preempt_enable();
} while (0)
其中__acquire(),rcu_read_read_acquire(),rcu_read_release(),rcu_read_release()都是一些选择编译函数,可以忽略不可看.因此可以得知.rcu_read_lock(),rcu_read_unlock()只是禁止和启用抢占.因为在读者临界区,不允许发生上下文切换.
rcu_dereference()和rcu_assign_pointer()的实现如下:
#define rcu_dereference(p) ({
typeof(p) _________p1 = ACCESS_ONCE(p);
smp_read_barrier_depends();
(_________p1);
})
#define rcu_assign_pointer(p, v)
({
if (!__builtin_constant_p(v) ||
((v) != NULL))
smp_wmb();
(p) = (v);
})
它们的实现也很简单.因为它们本身都是原子操作.因为只是为了cache一致性,插上了内存屏障.可以让其它的读者/写者可以看到保护指针的最新值.
synchronize_rcu()在RCU中是一个最核心的函数,它用来等待之前的读者全部退出.我们后面的大部份分析也是围绕着它而进行.实现如下:
void synchronize_rcu(void)
{
struct rcu_synchronize rcu;
init_completion(&rcu.completion);
/* Will wake me after RCU finished */
call_rcu(&rcu.head, wakeme_after_rcu);
/* Wait for it */
wait_for_completion(&rcu.completion);
}
我们可以看到,它初始化了一个本地变量,它的类型为struct rcu_synchronize.调用call_rcu()之后.一直等待条件变量rcu.competion的满足.
在这里看到了RCU的另一个核心API,它就是call_run().它的定义如下:
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
它用来等待之前的读者操作完成之后,就会调用函数func.
我们也可以看到,在synchronize_rcu()中,读者操作完了要调用的函数就是wakeme_after_rcu().
另外,call_rcu()用在不可睡眠的条件中,如果中断环境,禁止抢