0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

自旋鎖的發(fā)展歷史與使用方法

Linux閱碼場 ? 來源:Linux閱碼場 ? 作者:程磊 ? 2022-08-08 08:51 ? 次閱讀

作者簡介:

程磊,一線碼農(nóng),在某手機公司擔(dān)任系統(tǒng)開發(fā)工程師,日常喜歡研究內(nèi)核基本原理。

目錄

一、自旋鎖的發(fā)展歷史

二、原始自旋鎖

2.1 定義與初始化 2.2 加鎖操作 2.3 解鎖操作

三、票號自旋鎖

3.1 定義與初始化 3.2 加鎖操作 3.3 解鎖操作

四、MCS自旋鎖

4.1 定義與初始化 4.2 加鎖操作 4.3 解鎖操作

五、隊列自旋鎖

5.1 定義與初始化 5.2 加鎖操作 5.3 解鎖操作

六、自旋鎖的使用

6.1 自旋鎖的適用場景 6.2 自旋鎖與禁用偽并發(fā)的配合使用 6.3 raw_spin_lock的使用問題

一、自旋鎖的發(fā)展歷史

自旋鎖是Linux內(nèi)核里最常用的鎖之一,自旋鎖的概念很簡單,就是如果加鎖失敗在等鎖時是使用休眠等待還是忙等待,如果是忙等待的話,就是自旋鎖,這也是自旋鎖名字的由來。自旋鎖的邏輯是,用自旋鎖保護的臨界區(qū)要足夠小,而且臨界區(qū)內(nèi)是不能休眠的。所以當(dāng)自旋鎖加鎖失敗時,說明有其它的臨界區(qū)正在執(zhí)行中。由于自旋鎖的臨界區(qū)足夠小且不會休眠,所以我們可以自旋忙等待其它臨界區(qū)的退出,沒必要去休眠,因為休眠要做一大堆操作。而忙等待的話,對方很快就會退出臨界區(qū),我們就可以很快地獲得自旋鎖了。

自旋鎖與UP、SMP的關(guān)系:

根據(jù)自旋鎖的邏輯,自旋鎖的臨界區(qū)是不能休眠的。在UP下,只有一個CPU,如果我們執(zhí)行到了臨界區(qū),此時自旋鎖是不可能處于加鎖狀態(tài)的。因為我們正在占用CPU,又沒有其它的CPU,其它的臨界區(qū)要么沒有到來、要么已經(jīng)執(zhí)行過去了。所以我們是一定能獲得自旋鎖的,所以自旋鎖對UP來說是沒有意義的。但是為了在UP和SMP下代碼的一致性,UP下也有自旋鎖,但是自旋鎖的定義就變成了空結(jié)構(gòu)體,自旋鎖的加鎖操作就退化成了禁用搶占,自旋鎖的解鎖操作也就退化成了開啟搶占。所以說自旋鎖只適用于SMP,但是在UP下也提供了兼容操作。

自旋鎖一開始的實現(xiàn)是很簡單的,后來隨著眾核時代的到來,自旋鎖的公平性成了很大的問題,于是內(nèi)核實現(xiàn)了票號自旋鎖(ticket spinlock)來保證加鎖的公平性。后來又發(fā)現(xiàn)票號自旋鎖有很大的性能問題,于是又開始著力解決自旋鎖的性能問題。先是開發(fā)出了MCS自旋鎖,確實解決了性能問題,但是它改變了自旋鎖的接口,所以沒辦法替代自旋鎖。然后又有人對MCS自旋鎖進行改造從而開發(fā)出了隊列自旋鎖(queue spinlock)。隊列自旋鎖既解決了自旋鎖的性能問題,又保持了自旋鎖的原有接口,非常完美?,F(xiàn)在內(nèi)核使用的自旋鎖是隊列自旋鎖。下面我們用一張圖來總結(jié)一下自旋鎖的發(fā)展史(x86平臺)。

4cb1685e-16ae-11ed-ba43-dac502259ad0.png

注:MCS自旋鎖進了內(nèi)核,但是由于接口不兼容和體積問題,并沒有取代票號自旋鎖。

下面我們將按照自旋鎖的發(fā)展順序來逐步講解,本文的代碼都是按照x86平臺進行講解的,代碼都進行了刪減,把一些調(diào)試數(shù)據(jù)或者無關(guān)緊要的數(shù)據(jù)、代碼都刪除了。

二、原始自旋鎖

我們在《深入理解Linux線程同步》里面講了簡單自旋鎖,原始自旋鎖和它的原理是一樣的,但是實現(xiàn)細節(jié)更為復(fù)雜一些。本節(jié)以內(nèi)核版本2.6.24來講解代碼。

2.1 定義與初始化

我們先來看原始自旋鎖的定義。

linux-src/include/linux/spinlock_types.h

typedef struct {  raw_spinlock_t raw_lock;} spinlock_t;

做了刪減,把調(diào)試相關(guān)的配置數(shù)據(jù)都刪了。

linux-src/include/asm-x86/spinlock_types.h

typedef struct {  unsigned int slock;} raw_spinlock_t;

我們可以看到原始自旋鎖的定義非常簡單,本質(zhì)上就是一個無符號整數(shù)。

下面我們再來看一下自旋鎖變量的定義與初始化。自旋鎖的定義與初始化分為靜態(tài)和動態(tài)兩種。靜態(tài)是指自旋鎖在編譯時就分配好了空間、數(shù)據(jù)就初始化好了,這種情況一般是全局自旋鎖變量。動態(tài)是指自旋鎖是在運行時去創(chuàng)建然后用函數(shù)去初始化的,這種情況一般是自旋鎖內(nèi)嵌在某個結(jié)構(gòu)體里面,隨著結(jié)構(gòu)體的創(chuàng)建而創(chuàng)建,需要用函數(shù)去初始化一下。

靜態(tài)定義與初始化如下:

linux-src/include/linux/spinlock_types.h

#define DEFINE_SPINLOCK(x)  spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
#define __SPIN_LOCK_UNLOCKED(lockname)   (spinlock_t)  {  .raw_lock = __RAW_SPIN_LOCK_UNLOCKED}

linux-src/include/asm-x86/spinlock_types.h

#define __RAW_SPIN_LOCK_UNLOCKED  { 1 }

自旋鎖的動態(tài)初始化如下:

linux-src/include/linux/spinlock.h

# define spin_lock_init(lock)            do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)

此處的do while(0)是為了能把宏當(dāng)做函數(shù)一樣來用。我們調(diào)用函數(shù)時最后面都要加個;分號,如果沒有do while(0),我們在最后加個;分號,語法就不對了,如果不加,宏看上去就不像是函數(shù)調(diào)用了。有了do while(0),這個問題就解決了。

靜態(tài)初始化是在編譯時就對變量賦值了,動態(tài)初始化是在運行時才對變量進行賦值。

2.2 加鎖操作

下面我們來看一下自旋鎖的加鎖操作:

linux-src/include/linux/spinlock.h

#define spin_lock(lock)      _spin_lock(lock)

linux-src/kernel/spinlock.c

void __lockfunc _spin_lock(spinlock_t *lock){  preempt_disable();  LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);}

linux-src/include/linux/lockdep.h

#define LOCK_CONTENDED(_lock, try, lock)   lock(_lock)

linux-src/include/linux/spinlock.h

# define _raw_spin_lock(lock)    __raw_spin_lock(&(lock)->raw_lock)

linux-src/include/asm-x86/spinlock_32.h

