linux rcu机制,Linux RCU机制详解 (透彻)
一:前言
RCU機制出現的比較早,只是在linux kernel中一直到2.5版本的時候才被采用.關于RCU機制,這里就不做過多的介紹了,網上有很多有關RCU介紹和使用的文檔.請自行查閱.本文主要是從linux kernel源代碼的角度.來分析RCU的實現.
在討論RCU的實現之前.有必要重申以下幾點:
1:RCU使用在讀者多而寫者少的情況.RCU和讀寫鎖相似.但RCU的讀者占鎖沒有任何的系統開銷.寫者與寫寫者之間必須要保持同步,且寫者必須要等它之前的讀者全部都退出之后才能釋放之前的資源.
2:RCU保護的是指針.這一點尤其重要.因為指針賦值是一條單指令.也就是說是一個原子操作.因它更改指針指向沒必要考慮它的同步.只需要考慮cache的影響.
3:讀者是可以嵌套的.也就是說rcu_read_lock()可以嵌套調用.
4:讀者在持有rcu_read_lock()的時候,不能發生進程上下文切換.否則,因為寫者需要要等待讀者完成,寫者進程也會一直被阻塞.
以下的代碼是基于linux kernel 2.6.26
二:使用RCU的實例
Linux kernel中自己附帶有詳細的文檔來介紹RCU,這些文檔位于linux-2.6.26.3/Documentation/RCU. 這些文檔值得多花點時間去仔細研讀一下.
下面以whatisRCU.txt中的例子作為今天分析的起點:
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
}
int foo_get_a(void)
{
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
如上代碼所示,RCU被用來保護全局指針struct foo *gbl_foo. foo_get_a()用來從RCU保護的結構中取得gbl_foo的值.而foo_update_a()用來更新被RCU保護的gbl_foo的值.
另外,我們思考一下,為什么要在foo_update_a()中使用自旋鎖foo_mutex呢?
假設中間沒有使用自旋鎖.那foo_update_a()的代碼如下:
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
old_fp = gbl_foo;
1:-------------------------
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
synchronize_rcu();
kfree(old_fp);
}
假設A進程在上圖----標識處被B進程搶點.B進程也執行了goo_ipdate_a().等B執行完后,再切換回A進程.此時,A進程所持的old_fd實際上已經被B進程給釋放掉了.此后A進程對old_fd的操作都是非法的.
另外,我們在上面也看到了幾個有關RCU的核心API.它們為別是:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
其中,rcu_read_lock()和rcu_read_unlock()用來保持一個讀者的RCU臨界區.在該臨界區內不允許發生上下文切換.
rcu_dereference():讀者調用它來獲得一個被RCU保護的指針.
Rcu_assign_pointer():寫者使用該函數來為被RCU保護的指針分配一個新的值.這樣是為了安全從寫者到讀者更改其值.這個函數會返回一個新值
三:RCU API實現分析
Rcu_read_lock()和rcu_read_unlock()的實現如下:
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
#define __rcu_read_lock()
do {
preempt_disable();
__acquire(RCU);
rcu_read_acquire();
} while (0)
#define __rcu_read_unlock()
do {
rcu_read_release();
__release(RCU);
preempt_enable();
} while (0)
其中__acquire(),rcu_read_read_acquire(),rcu_read_release(),rcu_read_release()都是一些選擇編譯函數,可以忽略不可看.因此可以得知.rcu_read_lock(),rcu_read_unlock()只是禁止和啟用搶占.因為在讀者臨界區,不允許發生上下文切換.
rcu_dereference()和rcu_assign_pointer()的實現如下:
#define rcu_dereference(p) ({
typeof(p) _________p1 = ACCESS_ONCE(p);
smp_read_barrier_depends();
(_________p1);
})
#define rcu_assign_pointer(p, v)
({
if (!__builtin_constant_p(v) ||
((v) != NULL))
smp_wmb();
(p) = (v);
})
它們的實現也很簡單.因為它們本身都是原子操作.因為只是為了cache一致性,插上了內存屏障.可以讓其它的讀者/寫者可以看到保護指針的最新值.
synchronize_rcu()在RCU中是一個最核心的函數,它用來等待之前的讀者全部退出.我們后面的大部份分析也是圍繞著它而進行.實現如下:
void synchronize_rcu(void)
{
struct rcu_synchronize rcu;
init_completion(&rcu.completion);
/* Will wake me after RCU finished */
call_rcu(&rcu.head, wakeme_after_rcu);
/* Wait for it */
wait_for_completion(&rcu.completion);
}
我們可以看到,它初始化了一個本地變量,它的類型為struct rcu_synchronize.調用call_rcu()之后.一直等待條件變量rcu.competion的滿足.
在這里看到了RCU的另一個核心API,它就是call_run().它的定義如下:
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
它用來等待之前的讀者操作完成之后,就會調用函數func.
我們也可以看到,在synchronize_rcu()中,讀者操作完了要調用的函數就是wakeme_after_rcu().
另外,call_rcu()用在不可睡眠的條件中,如果中斷環境,禁止搶占環境等.而synchronize_rcu()用在可睡眠的環境下.先跟蹤看一下wakeme_after_rcu():
static void wakeme_after_rcu(struct rcu_head *head)
{
struct rcu_synchronize *rcu;
rcu = container_of(head, struct rcu_synchronize, head);
complete(&rcu->completion);
}
我們可以看到,該函數將條件變量置真,然后喚醒了在條件變量上等待的進程.
看下call_rcu():
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_ctrlblk);
}
local_irq_restore(flags);
}
該函數也很簡單,就是將head加在了per_cpu變量rcu_data的tail鏈表上.
Rcu_data定義如下:
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
由此,我們可以得知,每一個CPU都有一個rcu_data.每個調用call_rcu()/synchronize_rcu()進程所代表的head都會掛到rcu_data的tail鏈表上.
那究竟怎么去判斷當前的寫者已經操作完了呢?我們在之前看到,不是讀者在調用rcu_read_lock()的時候要禁止搶占么?因此,我們只需要判斷如有的CPU都進過了一次上下文切換,就說明所有讀者已經退出了.
引用>( (
http://www.ibm.com/developerworks/cn/linux/l-rcu/
)中有關這個過程的描述:
“等待適當時機的這一時期稱為grace period,而CPU發生了上下文切換稱為經歷一個quiescent state,grace period就是所有CPU都經歷一次quiescent state所需要的等待的時間。垃圾收集器就是在grace period之后調用寫者注冊的回調函數來完成真正的數據修改或數據釋放操作的”
要徹底弄清楚這個問題,我們得從RCU的初始化說起.
四:從RCU的初始化說起
RCU的初始化位于start_kernel()àrcu_init().代碼如下:
void __init rcu_init(void)
{
__rcu_init();
}
void __init __rcu_init(void)
{
rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
(void *)(long)smp_processor_id());
/* Register notifier for non-boot CPUs */
register_cpu_notifier(&rcu_nb);
}
Reqister_cpu_notifier()是關于通知鏈表的操作,可以忽略不看.
跟進rcu_cpu_notify():
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
unsigned long action, void *hcpu)
{
long cpu = (long)hcpu;
switch (action) {
case CPU_UP_PREPARE:
case CPU_UP_PREPARE_FROZEN:
rcu_online_cpu(cpu);
break;
case CPU_DEAD:
case CPU_DEAD_FROZEN:
&n
bsp; rcu_offline_cpu(cpu);
break;
default:
break;
}
return NOTIFY_OK;
}
注意到,在__rcu_init()中是以CPU_UP_PREPARE為參數調用此函數,對應流程轉入rcu_online_cpu中:
static void __cpuinit rcu_online_cpu(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}
我們從這里又看到了另一個per_cpu變量,rcu_bh_data.有關bh的部份之后再來分析.在這里略過這些部份.
Rcu_init_percpu_data()如下:
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
memset(rdp, 0, sizeof(*rdp));
rdp->curtail = &rdp->curlist;
rdp->nxttail = &rdp->nxtlist;
rdp->donetail = &rdp->donelist;
rdp->quiescbatch = rcp->completed;
rdp->qs_pending = 0;
rdp->cpu = cpu;
rdp->blimit = blimit;
}
調用這個函數的第二個參數是一個全局變量rcu_ctlblk.定義如下:
static struct rcu_ctrlblk rcu_ctrlblk = {
.cur = -300,
.completed = -300,
.lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
.cpumask = CPU_MASK_NONE,
};
static struct rcu_ctrlblk rcu_bh_ctrlblk = {
.cur = -300,
.completed = -300,
.lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
.cpumask = CPU_MASK_NONE,
};
在rcu_init_percpu_data中,初始化了三個鏈表,分別是taillist,curlist和donelist.另外, 將rdp->quiescbatch 賦值為 rcp->completed.這個是一個很重要的操作.
Rdp-> quiescbatch表示rcu_data已經完成的grace period序號(在代碼中也被稱為了batch),rcp->completed表示全部變量rcu_ctrlblk計數已經完成的grace period序號.將rdp->quiescbatch = rcp->completed;,表示不需要等待grace period.
回到rcu_online_cpu()中:
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
初始化了RCU_SOFTIRQ類型的軟中斷.但這個軟中斷什么時候被打開,還需要之后來分析.
之后,每個CPU的初始化都會經過start_kernel()->rcu_init().相應的,也為每個CPU初始化了RCU的相關結構.
五:等待RCU讀者操作完成
之前,我們看完了RCU的初始化,現在可以來看一下RCU如何來判斷當前的RCU讀者已經退出了.
在每一次進程切換的時候,都會調用rcu_qsctr_inc().如下代碼片段如示:
asmlinkage void __sched schedule(void)
{
......
......
rcu_qsctr_inc(cpu);
......
}
Rcu_qsctr_inc()代碼如下:
static inline void rcu_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
rdp->passed_quiesc = 1;
}
該函數將對應CPU上的rcu_data的passed_quiesc成員設為了1.
或許你已經發現了,這個過程就標識該CPU經過了一次quiescent state.沒錯:-)
另外,在時鐘中斷中,會進行以下操作:
void update_process_times(int user_tick)
{
......
......
if (rcu_pending(cpu))
rcu_check_callbacks(cpu, user_tick);
......
......
}
在每一次時鐘中斷,都會檢查是否有需要更新的RCU需要處理,如果有,就會為其調用rcu_check_callbacks().
Rcu_pending()的代碼如下:
int rcu_pending(int cpu)
{
return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}
同上面一樣,忽略bh的部份.
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
/* This cpu has pending rcu entries and the grace period
* for them has completed.
*/
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
return 1;
/* This cpu has no pending entries, but there are new entries */
if (!rdp->curlist && rdp->nxtlist)
return 1;
/* This cpu has finished callbacks to invoke */
if (rdp->donelist)
return 1;
/* The rcu core waits for a quiescent state from the cpu */
if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
return 1;
/* nothing to do */
return 0;
}
上面有四種情況會返回1,分別對應:
1:該CPU上有等待處理的回調函數,且已經經過了一個batch(grace period).rdp->datch表示rdp在等待的batch序號
2:上一個等待已經處理完了,又有了新注冊的回調函數.
3:等待已經完成,但尚末調用該次等待的回調函數.
4:在等待quiescent state.
關于rcp和rdp結構中成員的含義,我們等用到的時候再來分析.
如果rcu_pending返回1,就會進入到rcu_check_callbacks().代碼如下:
void rcu_check_callbacks(int cpu, int user)
{
if (user ||
(idle_cpu(cpu) && !in_softirq() &&
hardirq_count() taillist.
當前等待grace period完成的函數都會鏈入到rdp->curlist上.
到等待的grace period已經到來,就會將curlist上的鏈表移到donelist上.
當一個grace period過了之后,就會將taillist上的數據移到rdp->curlist上.之后加冊的回調函數又會將其加到rdp->taillist上.
__rcu_process_callbacks()代碼分段分析如下:
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
*rdp->donetail = rdp->curlist;
rdp->donetail = rdp->curtail;
rdp->curlist = NULL;
rdp->curtail = &rdp->curlist;
}
如果有需要處理的回調函數,且已經經過了一次grace period.就將curlist上的數據移到donetlist上.
其中,crp->completed表示已經完成的grace period.rdp->batch表示該CPU正在等待的grace period序號.
if (rdp->nxtlist && !rdp->curlist) {
local_irq_disable();
rdp->curlist = rdp->nxtlist;
rdp->curtail = rdp->nxttail;
rdp->nxtlist = NULL;
rdp->nxttail = &rdp->nxtlist;
local_irq_enable();
/*
* start the next batch of callbacks
*/
/* determine batch number */
rdp->batch = rcp->cur + 1;
/* see the comment and corresponding wmb() in
* the rcu_start_batch()
*/
smp_rmb();
if (!rcp->next_pending) {
/* and start it/schedule start if it's a new batch */
spin_lock(&rcp->lock);
rcp->next_pending = 1;
rcu_start_batch(rcp);
spin_unlock(&rcp->lock);
}
}
如果上一個等待的回調函數處理完了,而且又有了新注冊的回調函數.就將taillist上的數據移動到curlist上.并開啟新的grace period等待.
注意里面幾個變量的賦值:
rdp->batch = rcp->cur + 1表示該CPU等待的grace period置為當前已發生grace period序號的下一
個.
每次啟動一個新的grace period等待之后,就會將rcp->next_pending.在啟動的過程中,也就是rcu_start_batch()的過程中,會將rcp->next_pending置為1.設置這個變量主要是防止多個寫者競爭的情況
//更新相關信息
rcu_check_quiescent_state(rcp, rdp);
//處理等待完成的回調函數
if (rdp->donelist)
rcu_do_batch(rdp);
}
接著,更新相關的信息,例如,判斷當前CPU是否進行了quiescent state.或者grace period是否已經完成.
最后再處理掛在rdp->donelist上的鏈表.
這里面有幾個子函數值得好好分析,分別分析如下:
第一個要分析的是rcu_start_batch():
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
if (rcp->next_pending &&
rcp->completed == rcp->cur) {
rcp->next_pending = 0;
smp_wmb();
rcp->cur++;
smp_mb();
cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
rcp->signaled = 0;
}
}
這個函數的代碼雖然很簡單,但隱藏了很多玄機.
每次啟動一個新的grace period等待的時候就將rcp->cur加1,將rcp->cpumask中,將存在的CPU的位置1.
其中,if判斷必須要滿足二個條件:
第一:rcp->next_pending必須為1.我們把這個函數放到__rcu_process_callbacks()這個大環境中看一下:
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
......
......
if (rdp->nxtlist && !rdp->curlist) {
......
if (!rcp->next_pending) {
/* and start it/schedule start if it's a new batch */
spin_lock(&rcp->lock);
rcp->next_pending = 1;
rcu_start_batch(rcp);
spin_unlock(&rcp->lock);
}
}
}
首先,rcp->next_pending為0才會調用rcu_start_batch()啟動一個新的進程.然后,將rcp->next_pending置為1,再調用rcu_start_batch().在這里要注意中間的自旋鎖.然后在rcu_start_batch()中,再次判斷rcp->next_pending為1后,再進行后續操作,并將rcp->next_pending置為0.
為什么這里需要這樣的判斷呢? 如果其它CPU正在開啟一個新的grace period等待,那就用不著再次開啟一個新的等待了,直接返回即可.
第二: rcu_start_batch()中if要滿足的第二個條件為rcp->completed == rcp->cur.也就是說前面的grace period全部都完成了.每次開啟新等待的時候都會將rcp->cur加1.每一個等待完成之后,都會將rc-> completed等于rcp->cur.
第二個要分析的函數是rcu_check_quiescent_state().代碼如下:
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
struct rcu_data *rdp)
{
if (rdp->quiescbatch != rcp->cur) {
/* start new grace period: */
rdp->qs_pending = 1;
rdp->passed_quiesc = 0;
rdp->quiescbatch = rcp->cur;
return;
}
/* Grace period already completed for this cpu?
* qs_pending is checked instead of the actual bitmap to avoid
* cacheline trashing.
*/
if (!rdp->qs_pending)
return;
/*
* Was there a quiescent state since the beginning of the grace
* period? If no, then exit and wait for the next call.
*/
if (!rdp->passed_quiesc)
return;
rdp->qs_pending = 0;
spin_lock(&rcp->lock);
/*
* rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
* during cpu startup. Ignore the quiescent state.
*/
if (likely(rdp->quiescbatch == rcp->cur))
cpu_quiet(rdp->cpu, rcp);
spin_unlock(&rcp->lock);
}
首先,如果rdp->quiescbatch != rcp->cur.則說明又開啟了一個新的等待,因此需要重新處理這個等待,首先將rdp->quiescbatch 更新為rcp->cur.然后,使rdp->qs_pending為1.表示有等待需要處理. passed_quiesc也被清成了0.
然后,再判斷rdp->passed_quiesc是否為真,記得我們在之前分析過,在每次進程切換或者進程切換的時候,都會調用rcu_qsctr_inc().該函數會將rdp->passed_quiesc置為1.
因此,在這里判斷這個值是為了檢測該CPU上是否發生了上下文切換.
之后,就是一段被rcp->lock保護的一段區域.如果還是等待沒有發生改變,就會調用cpu_quiet(rdp->cpu, rcp)將該CPU位清零.如果是一個新的等待了,就用不著清了,因為需要重新判斷該CPU上是否發生了上下文切換.
cpu_quiet()函數代碼如下:
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
cpu_clear(cpu, rcp->cpumask);
if (cpus_empty(rcp->cpumask)) {
/* batch completed ! */
rcp->completed = rcp->cur;
rcu_start_batch(rcp);
}
}
它清除當前CPU對應的位,如果CPMMASK為空,對應所有的CPU都發生了進程切換,就會將rcp->completed = rcp->cur.并且根據需要是否開始一個grace period等待.
最后一個要分析的函數是rcu_do_batch().它進行的是清尾的工作.如果等待完成了,那就必須要處理donelist鏈表上掛載的數據了.代碼如下:
static void rcu_do_batch(struct rcu_data *rdp)
{
struct rcu_head *next, *list;
int count = 0;
list = rdp->donelist;
while (list) {
next = list->next;
prefetch(next);
list->func(list);
list = next;
if (++count >= rdp->blimit)
break;
}
rdp->donelist = list;
local_irq_disable();
rdp->qlen -= count;
local_irq_enable();
if (rdp->blimit == INT_MAX && rdp->qlen blimit = blimit;
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
raise_rcu_softirq();
}
它遍歷處理掛在鏈表上的回調函數.在這里,注意每次調用的回調函數有最大值限制.這樣做主要是防止一次調用過多的回調函數而產生不必要系統負載.如果donelist中還有沒處理完的數據,打開RCU軟中斷,在下次軟中斷到來的時候接著處理.
五:幾種RCU情況分析
1:如果CPU 1上有進程調用rcu_read_lock進入臨界區,之后退出來,發生了進程切換,新進程又通過rcu_read-_lock進入臨界區.由于RCU軟中斷中只判斷一次上下文切換,因此,在調用回調函數的時候,仍然有進程處于RCU的讀臨界區,這樣會不會有問題呢?
這樣是不會有問題的.還是上面的例子:
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
使用synchronize_rcu ()只是為了等待持有old_fd(也就是調用rcu_assign_pointer ()更新之前的gbl_foo)的進程退出.而不需要等待所有的讀者全部退出.這是因為,在rcu_assign_pointer ()之后的讀取取得的保護指針,已經是更新好的新值了.
2:上面分析的似乎是針對有掛載鏈表的CPU而言的,那對于只調用rcu_read_lock()的CPU,它們是怎么處理的呢?
首先,每次啟動一次等待,肯定是會更新rcp->cur的.因此,在rcu_pending()的判斷中,下面語句會被滿足:
if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
return 1;
因此會進入到RCU的軟中斷.在軟中斷處理中:
rcu_process_callbacks() à __rcu_process_callbacks() àrcu_check_quiescent_state()
中,如果該CPU上有進程切換,就會各新rcp中的CPU 掩碼數組.
3:如果一個CPU連續調用synchronize_rcu()或者call_rcu()它們會有什么影響呢?
如果當前有請求在等待,就會新請提交的回調函數掛到taillist上,一直到前一個等待完成,再將taillist的數據移到curlist,并開啟一個新的等待,因此,也就是說,在前一個等待期間提交的請求,都會放到一起處理.也就是說,他們會共同等待所有CPU切換完成.
舉例說明如下:
假設grace period時間是12ms.在12ms內,先后有A,B,C進程提交請求.
那系統在等待處理完后,交A,B,C移到curlist中,開始一個新的等待.
六:有關rcu_read_lock_bh()/rcu_read_unlock_bh()/call_rcu_bh().
在上面的代碼分析的時候,經常看到帶有bh的RCU代碼.現在來看一下這些帶bh的RCU是什么樣的.
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
#define __rcu_read_lock_bh()
do {
local_bh_disable();
__acquire(RCU_BH);
rcu_read_acquire();
}while (0)
#define __rcu_read_unlock_bh()
do {
rcu_read_release();
__release(RCU_BH);
local_bh_enable();
} while (0)
根據上面的分析:bh RCU跟普通的RCU相比不同的是,普通RCU是禁止內核搶占,而bh RCU是禁止下半部.
其實,帶bh的RCU一般在軟中斷使用,不過計算quiescent state并不是發生一次上下文切換.而是發生一次softirq.我們在后面的分析中可得到印證.
Call_rcu_bh()代碼如下:
void call_rcu_bh(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_bh_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_bh_ctrlblk);
}
local_irq_restore(flags);
}
它跟call_rcu()不相同的是,rcu是取per_cpu變量rcu__data和全局變量rcu_ctrlblk.而bh RCU是取rcu_bh_data,rcu_bh_ctrlblk.他們的類型都是一樣的,這樣做只是為了區分BH和普通RCU的等待.
對于rcu_bh_qsctr_inc
static inline void rcu_bh_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
rdp->passed_quiesc = 1;
}
它跟rcu_qsctr_inc()機同,也是更改對應成員.
所不同的是,調用rcu_bh_qsctr_inc()的地方發生了變化.
asmlinkage void __do_softirq(void)
{
......
do {
if (pending & 1) {
h->action(h);
rcu_bh_qsctr_inc(cpu);
}
h++;
pending >>= 1;
} while (pending);
......
}
也就是說,在發生軟中斷的時候,才會認為是經過了一次quiescent state.
總結
以上是生活随笔為你收集整理的linux rcu机制,Linux RCU机制详解 (透彻)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux redhat 命令大全,re
- 下一篇: t3出行怎么预约打车