Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]
1. 無緩沖的通道
無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。
這種類型的通道要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操作。
如果兩個 goroutine 沒有同時準備好,通道會導致先執行發送或接收操作的 goroutine 阻塞等待。這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。
下圖展示兩個 goroutine 如何利用無緩沖的通道來共享一個值。
- 兩個
goroutine都到達通道,但兩者都沒有開始執行發送或者接收。 - 左側的
goroutine將它的手伸進了通道,這模擬了向通道發送數據的行為。這時,這個goroutine會在通道中被鎖住,直到交換完成。 - 右側的
goroutine將它的手放入通道,這模擬了從通道里接收數據。這個goroutine一樣也會在通道中被鎖住,直到交換完成。 - 進行交換。
- 右側的
goroutine拿到數據。 - 兩個
goroutine都將它們的手從通道里拿出來,這模擬了被鎖住的goroutine得到釋放。兩個goroutine現在都可以去做別的事情了。
圖:使用無緩沖的通道在 goroutine 之間同步, 摘自 《Go 語言實戰》
package mainimport ("runtime"
)func main() {c := make(chan struct{})go func(i chan struct{}) {sum := 0for i := 0; i <= 10000; i++ {sum += i}println("sum is :", sum)// 寫通道c <- struct{}{}}(c)//NumGoroutine 可以返回當前程序的 goroutine 數目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進行同步等待<-c
}
無緩沖通道需要發送和接收配對。否則會被阻塞,直到另一方準備好后被喚醒。
package mainimport "fmt"func main() {data := make(chan int) // 數據交換隊列exit := make(chan bool) // 退出通知go func() {for d := range data { // 從隊列迭代接收數據,直到 close 。fmt.Println(d)}fmt.Println("recv over.")exit <- true // 發出退出通知。}()data <- 1 // 發送數據。data <- 2data <- 3close(data) // 關閉隊列。fmt.Println("send over.")<-exit // 等待退出通知。
}
輸出:
1
2
3
send over.
recv over.
2. 有緩沖的通道
在無緩沖通道的基礎上,為通道增加一個有限大小的存儲空間形成帶緩沖通道。帶緩沖通道在發送時無需等待接收方接收即可完成發送過程,并且不會發生阻塞,只有當存儲空間滿時才會發生阻塞。同理,如果緩沖通道中有數據,接收時將不會發生阻塞,直到通道中沒有數據可讀時,通道將會再度阻塞。
有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。
這種類型的通道并不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。
只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。
這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:
-
無緩沖的通道保證進行發送和接收的
goroutine會在同一時間進行數據交換; -
有緩沖的通道沒有這種保證。
在下圖中可以看到兩個 goroutine 分別向有緩沖的通道里增加一個值和從有緩沖的通道里移除一個值。
- 右側的
goroutine正在從通道接收一個值。 - 右側的
goroutine獨立完成了接收值的動作,而左側的goroutine正在發送一個新值到通道里。 - 左側的
goroutine還在向通道發送新值,而右側的goroutine正在從通道接收另外一個值。這個步驟里的兩個操作既不是同步的,也不會互相阻塞。 - 所有的發送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。
圖:使用有緩沖的通道在 goroutine 之間同步數據,摘自 《Go 語言實戰》
有緩沖通道例子
package mainimport ("runtime"
)func main() {c := make(chan struct{})ci := make(chan int, 100)go func(i chan struct{}, j chan int) {for i := 0; i <= 10; i++ {ci <- i}close(ci)// 寫通道c <- struct{}{}}(c, ci)//NumGoroutine 可以返回當前程序的 goroutine 數目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進行同步等待<-c// 此時ci 通道已經關閉,匿名函數啟動的goroutine 已經退出println("NumGoroutine=", runtime.NumGoroutine())// 但是通道 ci 還可以繼續讀取for v := range ci {println("v is :", v)}
}
異步方式也就是有緩沖的通道通過判斷緩沖區來決定是否阻塞。
- 緩沖區已滿,發送被阻塞;
- 緩沖區為空,接收被阻塞;
通常情況下,異步 channel 可減少排隊阻塞,具備更高的效率。但應該考慮使用指針規避大對象拷貝,將多個元素打包,減小緩沖區大小等。
為什么Go語言對通道要限制長度而不提供無限長度的通道?
我們知道通道( channel )是在兩個 goroutine 間通信的橋梁。使用 goroutine 的代碼必然有一方提供數據,一方消費數據。當提供數據一方的數據供給速度大于消費方的數據處理速度時,如果通道不限制長度,那么內存將不斷膨脹直到應用崩潰。
因此,限制通道的長度有利于約束數據提供方的供給速度,供給數據量必須在消費方處理量+通道長度的范圍內,才能正常地處理數據。
package mainimport "fmt"func main() {data := make(chan int, 3) // 緩沖區可以存儲 3 個元素exit := make(chan bool)data <- 1 // 在緩沖區未滿前,不會阻塞。data <- 2data <- 3go func() {for d := range data { // 在緩沖區未空前,不會阻塞。fmt.Println(d)}exit <- true}()data <- 4 // 如果緩沖區已滿,阻塞。data <- 5close(data)<-exit
}
緩沖區是內部屬性,并非類型構成要素。
var a, b chan int = make(chan int), make(chan int, 3)
除用 range 外,還可用 ok-idiom 模式判斷 channel 是否關閉。
for {if d, ok := <-data; ok {fmt.Println(d)} else {break}
}
向 closed channel 發送數據引發 panic 錯誤,接收立即返回零值。而 nil channel,無論收發都會被阻塞。
// 這個示例程序展示如何使用
// 有緩沖的通道和固定數目的
// goroutine來處理一堆工作
package mainimport ("fmt""math/rand""sync""time"
)const (numberGoroutines = 4 // 要使用的goroutine的數量taskLoad = 10 // 要處理的工作的數量
)// wg用來等待程序完成
var wg sync.WaitGroup// init初始化包,Go語言運行時會在其他代碼執行之前
// 優先執行這個函數
func init() {// 初始化隨機數種子rand.Seed(time.Now().Unix())
}// main是所有Go程序的入口
func main() {// 創建一個有緩沖的通道來管理工作tasks := make(chan string, taskLoad)// 啟動goroutine來處理工作wg.Add(numberGoroutines)for gr := 1; gr <= numberGoroutines; gr++ {go worker(tasks, gr)}// 增加一組要完成的工作for post := 1; post <= taskLoad; post++ {tasks <- fmt.Sprintf("Task : %d", post)}// 當所有工作都處理完時關閉通道// 以便所有goroutine退出close(tasks)// 等待所有工作完成wg.Wait()
}// worker作為goroutine啟動來處理
// 從有緩沖的通道傳入的工作
func worker(tasks chan string, worker int) {// 通知函數已經返回defer wg.Done()for {// 等待分配工作task, ok := <-tasksif !ok {// 這意味著通道已經空了,并且已被關閉fmt.Printf("Worker: %d : Shutting Down\n", worker)return}// 顯示我們開始工作了fmt.Printf("Worker: %d : Started %s\n", worker, task)// 隨機等一段時間來模擬工作sleep := rand.Int63n(100)time.Sleep(time.Duration(sleep) * time.Millisecond)// 顯示我們完成了工作fmt.Printf("Worker: %d : Completed %s\n", worker, task)}
}
輸出:
Worker: 4 : Started Task : 2
Worker: 1 : Started Task : 1
Worker: 2 : Started Task : 3
Worker: 3 : Started Task : 4
Worker: 4 : Completed Task : 2
Worker: 4 : Started Task : 5
Worker: 2 : Completed Task : 3
Worker: 2 : Started Task : 6
Worker: 3 : Completed Task : 4
Worker: 3 : Started Task : 7
Worker: 3 : Completed Task : 7
Worker: 3 : Started Task : 8
Worker: 4 : Completed Task : 5
Worker: 4 : Started Task : 9
Worker: 1 : Completed Task : 1
Worker: 1 : Started Task : 10
Worker: 3 : Completed Task : 8
Worker: 3 : Shutting Down
Worker: 2 : Completed Task : 6
Worker: 2 : Shutting Down
Worker: 1 : Completed Task : 10
Worker: 1 : Shutting Down
Worker: 4 : Completed Task : 9
Worker: 4 : Shutting Down
在main函數的第31行,創建了一個string類型的有緩沖的通道,緩沖的容量是10。在第34行,給WaitGroup賦值為4,代表創建了4個工作 goroutine。之后在第35行到第37行,創建了4個 goroutine,并傳入用來接收工作的通道。在第40行到第42行,將10個字符串發送到通道,模擬發給 goroutine 的工作。一旦最后一個字符串發送到通道,通道就會在第46行關閉,而main函數就會在第49行等待所有工作的完成。
第46行中關閉通道的代碼非常重要。當通道關閉后,goroutine 依舊可以從通道接收數據,但是不能再向通道里發送數據。能夠從已經關閉的通道接收數據這一點非常重要,因為這允許通道關閉后依舊能取出其中緩沖的全部值,而不會有數據丟失。從一個已經關閉且沒有數據的通道里獲取數據,總會立刻返回,并返回一個通道類型的零值。如果在獲取通道時還加入了可選的標志,就能得到通道的狀態信息。
在worker函數里,可以在第58行看到一個無限的for循環。在這個循環里,會處理所有接收到的工作。每個 goroutine 都會在第60行阻塞,等待從通道里接收新的工作。一旦接收到返回,就會檢查ok標志,看通道是否已經清空而且關閉。如果ok的值是false,goroutine 就會終止,并調用第56行通過defer聲明的Done函數,通知main有工作結束。
如果ok標志是true,表示接收到的值是有效的。第71行和第72行模擬了處理的工作。一旦工作完成,goroutine 會再次阻塞在第60行從通道獲取數據的語句。一旦通道被關閉,這個從通道獲取數據的語句會立刻返回,goroutine 也會終止自己。
3. WaitGroup
Go 語言中除了可以使用通道(channel)和互斥鎖進行兩個并發程序間的同步外,還可以使用等待組進行多個任務的同步,等待組可以保證在并發環境中完成指定數量的任務。sync.WaitGroup 類型(以下簡稱WaitGroup類型)是開箱即用的,也是并發安全的。
一般情況下,我會用這個方法來記錄需要等待的 goroutine 的數量。相對應的,這個類型的 Done 方法,用于對其所屬值中計數器的值進行減一操作。我們可以在需要等待的 goroutine 中,通過 defer 語句調用它。而此類型的 Wait 方法的功能是,阻塞當前的 goroutine ,直到其所屬值中的計數器歸零。如果在該方法被調用的時候,那個計數器的值就是 0,那么它將不會做任何事情。
goroutine 和 chan , 一個用于并發,另一個用于通信。沒有緩沖的通道具有同步的功能,除此之外, sync 包也提供了多個 goroutine 同步的機制,主要是通過 WaitGroup 實現的。
WaitGroup 值中計數器的值不能小于 0,是因為這樣會引發一個 panic 。
如果在一個此類值的 Wait 方法被執行期間,跨越了兩個計數周期,那么就會引發一個 panic 。縱觀上述會引發 panic 的后兩種情況,我們可以總結出這樣一條關于 WaitGroup 值的使用禁忌,
即:不要把增加其計數器值的操作和調用其Wait方法的代碼,放在不同的 goroutine 中執行。換句話說,要杜絕對同一個WaitGroup 值的兩種操作的并發執行。
我們最好用 先統一 Add ,再并發 Done ,最后 Wait 這種標準方式,來使用 WaitGroup 值。 尤其不要在調用 Wait 方法的同時,并發地通過調用 Add 方法去增加其計數器的值,因為這也有可能引發 panic 。
在 sync.WaitGroup (等待組)類型中,每個 sync.WaitGroup 值在內部維護著一個計數,此計數的初始默認值為零。
主要的接口如下:
type WaitGroup struct {// contains filtered or unexported fields
}// 添加等待信號
func (wg *WaitGroup) Add(delta int)// 釋放等待信號
func (wg *WaitGroup) Done()// 等待
func (wg *WaitGroup) Wait()
WaitGroup用來等待多個goroutine完成;main goroutine調用Add設置需要等待goroutine的數目;- 每一個
goroutine結束時調用Done(); Wait()被main用來等待所有的goroutine完成;
sync.WaitGroup 內部擁有一個計數器,計數器的值可以通過方法調用實現計數器的增加和減少。當我們添加了 N 個并發任務進行工作時,就將等待組的計數器值增加 N。每個任務完成時,這個值減 1。同時,在另外一個 goroutine 中等待這個等待組的計數器值為 0 時,表示所有任務已經完成。
代碼示例:
package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func main() {for _, url := range urls {// 為每一個 url 啟動一個 goroutine,同時給 wg 加 1wg.Add(1)go func(url string) {// 當前go routine 結束后給wg 計數減1, wg.Done() 等價于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}}(url)}// 等待所有請求結束wg.Wait()
}
或者不使用匿名函數,如下
package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func getURLStatus(url string) {// 當前go routine 結束后給wg 計數減1, wg.Done() 等價于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}
}func main() {for _, url := range urls {// 為每一個 url 啟動一個 goroutine,同時給 wg 加 1wg.Add(1)go getURLStatus(url)}// 等待所有請求結束wg.Wait()
}
4. select
select 是類 UNIX 系統提供的一個多路復用系統API, Go 語言借用多路復用的概念,提供了 select 關鍵字,用于多路監昕多個通道。
select 語句只能與通道聯用,它一般由若干個分支組成。每次執行這種語句的時候,一般只有一個分支中的代碼會被運行。
當監聽的通道沒有狀態是可讀或可寫的, select 是阻塞的;只要監聽的通道中有一個狀態是可讀或可寫的,則 select 就不會阻塞,而是進入處理就緒通道的分支流程。如果監聽的通道有多個可讀或可寫的狀態, 則 select 隨機選取一個處理。
select 的特點是只要其中有一個 case 已經完成,程序就會繼續往下執行,而不會考慮其他 case 的情況。
select 的用法與 switch 語言非常類似,由 select 開始一個新的選擇塊,每個選擇條件由 case 語句來描述。與 switch 語句相比, select 有比較多的限制,其中最大的一條限制就是每個 case 語句里必須是一個 IO 操作。結構如下:
select{case 操作1:響應操作1case 操作2:響應操作2…default:沒有操作情況
}
操作1、操作2:包含通道收發語句,請參考下表。
| 操 作 | 語句示例 |
|---|---|
| 接收任意數據 | case <- ch; |
| 接收變量 | case d := <- ch; |
| 發送數據 | case ch <- 100; |
在 Go 中,支持通信操作的類型只有 chan ,所以 select 中的 case 條件只能是對 chan 類型變量的讀寫操作。由于 chan 類型變量的讀寫操作可能會引起阻塞,為了在使用 select 選擇器時不陷入阻塞狀態,可以在 select 代碼塊中添加 default 關鍵字,當 case 條件全部都不滿足時,默認進入 default 分支,執行完 default 分支的代碼后,退出 select 選擇器。
package mainimport ("fmt""time"
)func main() {fmt.Println("開始時間:", time.Now().Format("2006-01-02 15:04:05"))select {case <-time.After(time.Second * 2):fmt.Println("2秒后的時間:", time.Now().Format("2006-01-02 15:04:05"))}
}
輸出結果:
開始時間: 2021-02-08 14-14-42
2秒后的時間: 2021-02-08 14:14:44
time.After函數返回一個通道類型的變量,然后在case中從這個通道中讀取信息,如果沒有協程給這個通道發送信息,那么case將會一直阻塞。在調用After函數時,傳入了一個時長作為參數,意思是從調用After函數算起,到設定的時長后,有協程將會向這個通道發送一條消息。當通道收到消息后,這個case條件滿足,這個case分支下的代碼將會被執
如果沒有任意一條 select 語句可以執行(即所有的通道都被阻塞),那么有如下兩種可能的情況:
-
如果給出了
default語句,那么就會執行default語句,同時程序的執行會從select語句后的語句中恢復; -
如果沒有
default語句,那么select語句將被阻塞,直到至少有一個通信可以進行下去;
package mainfunc main() {ch := make(chan int, 1)go func(chan int) { // go func(ch chan int) { 這樣寫也可以? 為啥?for {select {case ch <- 0:case ch <- 1:}}}(ch)for i := 0; i < 10; i++ {println(<-ch)}}
輸出結果:
1
1
0
1
0
1
0
1
0
1
如果需要同時處理多個 channel ,可使用 select 語句。它隨機選擇一個可用 channel 做收發操作,或執行 default case 。
package mainimport ("fmt""os"
)func main() {a, b := make(chan int, 3), make(chan int)go func() {v, ok, s := 0, false, ""for {select { // 隨機選擇可?用 channel,接收數據。case v, ok = <-a:s = "a"case v, ok = <-b:s = "b"}if ok {fmt.Println(s, v)} else {os.Exit(0)}}}()for i := 0; i < 5; i++ {select { // 隨機選擇可用 channel,發送數據。case a <- i:case b <- i:}}close(a)select {} // 沒有可用 channel,阻塞 main goroutine。
}
輸出:
a 0
a 1
a 2
a 3
b 4
在循環中使用 select default case 需要小心,避免形成洪水。
- 如果在
select語句中發現某個通道已關閉,那么應該怎樣屏蔽掉它所在的分支?
在 case 中通過第二個參數判斷 chan 是否關閉,如果關閉則通過 make(chan type) 來對關閉的 chan 置 nil ,當再次執行到 select 時,因為 chan 時 nil 會進入阻塞而不會進入候選分支。
package mainimport ("fmt""time"
)func main() {i := 0c := make(chan int, 2)c <- 1c <- 2close(c)for {select {case value, ok := <-c:if !ok {c = make(chan int)fmt.Println("ch is closed")} else {fmt.Printf("value is %#v\n", value)}default:time.Sleep(1e9) // 等待1秒鐘fmt.Println("default, ", i)i = i + 1if i > 3 {return}}}
}
輸出結果:
value is 1
value is 2
ch is closed
default, 0
default, 1
default, 2
default, 3
- 在
select語句與for語句聯用時,怎樣直接退出外層的for語句?
- 可以使用
goto加lable跳轉到for外面; - 可以設置一個額外的標記位,當
chan關閉時,設置flag=true,在for的最后判斷flag決定是否break;
5. 用 channel 實現信號量 (semaphore)
package mainimport ("fmt""sync"
)func main() {wg := sync.WaitGroup{}wg.Add(3)sem := make(chan int, 1)for i := 0; i < 3; i++ {go func(id int) {defer wg.Done()sem <- 1 // 向 sem 發送數據,阻塞或者成功。for x := 0; x < 3; x++ {fmt.Println(id, x)}<-sem // 接收數據,使得其他阻塞 goroutine 可以發送數據。}(i)}wg.Wait()
}
輸出:
2 0
2 1
2 2
0 0
0 1
0 2
1 0
1 1
1 2
6. 用 closed channel 發出退出通知
close 函數聲明如下:
func close(c chan<- Type)
內置的 close 函數,只能用于 chan 類型變量。使用 close 函數關閉通道后,這個通道不允許被寫入新的信息,但是關閉操作不會清除通道中已有的內容,不影響通道被讀取。示例代碼如下:
package main
import ("fmt""time"
)
func write(ch chan int) {for i := 0; i < 10; i++ {ch <- i * 10time.Sleep(time.Second * 1)}close(ch)
}
func read(ch chan int) {for {if val, ok := <-ch; ok {fmt.Println("從通道中讀取值:", val)} else {// 通道被關閉fmt.Println("通道已關閉,退出讀取程序")break}}
}
func main() {var ch = make(chan int, 10)go write(ch)read(ch)
}
上邊的通道讀取操作是:
val,ok := <-ch
當通道被關閉后:
- 如果從通道中讀取到信息,則
ok值為true,val是一個有效值; - 如果從通道中沒有讀取到信息,則
ok值為false,此時的val是臟數據,切勿將ok為false時的val值拿去使用,此時的val值是chan指定數據類型的默認值。
如果通道沒有被關閉,當從通道中沒有讀取到信息時,讀取操作將會產生程序阻塞。所以使用 close 函數的目的是關閉不會再寫入數據的通道,告訴通道讀取方,所有數據發送完畢。
package mainimport ("sync""time"
)func main() {var wg sync.WaitGroupquit := make(chan bool)for i := 0; i < 2; i++ {wg.Add(1)go func(id int) {defer wg.Done()task := func() {println(id, time.Now().Nanosecond())time.Sleep(time.Second)}for {select {case <-quit: // closed channel 不會阻塞,因此可用作退出通知。returndefault: // 執行正常任務。task()}}}(i)}time.Sleep(time.Second * 5) // 讓測試 goroutine 運行一會。close(quit) // 發出退出通知。wg.Wait()
}
7. channel 傳參或者作為結構成員
channel 是第一類對象,可傳參 (內部實現為指針) 或者作為結構成員。
package mainimport "fmt"type Request struct {data []intret chan int
}func NewRequest(data ...int) *Request {return &Request{data, make(chan int, 1)}
}
func Process(req *Request) {x := 0for _, i := range req.data {x += i}req.ret <- x
}
func main() {req := NewRequest(10, 20, 30)Process(req)fmt.Println(<-req.ret)
}
8. 并發總結
- 并發是指
goroutine運行的時候是相互獨立的。 - 使用關鍵字
go創建goroutine來運行函數。 goroutine在邏輯處理器上執行,而邏輯處理器具有獨立的系統線程和運行隊列。- 競爭狀態是指兩個或者多個
goroutine試圖訪問同一個資源。 - 原子函數和互斥鎖提供了一種防止出現競爭狀態的辦法。
- 通道提供了一種在兩個
goroutine之間共享數據的簡單方法。 - 無緩沖的通道保證同時交換數據,而有緩沖的通道不做這種保證。
總結
以上是生活随笔為你收集整理的Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 乌镇二月份穿什么衣服合适
- 下一篇: 一件正宗的羊绒衫大概多少钱