static inline void __raw_spin_lock(raw_spinlock_t *lock){  asm volatile("
1:	"         LOCK_PREFIX " ; decb %0
	"         "jns 3f
"         "2:	"         "rep;nop
	"         "cmpb $0,%0
	"         "jle 2b
	"         "jmp 1b
"         "3:
	"         : "+m" (lock->slock) : : "memory");}

linux-src/include/asm-x86/spinlock_64.h

static inline void __raw_spin_lock(raw_spinlock_t *lock){  asm volatile(    "
1:	"    LOCK_PREFIX " ; decl %0
	"    "jns 2f
"    "3:
"    "rep;nop
	"    "cmpl $0,%0
	"    "jle 3b
	"    "jmp 1b
"    "2:	" : "=m" (lock->slock) : : "memory");}

可以看到spin_lock的最終實現(xiàn)是__raw_spin_lock,是在架構(gòu)代碼里面,在x86上分為32位和64位兩種實現(xiàn),用的都是內(nèi)嵌匯編代碼。關(guān)于內(nèi)嵌匯編,可以查詢gcc的官方文檔GCC內(nèi)嵌匯編語言。

這兩段內(nèi)嵌匯編代碼的意思都是一樣的,但是比較晦澀難懂,我們把它轉(zhuǎn)換為C代碼。

static inline void __raw_spin_lock(raw_spinlock_t *lock){  while(1){    if(--lock->slock == 0) // 匯編代碼中有l(wèi)ock指令前綴,此操作是原子的      return;    while((int)lock->slock <= 0){}  }}

轉(zhuǎn)換成C代碼之后就很好理解了。原始自旋鎖用1來表示沒有加鎖,在無限循環(huán)中,我們首先把鎖變量原子地減1并比較是否等于0,如果等于0,說明剛才鎖變量是1,現(xiàn)在變成了0,我們加鎖成功了,直接返回。如果鎖變量不等于0,就是說鎖變量剛才是0,現(xiàn)在變成負的了,那么我們就無限循環(huán)鎖變量小于等于0,直到鎖變量大于0,也就是等于1,結(jié)束此循環(huán),重新回到大循環(huán)中去嘗試加鎖。為什么要把鎖變量強轉(zhuǎn)為int呢,因為鎖變量的定義是無符號數(shù),而在匯編代碼中把它當(dāng)做有符號數(shù)使用,所以加個int強轉(zhuǎn)。為什么內(nèi)循環(huán)是小于等于0而不是小于0呢,首先剛到達內(nèi)循環(huán)的時候,說明我們搶鎖失敗,鎖變量一定是小于0的,在內(nèi)循環(huán)執(zhí)行的過程中,如果有人釋放了鎖而又有人立馬搶到了鎖,此時鎖變量還是0,此時我們結(jié)束內(nèi)循環(huán)去搶鎖是沒有意義的,搶鎖肯定會失敗還會回到內(nèi)循環(huán)。所以只有當(dāng)鎖變量大于0也就是等于1時,代表鎖是空閑的,此時結(jié)束內(nèi)循環(huán)才是有意義的。

2.3 解鎖操作

下面我們看一下解鎖操作:

linux-src/include/linux/spinlock.h

#define spin_unlock(lock)    _spin_unlock(lock)

linux-src/kernel/spinlock.c

void __lockfunc _spin_unlock(spinlock_t *lock){  _raw_spin_unlock(lock);  preempt_enable();}

linux-src/include/linux/spinlock.h

# define _raw_spin_unlock(lock)    __raw_spin_unlock(&(lock)->raw_lock)

linux-src/include/asm-x86/spinlock_32.h

static inline void __raw_spin_unlock(raw_spinlock_t *lock){  asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");}

linux-src/include/asm-x86/spinlock_64.h

static inline void __raw_spin_unlock(raw_spinlock_t *lock){  asm volatile("movl $1,%0" :"=m" (lock->slock) :: "memory");}

可以看到解鎖操作也是在架構(gòu)代碼里面實現(xiàn)的,用的也是內(nèi)嵌匯編代碼,這個代碼比較簡單,就是把鎖變量賦值為1,我們就不再轉(zhuǎn)換成C代碼了。

三、票號自旋鎖

看了上面的原始自旋鎖實現(xiàn)之后,我們發(fā)現(xiàn)自旋鎖并沒有排隊機制,如果有很多人在競爭鎖的情況下,誰能獲得鎖是不確定的。在CPU核數(shù)還比較少的時候,這個問題并不突出,內(nèi)核也沒有去解決這個問題。后來隨著CPU核數(shù)越來越多,內(nèi)核越來越復(fù)雜、鎖競爭越來越激烈,自旋鎖的不公平性問題就越來越突出了。有人做了個實驗,在一個8核CPU的電腦上,有的線程竟然連續(xù)100萬次都沒有獲得自旋鎖。這個不公平性就太嚴重了,解決自旋鎖的公平性問題就迫在眉睫了。

為了解決自旋鎖的公平性問題,內(nèi)核開發(fā)了票號自旋鎖。它的原理就類似于我們?nèi)ャy行辦業(yè)務(wù),以前沒有叫號機,我們每個人都盯著業(yè)務(wù)窗口看,發(fā)現(xiàn)一個人走了立馬一窩蜂地擠過去,誰搶到了位置就輪到誰辦業(yè)務(wù)。在人特別多的時候,有的人可能早上十點來的,下午五點都沒搶到機會。這怎么能行呢,太不公平了,于是銀行買了叫號機,每個進來的人都先取一個號,然后坐著等。業(yè)務(wù)員處理完一個人的業(yè)務(wù)之后就播報下一個要處理的票號。每個人都要一直注意著廣播播報,發(fā)現(xiàn)廣播里叫的號和自己手里的號是一樣的,就輪到自己去辦業(yè)務(wù)了。

票號自旋鎖在實現(xiàn)時把原來的自旋鎖的整形變量拆分成了兩部分,一部分是owner,代表當(dāng)前正在辦業(yè)務(wù)的票號,另一部分是next,代表下一個人取號的號碼。每次加鎖時先取號,定義一個局部變量int ticket = next++,自己取的號是next的值,再把next的值加1,然后不停地比較自己的ticket和owner的值,如果不相等就一直比較,直到相等為止,相等代表加鎖成功,該自己去辦業(yè)務(wù)了。辦業(yè)務(wù)就相當(dāng)于臨界區(qū),辦完業(yè)務(wù)離開臨界區(qū),解鎖自旋鎖就是把owner加1。此時如果有其它人在自旋,他發(fā)現(xiàn)owner加1之后和自己的ticket相等了,就結(jié)束自旋,代表他加鎖成功了。我們來畫一個圖來演示一下:

4cdb6ac8-16ae-11ed-ba43-dac502259ad0.png

票號自旋鎖的狀態(tài)和原始自旋鎖有很大不同,原始自旋鎖是1代表未加鎖,0代表已加鎖,看不出來排隊等鎖的線程有多少個。票號自旋鎖,owner和next相等代表未加鎖,兩者不一定等于0,next和owner的差值等于排隊等鎖的線程個數(shù)。

下面我們以內(nèi)核版本4.1來講解。在x86的實現(xiàn)上,owner叫head,next叫tail,其實這么叫也很合理,從head到tail正好是所有加鎖的人構(gòu)成的一個隊列,head是隊首,已經(jīng)獲得了鎖,tail是隊尾,是下一個來的人的序號。

3.1 定義與初始化

我們先來看票號自旋鎖的定義:

linux-src/include/linux/spinlock_types.h

typedef struct spinlock {  struct raw_spinlock rlock;} spinlock_t;
typedef struct raw_spinlock {  arch_spinlock_t raw_lock;} raw_spinlock_t;

linux-src/arch/x86/include/asm/spinlock_types.h

typedef struct arch_spinlock {  union {    __ticketpair_t head_tail;    struct __raw_tickets {      __ticket_t head, tail;    } tickets;  };} arch_spinlock_t;
#if (CONFIG_NR_CPUS < (256 / __TICKET_LOCK_INC))typedef u8  __ticket_t;typedef u16 __ticketpair_t;#elsetypedef u16 __ticket_t;typedef u32 __ticketpair_t;#endif

這里同樣把一些調(diào)試數(shù)據(jù)代碼進行了刪除。spinlock包含raw_spinlock,raw_spinlock包含arch_spinlock_t,之前只有兩層,spinlock是對外接口,raw_spinlock是各個架構(gòu)的實現(xiàn),現(xiàn)在為什么又多了個arch_spinlock_t呢,原因和RTLinux有關(guān)。為了配合RTLinux的實現(xiàn),Linus決定把原來的raw_spinlock改為arch_spinlock,把原來的spinlock改為raw_spinlock,再實現(xiàn)一個新的spinlock來包含raw_spinlock。這樣的話,arch_spinlock就是各個架構(gòu)的實現(xiàn),spinlock和raw_spinlock在標準Linux下的含義沒有區(qū)別,在RTLinux下含義不同,具體請看6.3節(jié)的講解。

arch_spinlock中使用了共用體,既可以把head tail當(dāng)成一個變量來處理,又可以把它們分開當(dāng)成兩個變量來處理。

下面我們來看一下票號自旋鎖的初始化。

靜態(tài)初始化如下:

linux-src/include/linux/spinlock_types.h

#define DEFINE_SPINLOCK(x)  spinlock_t x = __SPIN_LOCK_UNLOCKED(x)#define __SPIN_LOCK_UNLOCKED(lockname)   (spinlock_t ) __SPIN_LOCK_INITIALIZER(lockname)#define __SPIN_LOCK_INITIALIZER(lockname)   { { .rlock = __RAW_SPIN_LOCK_INITIALIZER(lockname) } }#define __RAW_SPIN_LOCK_INITIALIZER(lockname)    { .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED }

linux-src/arch/x86/include/asm/spinlock_types.h

#define __ARCH_SPIN_LOCK_UNLOCKED  { { 0 } }

動態(tài)初始化如下:

linux-src/include/linux/spinlock.h

#define spin_lock_init(_lock)        do {                raw_spin_lock_init(&(_lock)->rlock);    } while (0)# define raw_spin_lock_init(lock)          do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)

linux-src/include/linux/spinlock_types.h

#define __RAW_SPIN_LOCK_UNLOCKED(lockname)    (raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)#define __RAW_SPIN_LOCK_INITIALIZER(lockname)    { .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED  }

靜態(tài)初始化和動態(tài)初始化都把票號自旋鎖初始化為{head:0,tail:0}狀態(tài),head和tail相同表明鎖當(dāng)前是未加鎖狀態(tài)。

3.2 加鎖操作

下面我們來看一下票號自旋鎖的加鎖操作:

linux-src/include/linux/spinlock.h

static inline void spin_lock(spinlock_t *lock){  raw_spin_lock(&lock->rlock);}#define raw_spin_lock(lock)  _raw_spin_lock(lock)

linux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock){  __raw_spin_lock(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock(raw_spinlock_t *lock){  preempt_disable();  LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}

inux-src/include/linux/lockdep.h

#define LOCK_CONTENDED(_lock, try, lock)   lock(_lock)

linux-src/include/linux/spinlock.h

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock){  arch_spin_lock(&lock->raw_lock);}

linux-src/arch/x86/include/asm/spinlock.h

static __always_inline void arch_spin_lock(arch_spinlock_t *lock){  register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };
  inc = xadd(&lock->tickets, inc);  if (likely(inc.head == inc.tail))    goto out;
  for (;;) {    unsigned count = SPIN_THRESHOLD;
    do {      inc.head = READ_ONCE(lock->tickets.head);      if (__tickets_equal(inc.head, inc.tail))        goto clear_slowpath;      cpu_relax();    } while (--count);    __ticket_lock_spinning(lock, inc.tail);  }clear_slowpath:  __ticket_check_and_clear_slowpath(lock, inc.head);out:  barrier();  /* make sure nothing creeps before the lock is taken */}

通過層層調(diào)用,spin_lock最終調(diào)用架構(gòu)下的函數(shù)arch_spin_lock。arch_spin_lock函數(shù)和我們前面講的邏輯是一樣的,只不過是代碼實現(xiàn)需要稍微解釋一下。首先定義了一個局部變量inc,inc的初始值是tail為1,然后通過xadd函數(shù)把自旋鎖的tail加1,并返回原自旋鎖的值,xadd函數(shù)是原子操作,此時得到的inc的tail值就是我們的票號ticket。先判斷一下我們的ticket(inc.tail)是否和owner(inc.head)相等,如果相等代表我們加鎖成功了,goto out。如果不相等,就進入一個無限for循環(huán),不停地讀取lock->tickets.head的值,和自己的ticket比較,如果不相等就一直比較,如果相等則代表我們加鎖成功了,退出循環(huán)。為了避免其它代碼有問題而產(chǎn)生死鎖,上述操作是在for循環(huán)里面又加了個do while循環(huán),只循環(huán)一定的次數(shù)。

3.3 解鎖操作

下面我們看一下票號自旋鎖的解鎖操作:

linux-src/include/linux/spinlock.h

static inline void spin_unlock(spinlock_t *lock){  raw_spin_unlock(&lock->rlock);}#define raw_spin_unlock(lock)    _raw_spin_unlock(lock)

linux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock){  __raw_spin_unlock(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_unlock(raw_spinlock_t *lock){  do_raw_spin_unlock(lock);  preempt_enable();}

linux-src/include/linux/spinlock.h

static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock){  arch_spin_unlock(&lock->raw_lock);}

linux-src/arch/x86/include/asm/spinlock.h

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock){  __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);}

解鎖操作還是挺簡單的,最終只是把head也就是owner加1了而已。

四、MCS自旋鎖

上面的票號自旋鎖完美地解決了公平問題,邏輯簡單,代碼簡潔。但是還存在著一個嚴重的問題,就是當(dāng)鎖競爭比較激烈的時候,大家都在自旋head變量,會導(dǎo)致緩存顛簸,嚴重降低了CPU的性能。為了解決這個問題,我們應(yīng)當(dāng)設(shè)計出一種鎖,把所有排隊等鎖的線程放到一個隊列上,每個線程都自旋自己的節(jié)點,這樣就不會有緩存顛簸問題了,而且還是公平鎖。MCS鎖就是這么設(shè)計的,鎖本身是個指針,指向排隊隊列的末尾,每個申請加鎖的線程都要自己創(chuàng)建一個鎖節(jié)點,然后把自己放到這個隊列的末尾并讓鎖指針指向自己,最后在自己的節(jié)點上自旋。當(dāng)線程解鎖時,要看看自己的next指針是否為空,如果不為空說明有人在等鎖,要把next節(jié)點設(shè)置為加鎖狀態(tài),這樣下一個線程就獲得了自旋鎖。下面我們畫個圖看一下:

4cfb8c4a-16ae-11ed-ba43-dac502259ad0.png

4d22c60c-16ae-11ed-ba43-dac502259ad0.png

圖片演示了MCS自旋鎖基本的加鎖解鎖操作,但是有一個細節(jié)情況沒有考慮,這點在代碼里會進行分析。

下面我們用內(nèi)核版本4.1來講解。

4.1 定義與初始化

我們先來看一下MCS自旋鎖的定義:

linux-src/kernel/locking/mcs_spinlock.h

struct mcs_spinlock {  struct mcs_spinlock *next;  int locked; /* 1 if lock acquired */};

這個定義非常簡單,沒有復(fù)雜的嵌套定義。要注意的是MCS鎖本身是 struct mcs_spinlock *,是個指針,而結(jié)構(gòu)體struct mcs_spinlock本身并不是鎖,而是加鎖時的排隊節(jié)點,我們把它叫做鎖節(jié)點,這是MCS鎖與大部分鎖不同的地方,大部分鎖都只有一個鎖變量,不需要加鎖線程再去分配鎖節(jié)點,而MCS鎖需要加鎖線程去分配一個鎖節(jié)點。

MCS自旋鎖沒有特定的初始化,就是定義一個空指針而已。

struct mcs_spinlock * lock = NULL;

Lock為空指針代表鎖處于空閑狀態(tài)。

4.2 加鎖操作

下面我們來看一下MCS自旋鎖的加鎖操作:

linux-src/kernel/locking/mcs_spinlock.h

static inlinevoid mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node){  struct mcs_spinlock *prev;  /* Init node */  node->locked = 0;  node->next   = NULL;
  prev = xchg(lock, node);  if (likely(prev == NULL)) {    return;  }  WRITE_ONCE(prev->next, node);  /* Wait until the lock holder passes the lock down. */  arch_mcs_spin_lock_contended(&node->locked);}

原子地交換鎖變量的原值和本線程鎖節(jié)點的地址值并返回鎖變量的原值保存到prev變量。如果prev的值是空指針,代表鎖變量之前是空閑狀態(tài),我們是第一個加鎖的,直接獲得了鎖,直接return。如果prev不為NULL,說明有人已經(jīng)獲得了鎖,我們只能等待,讓prev的next指針指向自己,然后在自己的locked上自旋。

4.3 解鎖操作

下面我們看一下解鎖操作:

linux-src/kernel/locking/mcs_spinlock.h

static inlinevoid mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node){  struct mcs_spinlock *next = READ_ONCE(node->next);
  if (likely(!next)) {    if (likely(cmpxchg(lock, node, NULL) == node))      return;    /* Wait until the next pointer is set */    while (!(next = READ_ONCE(node->next)))      cpu_relax_lowlatency();  }  /* Pass lock to next waiter. */  arch_mcs_spin_unlock_contended(&next->locked);}

先讀出自己的next指針,如果為空指針,說明我們是最后一個線程,可以直接返回了。但是在返回前要把鎖變量設(shè)為空指針,代表鎖現(xiàn)在是空閑狀態(tài)。但是這里并不是直接設(shè)置,而是使用原子交換CAS,只有當(dāng)鎖變量指向自己的時候才把鎖變量置為空,這么做是為了避免和加鎖操作發(fā)生沖突。如果操作成功,代表釋放鎖成功,直接return。如果操作失敗,說明有線程在同時執(zhí)行加鎖操作,它會把我們的next指針設(shè)置為指向它,然后在它的locked上自旋,所以我們要等我們的next被設(shè)置之后也就是不為空的時候,再把next->locked設(shè)置為1。如果一開始我們的next指針就不為空,那么直接把next->locked設(shè)置為1就行了。下一個線程發(fā)現(xiàn)自己的locked為1就會結(jié)束自旋,從而獲得了鎖。

五、隊列自旋鎖

MCS鎖有一個很大的問題就是它改變了自旋鎖的接口,這是一個很嚴重的問題,內(nèi)核里使用自旋鎖的地方很多,如果把自旋鎖都改為MCS自旋鎖,那將是非常麻煩的。同時MCS還有一個問題就是它的體積增大了,這也是一個很嚴重的問題。為了解決MCS自旋鎖的問題,內(nèi)核又開發(fā)了隊列自旋鎖。它結(jié)合了MCS鎖的優(yōu)點,但是又做了很大的改進,同時又優(yōu)化了鎖競爭比較少的場景。隊列自旋鎖對MCS自旋鎖的優(yōu)化原理是,一個系統(tǒng)最多同時有NR_CPU個自旋鎖在運行,所以沒必要每個加鎖線程都自己分配一個鎖節(jié)點,我們在系統(tǒng)全局預(yù)分配NR_CPU個鎖節(jié)點就可以了,哪個CPU上要執(zhí)行自旋鎖,就去用對應(yīng)的鎖節(jié)點就可以了。這是對于只有線程的情況,實際上還有軟中斷、硬中斷、NMI,它們后者都可以搶占前者,都能搶占線程,所以整個系統(tǒng)實際上總共需要NR_CPU * 4 個鎖節(jié)點就足夠了。隊列自旋鎖還對只有兩個線程去搶鎖的情況作了優(yōu)化,這種情況下不會使用MCS的排隊邏輯。

下面我們用一個比喻來說一下隊列自旋鎖的總體邏輯。我們把鎖的位置比作皇位,搶到皇位就是加鎖成功就可以進入臨界區(qū)了。第一個來搶鎖的人就是直接搶鎖成功,搶到皇位。第二個來搶鎖的人發(fā)現(xiàn)皇位已經(jīng)被搶了,退而求其次,搶占太子位,然后一直自旋皇位,一旦皇帝駕崩讓出皇位,自己就立馬搶占皇位。第三個來搶鎖的人發(fā)現(xiàn)皇位和太子位都被搶了,沒有辦法只能去搶太孫的位置,然后同時自旋太子位和皇位。當(dāng)皇位空缺的時候,太子會替補到皇位,此時太子位空缺,但是太孫并不會去搶占太子位,他還待在太孫位上,直到太子位和皇位同時空缺了,他才會一步到位,直接從太孫位上登基為皇帝。第四個人來了發(fā)現(xiàn)皇位、太子位、太孫位都被搶了,就只能占個皇孫位了,從第四個人開始包括后面來的每個人都是皇孫,所有皇孫依次排好隊等待進位成太孫。太孫其實也算是皇孫,太孫是第一皇孫,其它的都是普通皇孫。皇孫也在自旋,只不過皇孫是在自己家門口自旋,他一直在等待上一任太孫到自己家門口叫自己。太孫發(fā)現(xiàn)皇位和太子位同時空缺了之后就會去繼承皇帝位,同時去通知第二皇孫,太孫位空出來了,你可以來當(dāng)太孫了。然后第二皇孫就變成太孫了,變成太孫之后他也是去同時自旋太子位和皇位。當(dāng)他也登基稱帝之后他也會去通知后面的第二皇孫來進位太孫位。然后就一直繼續(xù)這樣的邏輯,后面來的每個人只要發(fā)現(xiàn)有太孫存在就只能去占個皇孫位來排隊,然后在自己家門口自旋。在這個過程中太子位是一直空缺的。除非最后一個太孫登基稱帝之后發(fā)現(xiàn)沒有皇孫了,此時就沒有人進位成太孫了,如果此時再來了人搶位子,而皇位還被占著,他才會去搶太子位。

前面說的邏輯比較復(fù)雜,我們再來總結(jié)一下,當(dāng)只有兩個人搶鎖時,一個占據(jù)皇帝位也就是搶鎖成功,一個人占據(jù)太子位,同時自旋皇位。也就是說同時搶鎖的人小于等于兩人時不會使用排隊機制。第三人來搶鎖的話就會啟動排隊機制,他排在隊列的第一位,是第一皇孫,也叫太孫,之后來的人都是普通皇孫,要依次排隊?;蕦O都是在自己家門口自旋自己,等待前太孫來通知自己進位為太孫。太孫的邏輯是最為復(fù)雜的,他要同時自旋太子位和皇位,只有當(dāng)太子位和皇位都空缺時,太孫才會一步到位登基稱帝,然后通知第二皇孫進位為太孫。解鎖的邏輯很簡單,只需要把皇位設(shè)為0就可以了,什么都不用管,因為太子、太孫他們自己會自旋皇位。

隊列自旋鎖的實現(xiàn)是把原先的鎖變量int分為三部分,一個locked字節(jié),對應(yīng)我們所說的皇位,一個pending字節(jié),對應(yīng)我們所說的太子位,一個tail雙字節(jié),它指向皇孫隊列的末尾,皇孫隊列的隊首是太孫。tail不是個指針,而是邏輯指針,它是通過編碼的方式指向隊尾皇孫的。每個皇孫都對應(yīng)一個鎖節(jié)點,系統(tǒng)預(yù)先分配了NR_CPU * 4個鎖節(jié)點,NR_CPU代表1個CPU 1個,為什么乘以4呢,因為1個CPU上最多可以同時嵌套4個執(zhí)行流,分別是線程、軟中斷、硬中斷、非屏蔽中斷。tail有16位,分兩部分編碼,其中2位用來編碼是哪個執(zhí)行流,14位用來編碼CPU index。CPU編碼時要加1,因為CPU index從0開始,而tail等于0有特殊含義,代表的是空指針,也就是沒有皇孫來競爭,所以要加上1做區(qū)分。當(dāng)一個線程(執(zhí)行流)來爭鎖時,如果太子位被搶了或者已經(jīng)有太孫了,自己就需要加入皇孫隊列,加入皇孫隊列的方法就是根據(jù)自己所在的CPU index 和自己的執(zhí)行流等級去拿一個鎖節(jié)點,把這個鎖節(jié)點加入到隊列中去,并自旋這個鎖節(jié)點。

下面我們畫圖來看一下。

4d485d90-16ae-11ed-ba43-dac502259ad0.png

4d6cbb0e-16ae-11ed-ba43-dac502259ad0.png

4d90f384-16ae-11ed-ba43-dac502259ad0.png

4db154bc-16ae-11ed-ba43-dac502259ad0.png

4dd89aa4-16ae-11ed-ba43-dac502259ad0.png

現(xiàn)在大家應(yīng)該對隊列自旋鎖的邏輯很熟悉了,下面我們以內(nèi)核版本5.15.28為例,來看一下隊列自旋的代碼實現(xiàn)。

5.1 定義與初始化

我們先來看一下隊列自旋鎖的定義:

linux-src/include/linux/spinlock_types.h

typedef struct spinlock {struct raw_spinlock rlock;} spinlock_t;

linux-src/include/linux/spinlock_types_raw.h

typedef struct raw_spinlock {arch_spinlock_t raw_lock;} raw_spinlock_t;

linux-src/include/asm-generic/qspinlock_types.h

typedef struct qspinlock {union {atomic_t val;
struct {u8locked;u8pending;};struct {u16locked_pending;u16tail;};};} arch_spinlock_t;

可以看出隊列自旋鎖的定義最終與原始自旋鎖和票號自旋鎖的大小是一樣的。隊列自旋鎖也使用了共用體的技巧,把一個4字節(jié)的int拆分成了1個字節(jié)的locked,一個字節(jié)的pending,兩個字節(jié)的tail。

下面我們來看一下初始化,先看靜態(tài)初始化:

linux-src/include/linux/spinlock_types.h

#define DEFINE_SPINLOCK(x)spinlock_t x = __SPIN_LOCK_UNLOCKED(x)#define __SPIN_LOCK_UNLOCKED(lockname) (spinlock_t) __SPIN_LOCK_INITIALIZER(lockname)#define __SPIN_LOCK_INITIALIZER(lockname) { { .rlock = ___SPIN_LOCK_INITIALIZER(lockname) } }#define ___SPIN_LOCK_INITIALIZER(lockname){.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,}

linux-src/include/asm-generic/qspinlock_types.h

#define__ARCH_SPIN_LOCK_UNLOCKED{ { .val = ATOMIC_INIT(0) } }

再看動態(tài)初始化

linux-src/include/linux/spinlock.h

# define spin_lock_init(_lock)do {*(_lock) = __SPIN_LOCK_UNLOCKED(_lock);} while (0)

無論靜態(tài)初始化還是動態(tài)初始化都是把鎖變量的整個int初始化為0。

5.2 加鎖操作

我們先來看一下鎖節(jié)點的定義和相關(guān)操作:

linux-src/kernel/locking/qspinlock.c

struct qnode {struct mcs_spinlock mcs;};static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);#define MAX_NODES4

