日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
面試官:哥們,Go語言的讀寫鎖了解多少?

讀寫鎖簡介

為什么要有讀鎖

有些朋友可能會有疑惑,為什么要有讀鎖,讀操作又不會修改數(shù)據(jù),多線程同時讀取相同的資源就是安全的,為什么還要加一個讀鎖呢?

舉個例子說明,在Golang中變量的賦值不是并發(fā)安全的,比如對一個int型變量執(zhí)行count++操作,在并發(fā)下執(zhí)行就會出現(xiàn)預(yù)期之外的結(jié)果,因為count++操作分為三部分:讀取count的值、將count的值加1,然后再將結(jié)果賦值給count,這不是一個原子性操作,未加鎖時在多個線程同時對該變量執(zhí)行count++操作會造成數(shù)據(jù)不一致,通過加上寫鎖可以解決這個問題,但是在讀取的時候我們不加讀鎖會怎么樣呢?寫個例子來看一下,只加寫鎖,不加讀鎖:

package main

import "sync"

const maxValue = 3

type test struct {
rw sync.RWMutex
index int
}

func (t *test) Get() int {
return t.index
}

func (t *test)Set() {
t.rw.Lock()
t.index++
if t.index >= maxValue{
t.index =0
}
t.rw.Unlock()
}

func main() {
t := test{}
sw := sync.WaitGroup{}
for i:=0; i < 100000; i++{
sw.Add(2)
go func() {
t.Set()
sw.Done()
}()
go func() {
val := t.Get()
if val >= maxValue{
print("get value error| value=", val, "\n")
}
sw.Done()
}()
}
sw.Wait()
}

運行結(jié)果:

get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
.....

每次運行結(jié)果都是不固定的,因為我們沒有加讀鎖,如果允許同時讀和寫,讀取到的數(shù)據(jù)有可能就是中間狀態(tài),所以我們可以總結(jié)出來讀鎖是很有必要的,讀鎖可以防止讀到寫中間的值。

讀寫鎖的插隊策略

多個讀操作同時進行時也是線程安全的,一個線程獲取讀鎖后,另外一個線程同樣可以獲取讀鎖,因為讀鎖是共享的,如果一直都有線程加讀鎖,后面再有線程加寫鎖就會一直獲取不到鎖造成阻塞,這時就需要一些策略來保證鎖的公平性,避免出現(xiàn)鎖饑餓,那么Go語言中讀寫鎖采用的是什么插隊策略來避免饑餓問題呢?

這里我們用一個例子來說明一下Go語言的插隊策略:

假設(shè)現(xiàn)在有5個goroutine分別是G1、G2、G3、G4、G5,現(xiàn)在G1、G2獲取讀鎖成功,還沒釋放讀鎖,G3要執(zhí)行寫操作,獲取寫鎖失敗就會阻塞等待,當(dāng)前阻塞寫鎖的讀鎖goroutine數(shù)量為2:

后續(xù)G4進來想要獲取讀鎖,這時她就會判斷如果當(dāng)前有寫鎖的goroutine正在阻塞等待,為了避免寫鎖饑餓,那這個G4也會進入阻塞等待,后續(xù)G5進來想要獲取寫鎖,因為G3在占用互斥鎖,所以G5會進入自旋/休眠 阻塞等待;

現(xiàn)在G1、G2釋放了讀鎖,當(dāng)釋放讀鎖是判斷如果阻塞寫鎖goroutine的讀鎖goroutine數(shù)量為0了并且有寫鎖等待就會喚醒正在阻塞等待的寫鎖G3,G3得到了喚醒:

G3處理完寫操作后會釋放寫鎖,這一步會同時喚醒等待的讀鎖/寫鎖的goroutine,至于G4、G5誰能先獲取鎖就看誰比較快了,就像搶媳婦一樣,先下手的先得呀。

讀寫鎖的實現(xiàn)

接下來我們就深入源碼分析一下,先看一下RWMutex結(jié)構(gòu)都有啥:

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
  • w:復(fù)用互斥鎖提供的能力;
  • writerSem:寫操作goroutine阻塞等待信號量,當(dāng)阻塞寫操作的讀操作goroutine釋放讀鎖時,通過該信號量通知阻塞的寫操作的goroutine;
  • readerSem:讀操作goroutine阻塞等待信號量,當(dāng)寫操作goroutine釋放寫鎖時,通過該信號量通知阻塞的讀操作的goroutine;
  • redaerCount:當(dāng)前正在執(zhí)行的讀操作goroutine數(shù)量;
  • readerWait:當(dāng)寫操作被阻塞時等待的讀操作goroutine個數(shù)。

讀鎖

讀鎖的對應(yīng)方法如下:

