Go Atomic
這里填寫標題
- 1. Go Atomic
- 1.1. 讀取
- 1.2. 賦值
- 1.3. 加法
- 1.4. 減法
- 1.4.1. 減法封裝函數
- 1.5. 比較并交換
- 1.6. 交換
- 1.7. 擴展知識
- 1.8. 實戰場景: 環形隊列
- 1.9. 參考
1. Go Atomic
我們已經知道, 原子操作即是進行過程中不能被中斷的操作。也就是說, 針對某個值的原子操作在被進行的過程當中, CPU 絕不會再去進行其它的針對該值的操作。為了實現這樣的嚴謹性, 原子操由 CPU 提供芯片級別的支持, 所以絕對有效, 即使在擁有多 CPU 核心, 或者多 CPU 的計算機系統中, 原子操作的保證也是不可撼動的。這使得原子操作可以完全地消除競態條件, 并能夠絕對地保證并發安全性, 它的執行速度要比其他的同步工具快得多, 通常會高出好幾個數量級。
不過它的缺點也很明顯, 正因為原子操作不能被中斷, 所以它需要足夠簡單, 并且要求快速。你可以想象一下, 如果原子操作遲遲不能完成, 而它又不會被中斷, 那么將會給計算機執行指令的效率帶來多么大的影響, 所以操作系統層面只對針對二進制位或整數的原子操作提供了支持。
因此, 我們可以結合實際情況, 來判斷是否可以將鎖替換成原子操作。
1.1. 讀取
atomic 包中提供了如下以 Load 為前綴的增減操作:
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr)載入操作能夠保證原子的讀變量的值, 當讀取的時候, 任何其他 CPU 操作都無法對該變量進行讀寫, 其實現機制受到底層硬件的支持。
假設我已經保證了對一個變量的寫操作都是原子操作, 比如: 加或減、存儲、交換等等, 那我對它進行讀操作的時候, 還有必要使用原子操作嗎?
答案是很有必要, 你可以對照一下讀寫鎖, 為什么在讀寫鎖保護下的寫操作和讀操作之間是互斥的? 這是為了防止讀操作讀到沒有被修改完的值, 如果寫操作還沒有進行完, 讀操作就來讀了, 那么就只能讀到僅修改了一部分的值, 這顯然破壞了值的完整性。因此一旦決定要對一個共享資源進行保護, 那就要做到完全的保護, 不完全的保護基本上與不保護沒有什么區別。
1.2. 賦值
atomic 包中提供了如下以 Store 為前綴的存儲操作:
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr)此類操作確保了寫變量的原子性, 避免其他操作讀到了修改變量過程中的臟數據。
然后對于存儲, 需要掌握 2 條規則:
- 我們不能把 nil 作為參數值傳入原子值的 Store 方法, 否則就會引發一個 panic。這里要注意, 如果有一個接口類型的變量, 它的動態值是 nil, 但動態類型卻不是 nil, 那么它的值就不等于 nil, 這樣一個變量的值是可以被存入原子值, 這塊知識可以在接口這一章中查看。
- 我們向原子值存儲的第一個值, 決定了它今后能且只能存儲哪一個類型的值。例如, 我第一次向一個原子值存儲了一個 string 類型的值, 那我在后面就只能用該原子值來存儲字符串了。如果我又想用它存儲結構體, 那么在調用它的 Store 方法的時候就會引發一個 panic, 這個 panic 會告訴我, 這次存儲的值的類型與之前的不一致。
1.3. 加法
atomic 包中提供了如下以 Add 為前綴的增減操作:
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)需要注意的是, 第一個參數必須是指針類型的值, 通過指針變量可以獲取被操作數在內存中的地址, 從而施加特殊的 CPU 指令, 確保同一時間只有一個 goroutine 能夠進行操作, 看個簡單的示例:
func Foo() {var opts int64 = 0for i := 0; i < 50; i++ {// 注意第一個參數必須是地址atomic.AddInt64(&opts, 3) //加操作//atomic.AddInt64(&opts, -1) 減操作time.Sleep(time.Millisecond)}time.Sleep(time.Second)fmt.Println("opts: ", atomic.LoadInt64(&opts)) }1.4. 減法
用于原子加法操作的函數可以做原子減法嗎? 比如 atomic.AddInt32 函數可以用于減小那個被操作的整數值嗎? atomic.AddInt32 函數的第二個參數代表差量, 它的類型 int32 是有符號的, 如果我們想做原子減法, 那么把這個差量設置為負整數就可以了, 對于 atomic.AddInt64 函數來說也是類似的。不過如果想用 atomic.AddUint32 和 atomic.AddUint64 函數做原子減法, 因為它們的第二個參數的類型 uint32 和 uint64 都是無符號的, 就不能同 AddInt32 進行相同處理, 但是可以依據下面這個表達式來給定 atomic.AddUint32 函數的第二個參數值:
^uint32(-N-1))其中的 N 代表由負整數表示的差量, 我們先要把差量的絕對值減去 1, 然后再把得到的這個無類型的整數常量, 轉換為 uint32 類型的值, 最后在該值之上做按位異或操作, 就可以獲得最終的參數值。簡單來說, 此表達式的結果值的補碼, 與使用前一種方法得到的值的補碼相同, 所以這兩種方式是等價的。
1.4.1. 減法封裝函數
// AtomicMinusUint64 returns the new changed value, the origin value will be changed. func AtomicMinusUint64(addr *uint64, minus int64) uint64 {if minus < 0 {minus = -minus // Even though subtraction, the value should be positive.}// https://pkg.go.dev/sync/atomic#AddUint64// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).return atomic.AddUint64(addr, ^uint64(minus-1)) }Test case:
// go test -v -timeout 30s -run ^TestAtomicMinusUint64$ gitlab.xxx.com/xx func TestAtomicMinusUint64(t *testing.T) {cases := []struct {name stringval uint64minus int64out uint64}{{name: "positive",val: 20,minus: 9,out: 11,},{name: "negative",val: 20,minus: -9,out: 11,},}for _, tc := range cases {t.Run(tc.name, func(t *testing.T) {out := AtomicMinusUint64(&tc.val, tc.minus)assert.Equal(t, tc.out, out)})} }1.5. 比較并交換
該操作簡稱 CAS(Compare And Swap), 第一個參數的值應該是指向被操作值的指針值, 該值的類型即為* int32, 后兩個參數的類型都是 int32 類型, 它們的值應該分別代表被操作值的舊值和新值, 函數在被調用之后會先判斷參數 addr 指向的被操作值與參數 old 的值是否相等。僅當此判斷得到肯定的結果之后, 該函數才會用參數 new 代表的新值替換掉原先的舊值。否則, 后面的替換操作就會被忽略。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)當有大量的 goroutine 對變量進行讀寫操作時, 可能導致 CAS 操作無法成功, 這時可以利用 for 循環多次嘗試:
var value int64func atomicAddOp(tmp int64) {for {oldValue := valueif atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {return}} }比較并交換操作與交換操作相比有什么不同, 優勢在哪里呢? 比較并交換操作即 CAS 操作, 是有條件的交換操作, 只有在條件滿足的情況下才會進行值的交換。CAS 操作并不是單一的操作, 而是一種操作組合, 這與其他的原子操作都不同。正因為如此, 它的用途要更廣泛一些, 例如我們將它與 for 語句聯用就可以實現一種簡易的自旋鎖(spinlock):
for {if atomic.CompareAndSwapInt32(&num2, 10, 0) {fmt.Println("The second number has gone to zero.")break}time.Sleep(time.Millisecond * 500)}1.6. 交換
atomic 包中提供了如下以 Swap 為前綴的交換操作:
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)相對于 CAS, 明顯此類操作更為暴力直接, 并不管變量的舊值是否被改變, 直接賦予新值然后返回背替換的值。
1.7. 擴展知識
對于原子操作, 還有幾條具體的使用建議:
除了上述使用建議之外, 我還要再特別強調一點: 盡量不要向原子值中存儲引用類型的值。因為這很容易造成安全漏洞。請看下面的代碼:
var box6 atomic.Valuev6 := []int{1, 2, 3} box6.Store(v6)v6[1] = 4 // 注意, 此處的操作不是并發安全的!我把一個[]int類型的切片值 v6 存入了原子值 box6, 由于切片類型屬于引用類型, 我在外面改動這個切片值, 就等于修改了 box6 中存儲的那個值, 這相當于繞過了原子值而進行了非并發安全的操作, 那么應該怎樣修補這個漏洞呢? 可以這樣做:
store := func(v []int) {replica := make([]int, len(v))copy(replica, v)box6.Store(replica)}store(v6)v6[2] = 5 // 此處的操作是安全的。我先為切片值 v6 創建了一個完全的副本, 這個副本涉及的數據已經與原值毫不相干, 然后再把這個副本存入 box6, 因此無論我再對 v6 的值做怎樣的修改, 都不會破壞 box6 提供的安全保護。
1.8. 實戰場景: 環形隊列
// 環形隊列 type RingBuffer struct {err errorcount int32size int32head int32tail int32buf []unsafe.Pointer }// Get 方法從 buf 中取出對象 func (r *RingBuffer) Get() interface{} {// 在高并發開始的時候, 隊列容易空, 直接判斷空性能最優if atomic.LoadInt32(&r.count) <= 0 {return nil}// 當扣減數量后沒有超, 就從隊列里取出對象if atomic.AddInt32(&r.count, -1) >= 0 {idx := (atomic.AddInt32(&r.head, 1) - 1) % r.sizeif obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) {o := *(*interface{})(obj)atomic.StorePointer(&r.buf[idx], nil)return o}} else {// 當減數量超了, 再加回去atomic.AddInt32(&r.count, 1)}return nil }// Put 方法將對象放回到 buf 中。如果 buf 滿了, 返回 false func (r *RingBuffer) Put(obj interface{}) bool {// 在高并發結束的時候, 隊列容易滿, 直接判滿性能最優if atomic.LoadInt32(&r.count) >= r.size {return false}// 當增加數量后沒有超, 就將對象放到隊列里if atomic.AddInt32(&r.count, 1) <= r.size {idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.sizeatomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj))return true}// 當加的數量超了, 再減回去atomic.AddInt32(&r.count, -1)return false }1.9. 參考
總結
- 上一篇: python画椭圆形_Python易学就
- 下一篇: html5体感游戏开发,使用HTML5开