可以看到鎖節(jié)點用的是MCS自旋鎖的鎖節(jié)點類型,然后定義了一個per CPU變量,每個CPU上有4個節(jié)點,代表4層執(zhí)行流,線程、軟中斷、硬中斷、屏蔽中斷。和MCS自旋鎖不同的是,MCS自旋鎖需要每個線程在申請鎖時自己提供鎖節(jié)點,而隊列自旋鎖是提前定義好的全局靜態(tài)變量,每個執(zhí)行流在申請鎖時根據(jù)自己所在的CPU index 和執(zhí)行流層級去使用對應(yīng)的鎖節(jié)點,加鎖成功后鎖節(jié)點就默認放回了。使用鎖節(jié)點時執(zhí)行個查詢操作就可以了,放回鎖節(jié)點什么也不用做,這是自旋鎖的特點所決定的。因為自旋鎖是不能休眠的,所以自旋鎖的臨界區(qū)是一口氣執(zhí)行完,不會切走讓其它線程也來申請自旋鎖,一個CPU上最左嵌套4層執(zhí)行流,所以整個系統(tǒng)最多能同時有NR_CPU * 4個自旋鎖申請。所以系統(tǒng)預(yù)定義NR_CPU * 4個鎖節(jié)點就足夠了,用的時候就直接用,用完啥也不用管。