func (rw *RWMutex) RLock() {
// 原子操作readerCount 只要值不是負數(shù)就表示獲取讀鎖成功
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一個正在等待的寫鎖,為了避免饑餓后面進來的讀鎖進行阻塞等待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}

精簡了競態(tài)檢測的方法,讀鎖方法就只有兩行代碼了,邏輯如下:

使用原子操作更新readerCount,將readercount值加1,只要原子操作后值不為負數(shù)就表示加讀鎖成功,如果值為負數(shù)表示已經(jīng)有寫鎖獲取互斥鎖成功,寫鎖goroutine正在等待或運行,所以為了避免饑餓后面進來的讀鎖要進行阻塞等待,調(diào)用runtime_SemacquireMutex阻塞等待。

非阻塞加讀鎖

Go語言在1.18中引入了非阻塞加讀鎖的方法:

func (rw *RWMutex) TryRLock() bool {
for {
// 讀取readerCount值能知道當(dāng)前是否有寫鎖在阻塞等待,如果值為負數(shù),那么后面的讀鎖就會被阻塞住
c := atomic.LoadInt32(&rw.readerCount)
if c < 0 {
if race.Enabled {
race.Enable()
}
return false
}
// 嘗試獲取讀鎖,for循環(huán)不斷嘗試
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
return true
}
}
}

因為讀鎖是共享的,在沒有寫鎖阻塞等待時多個線程可以同時獲取,所以原子性操作可能會失敗,這里采用for循環(huán)來增加嘗試次數(shù),很是巧妙。

釋放讀鎖

釋放讀鎖代碼主要分為兩部分,第一部分:

func (rw *RWMutex) RUnlock() {
// 將readerCount的值減1,如果值等于等于0直接退出即可;否則進入rUnlockSlow處理
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}

我們都知道readerCount的值代表當(dāng)前正在執(zhí)行的讀操作goroutine數(shù)量,執(zhí)行遞減操作后的值大于等于0表示當(dāng)前沒有異常場景或?qū)戞i阻塞等待,所以直接退出即可,否則需要處理這兩個邏輯:

rUnlockSlow邏輯如下:

func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1等于0表示沒有加讀鎖就釋放讀鎖,異常場景要拋出異常
// r+1 == -rwmutexMaxReaders 也表示沒有加讀鎖就是釋放讀鎖
// 因為寫鎖加鎖成功后會將readerCout的值減去rwmutexMaxReaders
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果有寫鎖正在等待讀鎖時會更新readerWait的值,所以一步遞減rw.readerWait值
// 如果readerWait在原子操作后的值等于0了說明當(dāng)前阻塞寫鎖的讀鎖都已經(jīng)釋放了,需要喚醒等待的寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

解讀一下這段代碼:

  • r+1等于0說明當(dāng)前goroutine沒有加讀鎖就進行釋放讀鎖操作,屬于非法操作;
  • r+1 == -rwmutexMaxReaders 說明寫鎖加鎖成功了會將readerCount的減去rwmutexMaxReaders變成負數(shù),如果此前沒有加讀鎖,那么直接釋放讀鎖就會造成這個等式成立,也屬于沒有加讀鎖就進行釋放讀鎖操作,屬于非法操作;
  • readerWait代表寫操作被阻塞時讀操作的goroutine數(shù)量,如果有寫鎖正在等待時就會更新readerWait的值,讀鎖釋放鎖時需要readerWait進行遞減,如果遞減后等于0說明當(dāng)前阻塞寫鎖的讀鎖都已經(jīng)釋放了,需要喚醒等待的寫鎖。(看下文寫鎖的代碼就呼應(yīng)上了)。

寫鎖

寫鎖對應(yīng)的方法如下:

const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 寫鎖也就是互斥鎖,復(fù)用互斥鎖的能力來解決與其他寫鎖的競爭
// 如果寫鎖已經(jīng)被獲取了,其他goroutine在獲取寫鎖時會進入自旋或者休眠
rw.w.Lock()
// 將readerCount設(shè)置為負值,告訴讀鎖現(xiàn)在有一個正在等待運行的寫鎖(獲取互斥鎖成功)
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 獲取互斥鎖成功并不代表goroutine獲取寫鎖成功,我們默認最大有2^30的讀操作數(shù)目,減去這個最大數(shù)目
// 后仍然不為0則表示前面還有讀鎖,需要等待讀鎖釋放并更新寫操作被阻塞時等待的讀操作goroutine個數(shù);
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