下面我們來看一下鎖節(jié)點的編碼與查找:

linux-src/kernel/locking/qspinlock.c

/* * We must be able to distinguish between no-tail and the tail at 0:0, * therefore increment the cpu number by one. */static inline __pure u32 encode_tail(int cpu, int idx){u32tail;
tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */return tail;}
static inline __pure struct mcs_spinlock *decode_tail(u32 tail){int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;int idx = (tail &  _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;
return per_cpu_ptr(&qnodes[idx].mcs, cpu);}
static inline __purestruct mcs_spinlock *grab_mcs_node(struct mcs_spinlock *base, int idx){return &((struct qnode *)base + idx)->mcs;}

可以看到知道了CPU index和執(zhí)行流層級就可以編碼出tail,知道了tail就可以解碼出CPU index和執(zhí)行流層級,就可以去全局變量qnodes中找到對應(yīng)的鎖節(jié)點。如果已經(jīng)知道了CPU index對應(yīng)的鎖節(jié)點base,再根據(jù)執(zhí)行流層級也可以找到對應(yīng)的鎖節(jié)點。

下面我們來看一下隊列自旋鎖的加鎖操作:

linux-src/include/linux/spinlock.h

static __always_inline void spin_lock(spinlock_t *lock){raw_spin_lock(&lock->rlock);}#define raw_spin_lock(lock)_raw_spin_lock(lock)

linux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock){__raw_spin_lock(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock(raw_spinlock_t *lock){preempt_disable();LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}

linux-src/include/linux/lockdep.h

#define LOCK_CONTENDED(_lock, try, lock) lock(_lock)

linux-src/include/linux/spinlock.h

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock){arch_spin_lock(&lock->raw_lock);}

linux-src/include/asm-generic/qspinlock.h

#define arch_spin_lock(l)queued_spin_lock(l)static __always_inline void queued_spin_lock(struct qspinlock *lock){int val = 0;
if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))return;
queued_spin_lock_slowpath(lock, val);}

linux-src/include/asm-generic/qspinlock_types.h

#define _Q_LOCKED_VAL(1U << _Q_LOCKED_OFFSET)#define _Q_LOCKED_OFFSET0

linux-src/kernel/locking/qspinlock.c

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val){  struct mcs_spinlock *prev, *next, *node;  u32 old, tail;  int idx;
  BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
  if (pv_enabled())    goto pv_queue;
  if (virt_spin_lock(lock))    return;
  /*   * Wait for in-progress pending->locked hand-overs with a bounded   * number of spins so that we guarantee forward progress.   *   * 0,1,0 -> 0,0,1   */  if (val == _Q_PENDING_VAL) {    int cnt = _Q_PENDING_LOOPS;    val = atomic_cond_read_relaxed(&lock->val,                 (VAL != _Q_PENDING_VAL) || !cnt--);  }
  /*   * If we observe any contention; queue.   */  if (val & ~_Q_LOCKED_MASK)    goto queue;
  /*   * trylock || pending   *   * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock   */  val = queued_fetch_set_pending_acquire(lock);
  /*   * If we observe contention, there is a concurrent locker.   *   * Undo and queue; our setting of PENDING might have made the   * n,0,0 -> 0,0,0 transition fail and it will now be waiting   * on @next to become !NULL.   */  if (unlikely(val & ~_Q_LOCKED_MASK)) {
    /* Undo PENDING if we set it. */    if (!(val & _Q_PENDING_MASK))      clear_pending(lock);
    goto queue;  }
  /*   * We're pending, wait for the owner to go away.   *   * 0,1,1 -> 0,1,0   *   * this wait loop must be a load-acquire such that we match the   * store-release that clears the locked bit and create lock   * sequentiality; this is because not all   * clear_pending_set_locked() implementations imply full   * barriers.   */  if (val & _Q_LOCKED_MASK)    atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
  /*   * take ownership and clear the pending bit.   *   * 0,1,0 -> 0,0,1   */  clear_pending_set_locked(lock);  lockevent_inc(lock_pending);  return;
  /*   * End of pending bit optimistic spinning and beginning of MCS   * queuing.   */queue:  lockevent_inc(lock_slowpath);pv_queue:  node = this_cpu_ptr(&qnodes[0].mcs);  idx = node->count++;  tail = encode_tail(smp_processor_id(), idx);
  /*   * 4 nodes are allocated based on the assumption that there will   * not be nested NMIs taking spinlocks. That may not be true in   * some architectures even though the chance of needing more than   * 4 nodes will still be extremely unlikely. When that happens,   * we fall back to spinning on the lock directly without using   * any MCS node. This is not the most elegant solution, but is   * simple enough.   */  if (unlikely(idx >= MAX_NODES)) {    lockevent_inc(lock_no_node);    while (!queued_spin_trylock(lock))      cpu_relax();    goto release;  }
  node = grab_mcs_node(node, idx);
  /*   * Keep counts of non-zero index values:   */  lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
  /*   * Ensure that we increment the head node->count before initialising   * the actual node. If the compiler is kind enough to reorder these   * stores, then an IRQ could overwrite our assignments.   */  barrier();
  node->locked = 0;  node->next = NULL;  pv_init_node(node);
  /*   * We touched a (possibly) cold cacheline in the per-cpu queue node;   * attempt the trylock once more in the hope someone let go while we   * weren't watching.   */  if (queued_spin_trylock(lock))    goto release;
  /*   * Ensure that the initialisation of @node is complete before we   * publish the updated tail via xchg_tail() and potentially link   * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.   */  smp_wmb();
  /*   * Publish the updated tail.   * We have already touched the queueing cacheline; don't bother with   * pending stuff.   *   * p,*,* -> n,*,*   */  old = xchg_tail(lock, tail);  next = NULL;
  /*   * if there was a previous node; link it and wait until reaching the   * head of the waitqueue.   */  if (old & _Q_TAIL_MASK) {    prev = decode_tail(old);
    /* Link @node into the waitqueue. */    WRITE_ONCE(prev->next, node);
    pv_wait_node(node, prev);    arch_mcs_spin_lock_contended(&node->locked);
    /*     * While waiting for the MCS lock, the next pointer may have     * been set by another lock waiter. We optimistically load     * the next pointer & prefetch the cacheline for writing     * to reduce latency in the upcoming MCS unlock operation.     */    next = READ_ONCE(node->next);    if (next)      prefetchw(next);  }
  /*   * we're at the head of the waitqueue, wait for the owner & pending to   * go away.   *   * *,x,y -> *,0,0   *   * this wait loop must use a load-acquire such that we match the   * store-release that clears the locked bit and create lock   * sequentiality; this is because the set_locked() function below   * does not imply a full barrier.   *   * The PV pv_wait_head_or_lock function, if active, will acquire   * the lock and return a non-zero value. So we have to skip the   * atomic_cond_read_acquire() call. As the next PV queue head hasn't   * been designated yet, there is no way for the locked value to become   * _Q_SLOW_VAL. So both the set_locked() and the   * atomic_cmpxchg_relaxed() calls will be safe.   *   * If PV isn't active, 0 will be returned instead.   *   */  if ((val = pv_wait_head_or_lock(lock, node)))    goto locked;
  val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
locked:  /*   * claim the lock:   *   * n,0,0 -> 0,0,1 : lock, uncontended   * *,*,0 -> *,*,1 : lock, contended   *   * If the queue head is the only one in the queue (lock value == tail)   * and nobody is pending, clear the tail code and grab the lock.   * Otherwise, we only need to grab the lock.   */
  /*   * In the PV case we might already have _Q_LOCKED_VAL set, because   * of lock stealing; therefore we must also allow:   *   * n,0,1 -> 0,0,1   *   * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the   *       above wait condition, therefore any concurrent setting of   *       PENDING will make the uncontended transition fail.   */  if ((val & _Q_TAIL_MASK) == tail) {    if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))      goto release; /* No contention */  }
  /*   * Either somebody is queued behind us or _Q_PENDING_VAL got set   * which will then detect the remaining tail and queue behind us   * ensuring we'll see a @next.   */  set_locked(lock);
  /*   * contended path; wait for next if not observed yet, release.   */  if (!next)    next = smp_cond_load_relaxed(&node->next, (VAL));
  arch_mcs_spin_unlock_contended(&next->locked);  pv_kick_node(lock, next);