代碼量不是很大,但是理解起來還有一點復(fù)雜,我嘗試用文字來解析一下,主要分為兩部分:

  • 獲取互斥鎖,寫鎖也就是互斥鎖,這里我們復(fù)用互斥鎖mutex的加鎖能力,當(dāng)互斥鎖加鎖成功后,其他寫鎖goroutine再次嘗試獲取鎖時就會進入自旋休眠等待;
  • 判斷獲取寫鎖是否成功,這里有一個變量rwmutexMaxReaders = 1 << 30表示最大支持2^30個并發(fā)讀,互斥鎖加鎖成功后,假設(shè)2^30個讀操作都已經(jīng)釋放了讀鎖,通過原子操作將readerCount設(shè)置為負數(shù)在加上2^30,如果此時r仍然不為0說面還有讀操作正在進行,則寫鎖需要等待,同時通過原子操作更新readerWait字段,也就是更新寫操作被阻塞時等待的讀操作goroutine個數(shù);readerWait在上文的讀鎖釋放鎖時會進行判斷,進行遞減,當(dāng)前readerWait遞減到0時就會喚醒寫鎖。

非阻塞加寫鎖

Go語言在1.18中引入了非阻塞加鎖的方法:

func (rw *RWMutex) TryLock() bool {
// 先判斷獲取互斥鎖是否成功,沒有成功則直接返回false
if !rw.w.TryLock() {
if race.Enabled {
race.Enable()
}
return false
}
// 互斥鎖獲取成功了,接下來就判斷是否是否有讀鎖正在阻塞該寫鎖,如果沒有直接更新readerCount為
// 負數(shù)獲取寫鎖成功;
if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
return false
}
return true
}

釋放寫鎖

func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 將readerCount的恢復(fù)為正數(shù),也就是解除對讀鎖的互斥
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 如果后面還有讀操作的goroutine則需要喚醒他們
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 釋放互斥鎖,寫操作的goroutine和讀操作的goroutine同時競爭
rw.w.Unlock()
}

釋放寫鎖的邏輯比較簡單,釋放寫鎖會將會面的讀操作和寫操作的goroutine都喚醒,然后他們在進行競爭。

總結(jié)

因為我們上文已經(jīng)分享了互斥鎖的實現(xiàn)方式,再來看讀寫鎖就輕松許多了,文末我們再來總結(jié)一下讀寫鎖:

  • 讀寫鎖提供四種操作:讀上鎖,讀解鎖,寫上鎖,寫解鎖;加鎖規(guī)則是讀讀共享,寫寫互斥,讀寫互斥,寫讀互斥;
  • 讀寫鎖中的讀鎖是一定要存在的,其目的是也是為了規(guī)避原子性問題,只有寫鎖沒有讀鎖的情況下會導(dǎo)致我們讀取到中間值;
  • Go語言的讀寫鎖在設(shè)計上也避免了寫鎖饑餓的問題,通過字段readerCount、readerWait進行控制,當(dāng)寫鎖的goroutine被阻塞時,后面進來想要獲取讀鎖的goroutine也都會被阻塞住,當(dāng)寫鎖釋放時,會將后面的讀操作goroutine、寫操作的goroutine都喚醒,剩下的交給他們競爭吧;
  • 讀鎖獲取鎖流程:

鎖空閑時,讀鎖可以立馬被獲取

如果當(dāng)前有寫鎖正在阻塞,那么想要獲取讀鎖的goroutine就會被休眠

  • 釋放讀鎖流程:

當(dāng)前沒有異常場景或?qū)戞i阻塞等待出現(xiàn)的話,則直接釋放讀鎖成功

若沒有加讀鎖就釋放讀鎖則拋出異常;

寫鎖被讀鎖阻塞等待的場景下,會將readerWait的值進行遞減,readerWait表示阻塞寫操作goroutine的讀操作goroutine數(shù)量,當(dāng)readerWait減到0時則可以喚醒被阻塞寫操作的goroutine了;

  • 寫鎖獲取鎖流程

寫鎖復(fù)用了mutex互斥鎖的能力,首先嘗試獲取互斥鎖,獲取互斥鎖失敗就會進入自旋/休眠;

獲取互斥鎖成功并不代表寫鎖加鎖成功,此時如果還有占用讀鎖的goroutine,那么就會阻塞住,否則就會加寫鎖成功

  • 釋放寫鎖流程

釋放寫鎖會將負值的readerCount變成正值,解除對讀鎖的互斥

喚醒當(dāng)前阻塞住的所有讀鎖

釋放互斥鎖

讀寫鎖的代碼量不多,因為其復(fù)用了互斥鎖的設(shè)計,針對讀寫鎖的功能多做了一些工作,理解起來比互斥鎖要容易很多,你學(xué)會了嗎?


網(wǎng)站欄目:面試官:哥們,Go語言的讀寫鎖了解多少?
本文鏈接:http://www.5511xx.com/article/dphicdh.html