release:  /*   * release the node   */  __this_cpu_dec(qnodes[0].mcs.count);}

加鎖的時候要首先看一下是不是鎖變量的整個int都是0,如果是的話,說明皇位、太子位、太孫位都是空的,鎖現(xiàn)在是空閑的,沒有任何人競爭,我們直接把鎖變量設(shè)為1(用的是原子操作),代表我們搶鎖成功,直接返回。如果整個鎖變量不為0,說明存在鎖競爭,我們要走慢速路徑。

在慢速路徑里,首先處理的是如果遇到太子正在登基,則自旋等待太子登基成功。然后查看太子位是否被占,如果被占,則goto queue,也就是進入皇孫排隊流程(這個后面再講)。如果太子位沒被占,則嘗試占領(lǐng)太子位。如果搶占太子失敗,說明有其它線程也在搶太子位,我們搶失敗了,則我們則goto queue,也就是進入皇孫排隊流程(這個后面再講)。如果搶占太子位成功,則自旋皇帝位,一直自旋到皇帝駕崩把鎖置為0,則我們結(jié)束自旋,原子地占領(lǐng)皇位釋放太子位,然后return。

接下來是皇孫排隊邏輯,每一個新來的皇孫都要排到隊尾。隊尾是用鎖變量中的tail來記錄的。我們要先生成自己的隊尾編碼tail,找到自己對應(yīng)的鎖節(jié)點。此時再嘗試一下加鎖操作,因為有可能現(xiàn)在太子太孫皇位都是空的,如果嘗試成功就結(jié)束流程,如果失敗則繼續(xù)往下走。然后原子地交換鎖變量的tail和自己的tail,這樣我們就成為新的隊尾了。然后我們再看old tail,分兩種情況,如果old tail是空,則說明我們是第一個皇孫,也就是太孫,走太孫邏輯,如果old tail不空,則說明我們是普通皇孫,走皇孫排隊邏輯。我們先說皇孫排隊邏輯?;蕦O排隊先解碼old tail,找到其對應(yīng)的鎖節(jié)點prev,然后讓prev的next指向自己,這樣我們就加入了排隊隊列。然后我們就在自己家里自旋,也就是自旋自己的node->locked。我們的自旋是在等待prev先成為太孫,然后當(dāng)他登基稱帝之后,他就會來解除我們的自旋,然后我們就成為了太孫。

下面我們講太孫的邏輯,太孫的來源有兩種,一種是上面說的old tail為空,則我們直接就是太孫,是第一位太孫。第二種來源是普通皇孫進位為太孫。不管哪種來源的太孫,操作都是一樣的。太孫首先自旋太子位和皇位,當(dāng)太子位和皇位同時空缺的時候才會結(jié)束自旋。結(jié)束自旋之后,先看看自己是不是唯一的皇孫,如果是的話則原子地加鎖。如果加鎖成功則結(jié)束流程,如果加鎖失敗則說明剛才發(fā)生了沖突,又有了新的皇孫加入。如果自己不是唯一的皇孫或者又有新的皇孫加入,則自己先搶占皇位,然后通知next皇孫結(jié)束自旋,next皇孫就會成為新的太孫,繼續(xù)執(zhí)行太孫的流程。

5.3 解鎖操作

下面我們看一下隊列自旋鎖的解鎖操作:

linux-src/include/linux/spinlock.h

static __always_inline void spin_unlock(spinlock_t *lock){  raw_spin_unlock(&lock->rlock);}#defineraw_spin_unlock(lock)_raw_spin_unlock(lock)

linux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock){  __raw_spin_unlock(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_unlock(raw_spinlock_t *lock){do_raw_spin_unlock(lock);preempt_enable();}

linux-src/include/linux/spinlock.h

static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock){arch_spin_unlock(&lock->raw_lock);}

linux-src/include/asm-generic/qspinlock.h

#define arch_spin_unlock(l)queued_spin_unlock(l)

linux-src/include/asm-generic/qspinlock.h

static __always_inline void queued_spin_unlock(struct qspinlock *lock){smp_store_release(&lock->locked, 0);}

可以看到隊列自旋鎖的解鎖確實很簡單,只需要讓出皇位也就是把locked字節(jié)設(shè)為0就可以了。

六、自旋鎖的使用

前面幾節(jié)我們講了自旋鎖的發(fā)展歷史,以及每一代自旋鎖的實現(xiàn)原理?,F(xiàn)在我們來講一講自旋鎖的使用問題,包括自旋鎖的適用場景、自旋鎖與禁用偽并發(fā)的配合使用問題,還有spinlock_t、raw_spin_lock該如何選擇的問題。

6.1 自旋鎖的適用場景

內(nèi)核里有很多同步措施,我們什么時候應(yīng)該使用自旋鎖呢,使用自旋鎖應(yīng)該注意什么呢?首先自旋鎖適用那些臨界區(qū)比較小的地方,具體要多小呢,并沒有絕對的標準,我記的有的書上說要小于1000個指令或者100行代碼。其次臨界區(qū)內(nèi)不能休眠,也就是不能有阻塞操作,如果臨界區(qū)內(nèi)某些函數(shù)調(diào)用可能會阻塞,那就不能使用自旋鎖。使用自旋鎖要注意的點也是臨界區(qū)不能調(diào)用阻塞函數(shù)。但是很多時候并不太好判斷,有些函數(shù)明顯就是阻塞函數(shù),肯定不能調(diào)用。但是有些函數(shù)自己不是阻塞的,而它層層調(diào)用的函數(shù)中有阻塞的,這就不太好發(fā)現(xiàn)了。

線程是可調(diào)度的,所以線程可以用互斥鎖、信號量,也能用自旋鎖。但是中斷(包括硬中斷和軟中斷)是不可調(diào)度的,也就是說,是不能休眠的,所以只能使用自旋鎖。

6.2 自旋鎖與禁用偽并發(fā)的配合使用

內(nèi)核里有四種不同類型的執(zhí)行流,線程、軟中斷、硬中斷、NMI中斷,前者不能搶占后者,但是后者能搶占前者。自旋鎖能防止兩個CPU同時進入臨界區(qū),但是并不能防止本CPU的臨界區(qū)被高級的執(zhí)行流所搶占。所以當(dāng)兩個關(guān)聯(lián)臨界區(qū)在不同類型的執(zhí)行流的時候,只使用自旋鎖是不夠的,低級執(zhí)行流還得臨時禁止高級執(zhí)行流的搶占才行。由于NMI中斷是不可禁止的,而且NMI中斷發(fā)生的概率非常低,一般我們的代碼也不會與NMI中斷發(fā)生關(guān)聯(lián),所以NMI中斷就不考慮了?,F(xiàn)在只剩下線程、軟中斷、硬中斷三種情況了。組合下來有6種情況,我們依依說明。線程對線程,自旋鎖內(nèi)部已經(jīng)禁用了線程搶占,所以兩個線程之間的臨界區(qū)直接使用自旋鎖就可以了。線程對軟中斷,線程會被軟中斷搶占,所以線程中要自旋鎖加禁用軟中斷,而軟中斷不會被線程搶占,所以軟中斷中只使用自旋鎖就可以了。線程對硬中斷,線程會被硬中斷搶占,所以線程中要自旋鎖加禁用硬中斷,而硬中斷不會被線程搶占,所以硬中斷中只使用自旋鎖就可以了。軟中斷對軟中斷,軟中斷中發(fā)生硬中斷,硬中斷返回時發(fā)現(xiàn)正在軟中斷中,不會再去執(zhí)行軟中斷,只會排隊軟中斷,所以軟中斷對軟中斷只使用自旋鎖就可以了。軟中斷對硬中斷,由于硬中斷會搶占軟中斷,所以軟中斷中要禁用硬中斷,硬中斷中直接使用自旋鎖就可以了。硬中斷對硬中斷,現(xiàn)在內(nèi)核里已經(jīng)禁止中斷嵌套了,所以只使用自旋鎖就可以了。

我們下面來看一下它們的接口與實現(xiàn)。

自旋鎖并禁用軟中斷,軟中斷在這里就是下半部。

void spin_lock_bh(spinlock_t *lock)

void spin_unlock_bh(spinlock_t *lock)

實現(xiàn)如下,只分析加鎖部分,解鎖部分就不再分析了。

linux-src/include/linux/spinlock.h

static __always_inline void spin_lock_bh(spinlock_t *lock){raw_spin_lock_bh(&lock->rlock);}#define raw_spin_lock_bh(lock)_raw_spin_lock_bh(lock)

inux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_lock_bh(raw_spinlock_t *lock){__raw_spin_lock_bh(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock_bh(raw_spinlock_t *lock){__local_bh_disable_ip(_RET_IP_, SOFTIRQ_LOCK_OFFSET);LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}

可以看到自旋鎖的實現(xiàn)部分是一樣的,只是加了一個禁用軟中斷的調(diào)用,禁用軟中斷本身也會禁用線程搶占,所以這里沒有再去禁用搶占。

自旋鎖并禁用硬中斷,禁用軟中斷本身是帶計數(shù)功能的,可以嵌套調(diào)用,但是禁用硬中斷本身是沒有計數(shù)的,不能嵌套調(diào)用,所以內(nèi)核提供了兩個版本,irq版lock會禁用中斷,unlock會開啟中斷,irqsave版lock會禁用中斷并保存現(xiàn)在的中斷狀態(tài),unlock會恢復(fù)之前保存的中斷狀態(tài)。

void spin_lock_irq(spinlock_t *lock)

void spin_unlock_irq(spinlock_t *lock)

void spin_lock_irqsave(lock, flags)

void spin_unlock_irqsave(lock, flags)

實現(xiàn)如下,只分析加鎖部分,解鎖部分就不再分析了。

spin_lock_irq

linux-src/include/linux/spinlock.h

static __always_inline void spin_lock_irq(spinlock_t *lock){raw_spin_lock_irq(&lock->rlock);}#define raw_spin_lock_irq(lock)_raw_spin_lock_irq(lock)

linux-src/kernel/locking/spinlock.c

void __lockfunc _raw_spin_lock_irq(raw_spinlock_t *lock){__raw_spin_lock_irq(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock_irq(raw_spinlock_t *lock){local_irq_disable();preempt_disable();LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}

spin_lock_irqsave

linux-src/include/linux/spinlock.h

#define spin_lock_irqsave(lock, flags)do {raw_spin_lock_irqsave(spinlock_check(lock), flags);} while (0)#define raw_spin_lock_irqsave(lock, flags)do {typecheck(unsigned long, flags);flags = _raw_spin_lock_irqsave(lock);} while (0)

linux-src/kernel/locking/spinlock.c

unsigned long __lockfunc _raw_spin_lock_irqsave(raw_spinlock_t *lock){return __raw_spin_lock_irqsave(lock);}

linux-src/include/linux/spinlock_api_smp.h

static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock){unsigned long flags;
local_irq_save(flags);preempt_disable();do_raw_spin_lock_flags(lock, &flags);return flags;}

linux-src/include/linux/spinlock.h

static inline voiddo_raw_spin_lock_flags(raw_spinlock_t *lock, unsigned long *flags) __acquires(lock){arch_spin_lock_flags(&lock->raw_lock, *flags);}#define arch_spin_lock_flags(lock, flags)arch_spin_lock(lock)

可以看到自旋鎖的實現(xiàn)部分是一樣的,只是加了一個禁用硬中斷和禁止搶占的調(diào)用。

6.3 raw_spin_lock的使用問題

可能很多人在看到內(nèi)核代碼時會感到有些奇怪,為啥有些地方用的是spinlock_t,有些地方用的卻是raw_spinlock_t?raw_spinlock_t不是spinlock_t的實現(xiàn)細節(jié)嗎,我們不是應(yīng)該只使用接口性的東西,而不要使用實現(xiàn)性的東西嗎?再仔細看spinlock_t和raw_spinlock_t的實質(zhì)邏輯,好像也沒啥區(qū)別?。恳卮疬@個問題,我們就要先從一件事情談起,RTLinux。什么是RTLinux,什么是實時性?實時性是指一個系統(tǒng)對外部事件響應(yīng)的及時性。很多嵌入式系統(tǒng)的OS都是實時OS,它們可以快速地對外部事件進行響應(yīng)。這倒不是因為它們有多厲害,而是因為嵌入式系統(tǒng)都比較簡單,它們面臨的環(huán)境比較簡單,要做的事情也比較簡單,所以能做到及時性。而Linux是一個通用操作系統(tǒng)內(nèi)核,通用這個詞就代表Linux要面臨很多情況,處理很多問題,所以就很難做到及時性。做到及時性最根本的一點就是要及時處理中斷,因為中斷代表的就是外部事件。但是在Linux內(nèi)核里,有很多需要同步的地方都會禁用中斷,這就導(dǎo)致中斷不能及時響應(yīng)。Linux在處理中斷的時候也會禁用中斷,Linux在這方面已經(jīng)想了很多辦法來解決,比如盡可能地縮小中斷處理程序,把事情盡量都放到軟中斷或者線程里面去做。當(dāng)很多中斷處理的事情都被放到線程中去執(zhí)行了,我們又面臨著另外一個問題,如何盡快地讓這些線程去搶占CPU立馬獲得執(zhí)行。當(dāng)一個非常不緊急的線程正好執(zhí)行到自旋鎖的臨界區(qū)時,我們的非常著急的中斷處理線程想獲得CPU卻沒有辦法,因為自旋鎖的臨界區(qū)不能休眠也就是說不可搶占,我們只能干等著。因此把自旋鎖變得可休眠就成為了提高Linux的實時性的重要方法。為此Ingo Molnar等人開發(fā)了一個項目RTLinux,專門來提高Linux的實時性。其中一個很重要的方法就是把自旋鎖替換為可休眠鎖。但是有些臨界區(qū)是確實不能休眠的,那怎么辦呢?這些臨界區(qū)就用raw_spinlock_t,raw_spinlock_t還保持原來的自旋語義,不會休眠。到目前為止(內(nèi)核版本5.15.28),RTLinux還沒有合入標準內(nèi)核,所以目前的標準內(nèi)核里raw_spinlock_t和spinlock_t效果是一樣的。但是大家在內(nèi)核編程的時候還是要盡量使用spinlock_t,除非你的臨界區(qū)真的不能休眠,才去使用raw_spinlock_t。

審核編輯:湯梓紅


聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 內(nèi)核
    +關(guān)注

    關(guān)注

    3

    文章

    1351

    瀏覽量

    40160
  • Linux
    +關(guān)注

    關(guān)注

    87

    文章

    11182

    瀏覽量

    208533
  • 自旋鎖
    +關(guān)注

    關(guān)注

    0

    文章

    11

    瀏覽量

    1571

原文標題:深入理解Linux自旋鎖

文章出處:【微信號:LinuxDev,微信公眾號:Linux閱碼場】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    深度解析自旋自旋的實現(xiàn)方案

    入場券自旋和MCS自旋都屬于排隊自旋(queued spinlock),進程按照申請
    發(fā)表于 09-19 11:39 ?4330次閱讀
    深度解析<b class='flag-5'>自旋</b><b class='flag-5'>鎖</b>及<b class='flag-5'>自旋</b><b class='flag-5'>鎖</b>的實現(xiàn)方案

    Linux驅(qū)動開發(fā)筆記-自旋和信號量

    :如果在寫代碼時,有以上的競態(tài)發(fā)生,一定要注意進行互斥訪問7.解決競態(tài)的方法:中斷屏蔽原子操作自旋信號量如何使用以上4個機制呢?1.中斷屏蔽解決哪些情況的競態(tài):進程和進程的搶占中斷和進程中斷和中斷
    發(fā)表于 08-30 18:08

    信號量、互斥、自旋

    信號量、互斥、自旋http://bbs.edu118.com/forum.php?mod=viewthread&tid=488&fromuid=231(出處: 信盈達IT技術(shù)社
    發(fā)表于 08-29 09:48

    Linux內(nèi)核同步機制的自旋原理是什么?

    自旋是專為防止多處理器并發(fā)而引入的一種,它在內(nèi)核中大量應(yīng)用于中斷處理等部分(對于單處理器來說,防止中斷處理中的并發(fā)可簡單采用關(guān)閉中斷的方式,即在標志寄存器中關(guān)閉/打開中斷標志位,不需要自旋
    發(fā)表于 03-31 08:06

    怎么在atmega128中實現(xiàn)自旋

    什么是自旋?有哪些缺陷?怎么在atmega128中實現(xiàn)自旋?
    發(fā)表于 01-24 06:54

    Linux內(nèi)核同步機制的自旋原理

    一、自旋 自旋是專為防止多處理器并發(fā)而引入的一種,它在內(nèi)核中大量應(yīng)用于中斷處理等部分(對于單處理器來說,防止中斷處理中的并發(fā)可簡單采
    發(fā)表于 06-08 14:50 ?1296次閱讀

    AWorks軟件設(shè)計,郵箱、消息隊列和自旋使用方法

    本文介紹了郵箱、消息隊列和自旋使用方法。信號量只能用于任務(wù)間的同步,不能傳遞更多的信息,為此,AWorks提供了郵箱和消息隊列服務(wù),它們的主要區(qū)別在于支持的消息長度不同,在郵箱中,每條消息的長度固定為4字節(jié),而在消息隊列中…
    的頭像 發(fā)表于 06-13 09:13 ?1.2w次閱讀
    AWorks軟件設(shè)計,郵箱、消息隊列和<b class='flag-5'>自旋</b><b class='flag-5'>鎖</b><b class='flag-5'>使用方法</b>

    信號量和自旋

    。??? Linux 使用的同步機制可以說從2.0到2.6以來不斷發(fā)展完善。從最初的原子操作,到后來的信號量,從大內(nèi)核到今天的自旋。這些同步機制的
    發(fā)表于 04-02 14:43 ?791次閱讀

    Linux 自旋spinlock

    ,所以同一時刻只能有一個任務(wù)獲取到。 內(nèi)核當(dāng)發(fā)生訪問資源沖突的時候,通常有兩種處理方式: 一個是原地等待 一個是掛起當(dāng)前進程,調(diào)度其他進程執(zhí)行(睡眠) 自旋 Spinlock 是內(nèi)核中提供的一種比較常見的
    的頭像 發(fā)表于 09-11 14:36 ?2028次閱讀

    使用Linux自旋實現(xiàn)互斥點燈

    自旋最多只能被一個可執(zhí)行線程持有。如果一個線程試圖獲得一個已經(jīng)被持有的自旋,那么該線程將循環(huán)等待,然后不斷的判斷是否能夠被成功獲取,直
    的頭像 發(fā)表于 04-13 15:09 ?733次閱讀
    使用Linux<b class='flag-5'>自旋</b><b class='flag-5'>鎖</b>實現(xiàn)互斥點燈

    自旋和互斥的區(qū)別有哪些

    自旋 自旋與互斥很相似,在訪問共享資源之前對自旋
    的頭像 發(fā)表于 07-21 11:19 ?9255次閱讀

    如何用C++11實現(xiàn)自旋

    下面我會分析一下自旋,并代碼實現(xiàn)自旋和互斥的性能對比,以及利用C++11實現(xiàn)自旋
    的頭像 發(fā)表于 11-11 16:48 ?1326次閱讀
    如何用C++11實現(xiàn)<b class='flag-5'>自旋</b><b class='flag-5'>鎖</b>

    互斥自旋的區(qū)別 自旋臨界區(qū)可以被中斷嗎?

    互斥自旋的區(qū)別 自旋臨界區(qū)可以被中斷嗎? 互斥
    的頭像 發(fā)表于 11-22 17:41 ?724次閱讀

    自旋和互斥的使用場景是什么

    自旋和互斥是兩種常見的同步機制,它們在多線程編程中被廣泛使用。在本文中,我們將介紹自旋和互斥
    的頭像 發(fā)表于 07-10 10:05 ?750次閱讀

    互斥自旋的實現(xiàn)原理

    互斥自旋是操作系統(tǒng)中常用的同步機制,用于控制對共享資源的訪問,以避免多個線程或進程同時訪問同一資源,從而引發(fā)數(shù)據(jù)不一致或競爭條件等問題。 互斥(Mutex) 互斥
    的頭像 發(fā)表于 07-10 10:07 ?351次閱讀