Golang并发——并发技术Goroutine和channel的使用、定时器、生产者消费者、条件变量、select
Goroutine:
goroutine是Go并行設計的核心。goroutine說到底其實就是協程,它比線程更小,十幾個goroutine可能體現在底層就是五六個線程,Go語言內部幫你實現了這些goroutine之間的內存共享。執行goroutine只需極少的棧內存(大概是4~5KB),當然會根據相應的數據伸縮。也正因為如此,可同時運行成千上萬個并發任務。goroutine比thread更易用、更高效、更輕便。一般情況下,一個普通計算機跑幾十個線程就有點負載過大了,但是同樣的機器卻可以輕松地讓成百上千個goroutine進行資源競爭。
特點:
- 有獨立的棧空間
- 共享程序堆內存
- 調度由用于控制
- 協程是輕量級的線程
Goroutine的創建:
只需在函數調?語句前添加 go 關鍵字,就可創建并發執?單元。開發?員無需了解任何執?細節,調度器會自動將其安排到合適的系統線程上執行。在并發編程中,我們通常想將一個過程切分成幾塊,然后讓每個goroutine各自負責一塊工作,當一個程序啟動時,主函數在一個單獨的goroutine中運行,我們叫它main goroutine。新的goroutine會用go語句來創建。而go語言的并發設計,讓我們很輕松就可以達成這一目的。
Goroutine格式:
go 函數名( 參數列表 )演示:
主goroutine退出后,其它的工作goroutine也會自動退出:
func main() {// 如果不加go執行順序是:先執行test1再執行test2,是有順序的,但是如果有go關鍵字就是同時在執行了go goroutineTest01()go goroutineTest02()for {} }func goroutineTest01() {for i := 0; i < 10; i++ {fmt.Println("goroutineTest01執行")time.Sleep(1000 * time.Millisecond)} }func goroutineTest02() {for i := 0; i < 10; i++ {fmt.Println("goroutineTest02執行")time.Sleep(1000 * time.Millisecond)} }runtime包:
runtime.Gosched()用于讓出CPU時間片,讓出當前goroutine的執行權限,調度器安排其他等待的任務運行,并在下次再獲得cpu時間輪片的時候,從該出讓cpu的位置恢復執行。有點像跑接力賽,A跑了一會碰到代碼runtime.Gosched() 就把接力棒交給B了,A歇著了,B繼續跑。
func main() {go func() {for {fmt.Println("我不讓出時間片")}}()for {runtime.Gosched() // 讓出當前時間片fmt.Println("我讓出時間片")} }runtime.Goexit() 將立即終止當前 goroutine 執?,調度器確保所有已注冊 defer延遲調用被執行。Goexit之前注冊的defer會生效,之后不會。
func main() {go func() { // Goexit直接退出funcfmt.Println("走我嗎——1")goexit()fmt.Println("走我嗎——2")}()for {} }func goexit() {//returnfmt.Println("走我嗎——3")runtime.Goexit() // 退出當前go程defer fmt.Println("走我嗎——4") }runtime.GOMAXPROCS()用來設置可以并行計算的CPU核數的最大值,并返回上一次的核心數,如果是第一次調用就返回默認值。
func main() {// func GOMAXPROCS(n int) int {} 參數是要設置的核心數,返回值是上一次設置的核心數 num := runtime.GOMAXPROCS(1)fmt.Println("上一次設置核心數為:", num)for {// 0和1會一直交替打印,如果用GOMAXPROCS限制1個核心,那么誰搶到誰就一直跑go fmt.Println(0)fmt.Println(1)} }channel:
- channel可以建立goroutine之間的通信連接,channel的特點是:先進先出、線程安全不需要加鎖,channel是Go語言中的一個核心類型,可以把它看成管道。并發核心單元通過它就可以發送或者接收數據通訊,這在一定程度上又進一步降低了編程的難度。channel是一個數據類型,主要用來解決協程的同步問題以及協程之間數據共享(數據傳遞)的問題。
- goroutine運行在相同的地址空間,因此訪問共享內存必須做好同步。goroutine 奉行通過通信來共享內存,而不是共享內存來通信。引?類型 channel可用于多個 goroutine 通訊。其內部實現了同步,確保并發安全。
- channel分為有緩沖和無緩沖
channel分為兩個端:
傳入端負責寫的操作,輸出端負責讀的操作
讀和寫必須同時滿足條件,才在會進行數據流動,否則會阻塞。
例:channel就是一個外賣小哥,傳入端是賣家,輸出端是買家,必須保證賣家把商品給外賣小哥以后,買家正在準備拿,否則外賣小哥就會懵逼了。
無緩沖的channel:
- 無緩沖的通道(unbuffered channel)指在接收前不會保存任何數據的一個通道。通道容量為0,可以實現同步的操作,操作前提是讀和寫必須同時操作,否則會阻塞。
- 緩沖:中間加了個存放數據的區域,然后緩存區慢了才會寫入,就像Java的緩沖流一樣
- 這種類型的通道要求發送goroutine和接收goroutine同時準備好,才能完成發送和接收操作。否則,通道會導致先執行發送或接收操作的 goroutine 阻塞等待。
- 這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。
- 阻塞:由于某種原因數據沒有到達,當前協程(線程)持續處于等待狀態,直到條件滿足,才解除阻塞。
- 同步:在兩個或多個協程(線程)間,保持數據內容一致性的機制。
無緩沖創建:
chan是創建channel所需使用的關鍵字。Type代表指定channel收發數據的類型。
當參數capacity= 0 時,channel 是無緩沖阻塞讀寫的
當capacity > 0 時,channel 有緩沖、是非阻塞的,直到寫滿 capacity個元素才阻塞寫入。
channel通過操作符<-來發送和接收數據
發送和接收數據語法:
默認情況下,channel接收和發送數據都是阻塞的,除非另一端已經準備好,這樣就使得goroutine同步變的更加的簡單,而不需要顯式的lock。
channel <- value //發送value到channel<-channel //接收并將其丟棄x := <-channel //從channel中接收數據,并賦值給xx, ok := <-channel //功能同上,同時檢查通道是否已關閉或者是否為空演示:
func main() {go channelTest01()go channelTest02()for {} }// 定義channel var channel = make(chan int)// 定義一個公共操作類 func print(s string) {for _, ch := range s {fmt.Printf("%c", ch)time.Sleep(300 * time.Millisecond)} }// 定義兩個人使用打印機 func channelTest01() {print("person01")channel <- 1 // person01負責寫的操作,隨便寫的數字都行,相當于規定了兩個方法的執行順序 } func channelTest02() {<-channel // person02負責讀channel中的數據,也就是先把person01的數據讀出來才會繼續執行person02的任務//num := <-channel // person02負責讀channel中的數據,也就是先把person01的數據讀出來才會繼續執行person02的任務// 如果一個寫,一個沒讀,或者是一個讀一個沒寫就會阻塞print("person02")//fmt.Println(num) //也可以定義一個變量存起來 }演示:
func main() {// 創建無緩沖通道,長度默認為0ch := make(chan string)// 驗證長度和容量 len(ch):channel中數據剩余未讀取的個數 cap(ch):channel的容量fmt.Println("channel中未讀取的個數:", len(ch), "channel的容量:", cap(ch))// 定義匿名go程go func() {for i := 0; i < 3; i++ {fmt.Println("匿名go程循環:", i, "channel中未讀取的個數:", len(ch), "channel的容量:", cap(ch))}ch <- "匿名go程執行完畢"}()// 主函數讀取channel中的數據result := <-chfmt.Println("result", result) }演示:
func main() {ch := make(chan int)go func() {for i := 0; i < 3; i++ {fmt.Println("匿名channel在寫:", i)ch <- i}}()// time.Sleep(300 * time.Millisecond)for i := 0; i < 3; i++ {result := <-chfmt.Println("main函數channel在讀:", result)} }打印結果:
可以看到結果并不是一個寫一個讀的情況,原因是這樣的
有緩沖的channel:
- 有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個數據值的通道。
- 這種類型的通道并不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也不同。無緩沖的是同步操作,有緩沖是異步操作
- 只有通道中沒有要接收的值時,接收動作才會阻塞。
- 只有通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。
- 這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;有緩沖的通道沒有這種保證。
有緩沖的channel創建格式:
如果給定了一個緩沖區容量,通道就是異步的。只要緩沖區有未使用空間用于發送數據,或還包含可以接收的數據,那么其通信就會無阻塞地進行
make(chan Type, capacity)演示:
有可能會出現
func main() {ch := make(chan int, 3)fmt.Println("初始數據:", "channel中未讀取的個數:", len(ch), "channel的容量:", cap(ch))go func() {for i := 0; i < 3; i++ {fmt.Println("匿名go程循環:", i, "channel中未讀取的個數:", len(ch), "channel的容量:", cap(ch))ch <- i}}()time.Sleep(300 * time.Millisecond)for i := 0; i < 3; i++ {result := <-chfmt.Println("main函數channel在讀:", result, "channel中未讀取的個數:", len(ch), "channel的容量:", cap(ch))} }關閉channel:
如果發送者知道,沒有更多的值需要發送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現。
- channel不像文件一樣需要經常去關閉,只有當你確實沒有任何發送數據了,或者你想顯式的結束range循環之類的,才去關閉channel
- 關閉channel后,無法向channel 再發送數據(引發 panic 錯誤后導致接收立即返回零值)
- 閉channel后,可以繼續從channel接收數據
- 對于nil channel,無論收發都會被阻塞。
- 如果不知道發送端要發多少次可以使用 range 來迭代channel
判斷channel是否關閉的兩種方式
if num , ok := <- ch; ok == true{}for num := range ch {}- 如果已經關閉,ok為false,num無數據
- 如果沒有關閉,ok為true,num保存讀取的數據
- golang中還是引用了管道的特性,關閉后就會返回0,但是不需要需判斷是否為0,而是用ok判斷
單向channel:
默認情況下,通道channel是雙向的,也就是,既可以往里面發送數據也可以往里面接收數據。但是,我們經常見一個通道作為參數進行傳遞而值希望對方是單向使用的,要么只讓它發送數據,要么只讓它接收數據,這時候我們可以指定通道的方向。
單向channel變量聲明:
var ch1 chan int // ch1是一個正常的channel,是雙向的 var ch2 chan<- float64 // ch2是單向channel,只用于寫float64數據 var ch3 <-chan int // ch3是單向channel,只用于讀int數據- chan<- 表示數據進入管道,要把數據寫進管道,對于調用者就是輸出。
- <-chan 表示數據從管道出來,對于調用者就是得到管道的數據,當然就是輸入。
- 雙向channel可以隱式轉換為任意一種單向channel,單向channel不可以轉換為雙向channel
channel作為函數參數:
channel傳參是引用,好處就是多個Goroutine通信的時候會共用一個channel
演示:
func main() {ch := make(chan int, 3)go func() {send(ch) // 相當于雙向轉為單向寫的操作}()read(ch) }// 讀 func read(in <-chan int) {n := <-infmt.Println("讀到:", n) }// 寫 func send(out chan<- int) {out <- 24close(out) }生產者消費者模型:
- 單向channel最典型的應用是生產者消費者模型
- 生產者消費者模型: 某個模塊(函數等)負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、協程、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。
- 單單抽象出生產者和消費者,還夠不上是生產者/消費者模型。該模式還需要有一個緩沖區處于生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。
例:小明負責蒸包子,小紅負責吃包子,桌子用來放包子
這個緩沖區有什么用呢?為什么不讓生產者直接調用消費者的某個函數,直接把數據傳遞過去,而畫蛇添足般的設置一個緩沖區呢?
1、解耦
假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對于消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會直接影響到生產者。而如果兩者都依賴于某個緩沖區,兩者之間不直接依賴,耦合度也就相應降低了。
2、處理并發
生產者直接調用消費者的某個方法,還有另一個弊端。由于函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者只能無端浪費時間。使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的并發主體。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。其實最當初這個生產者消費者模式,主要就是用來處理并發問題的。
3、緩存
如果生產者制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
演示:
type OrderInfo struct {orderId int // 訂單id }func product(out chan<- OrderInfo) {for i := 0; i < 10; i++ {order := OrderInfo{orderId: i + 1}out <- order}close(out) }func consumer(in <-chan OrderInfo) {for order := range in {fmt.Println("訂單id為:", order.orderId)} }func main() {ch := make(chan OrderInfo)go product(ch)consumer(ch) }條件變量:
- 條件變量的作用并不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的協程(線程)。條件變量不是鎖,在并發中不能達到同步的目的,因此條件變量總是與鎖一塊使用。
- 例如,上面說的,如果倉庫隊列滿了,我們可以使用條件變量讓生產者對應的goroutine暫停(阻塞),但是當消費者消費了某個產品后,倉庫就不再滿了,應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品。GO標準庫中的sys.Cond類型代表了條件變量。條件變量要與鎖(互斥鎖,或者讀寫鎖)一起使用。成員變量L代表與條件變量搭配使用的鎖。對應的有3個常用方法,Wait、Signal、Broadcast
func (c *Cond) Wait()該函數的作用可歸納為如下三點:
func (c *Cond) Signal()
- 單發通知,給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知。
func (c *Cond) Broadcast()
- 廣播通知,給正在等待(阻塞)在該條件變量上的所有goroutine(線程)發送通知。
演示:
var cond sync.Cond // 創建全局條件變量// 生產者 func producer(out chan<- int, idx int) {for {cond.L.Lock() // 條件變量對應互斥鎖加鎖for len(out) == 3 { // 產品區滿 等待消費者消費cond.Wait() // 掛起當前協程, 等待條件變量滿足,被消費者喚醒}num := rand.Intn(1000) // 產生一個隨機數out <- num // 寫入到 channel 中 (生產)fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩余%d個數據\n", idx, num, len(out))cond.L.Unlock() // 生產結束,解鎖互斥鎖cond.Signal() // 喚醒 阻塞的 消費者time.Sleep(time.Second) // 生產完休息一會,給其他協程執行機會} }//消費者 func consumer(in <-chan int, idx int) {for {cond.L.Lock() // 條件變量對應互斥鎖加鎖(與生產者是同一個)for len(in) == 0 { // 產品區為空 等待生產者生產cond.Wait() // 掛起當前協程, 等待條件變量滿足,被生產者喚醒}num := <-in // 將 channel 中的數據讀走 (消費)fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩余%d個數據\n", idx, num, len(in))cond.L.Unlock() // 消費結束,解鎖互斥鎖cond.Signal() // 喚醒 阻塞的 生產者time.Sleep(time.Millisecond * 500) //消費完 休息一會,給其他協程執行機會} } func main() {rand.Seed(time.Now().UnixNano()) // 設置隨機數種子quit := make(chan bool) // 創建用于結束通信的 channelproduct := make(chan int, 3) // 產品區(公共區)使用channel 模擬cond.L = new(sync.Mutex) // 創建互斥鎖和條件變量for i := 0; i < 5; i++ { // 5個消費者go producer(product, i+1)}for i := 0; i < 3; i++ { // 3個生產者go consumer(product, i+1)}<-quit // 主協程阻塞 不結束 }/* 1. main函數中定義quit,其作用是讓主協程阻塞。 2. 定義product作為隊列,生產者產生數據保存至隊列中,最多存儲3個數據,消費者從中取出數據模擬消費 3. 條件變量要與鎖一起使用,這里定義全局條件變量cond,它有一個屬性:L Locker。是一個互斥鎖。 4. 開啟5個消費者協程,開啟3個生產者協程。 5. producer生產者,在該方法中開啟互斥鎖,保證數據完整性。并且判斷隊列是否滿,如果已滿,調用wait()讓該goroutine阻塞。當消費者取出數后執行cond.Signal(),會喚醒該goroutine,繼續生產數據。 6. consumer消費者,同樣開啟互斥鎖,保證數據完整性。判斷隊列是否為空,如果為空,調用wait()使得當前goroutine阻塞。當生產者產生數據并添加到隊列,執行cond.Signal() 喚醒該goroutine。 */定時器:
ime.Timer是一個定時器。代表未來的一個單一事件,你可以告訴timer你要等待多長時間。它提供一個channel,在定時時間到達之前,沒有數據寫入timer.C會一直阻塞。直到定時時間到,向channel寫入值,阻塞解除,可以從中讀取數據。
創建:
func main() {// 三種方法完成定時、NewTimer、After// 當前時間fmt.Printf("當前時間:%v\n", time.Now())// 創建定時器,2秒后,定時器向定時器的C發送time.Timer類型的元素值timer := time.NewTimer(time.Second * 2)nowTimer := <-timer.Cfmt.Println("nowTimer", nowTimer)fmt.Printf("當前時間:%v\n", time.Now())after := <-time.After(time.Second * 2)fmt.Println("after", after)timeNow02 := <-timer.Cfmt.Printf("timeNow02:%v\n", timeNow02) // 當前時間// time.Sleeptimer2 := time.NewTimer(time.Second * 2)<-timer2.Cfmt.Println("可以實現單純的等待2秒")time.Sleep(time.Second * 2)fmt.Println("再一次2s后") }重置和關閉:
func main() {// 定時停止和重置timer3 := time.NewTimer(time.Second * 3)go func() {<-timer3.Cfmt.Println("timer3運行完畢")}()stop := timer3.Stop() // 設置定時器停止if stop {fmt.Println("已經停止")}timer4 := time.NewTimer(time.Second * 3) // 原設置時間timer4.Reset(time.Second * 1) // 重新設置時間<-timer4.Cfmt.Println("after") }定時器周期定時:
func main() {quit := make(chan bool)i := 0fmt.Println("當前時間:", time.Now())// NewTicker:周期定時器ticker := time.NewTicker(time.Second)go func() {for {i++nowTime := <-ticker.Cfmt.Println("nowTime", nowTime)if i == 5 {quit <- true}}}()<-quit }select:
通過select可以監聽channel上的數據流動select的用法與switch語言非常類似,由select開始一個新的選擇塊,每個選擇條件由case語句來描述。與switch語句相比, select有比較多的限制,其中最大的一條限制就是每個case語句里必須是一個IO操作(讀寫),大致的結構如下:
select {case <-chan1:// 如果chan1成功讀到數據,則進行該case處理語句case chan2 <- 1:// 如果成功向chan2寫入數據,則進行該case處理語句default:// 如果上面都沒有成功,則進入default處理流程}在一個select語句中,Go語言會按順序從頭至尾評估每一個發送和接收的語句。如果其中的任意一語句可以繼續執行(即沒有被阻塞),那么就從那些可以執行的語句中任意選擇一條來使用。如果沒有任意一條語句可以執行(即所有的通道都被阻塞),那么有兩種可能的情況:
- 如果給出了default語句,那么就會執行default語句,同時程序的執行會從select語句后的語句中恢復。
- 如果沒有default語句,那么select語句將被阻塞,直到至少有一個通信可以進行下去,但是會產生忙輪詢
演示:
func main() {ch := make(chan int) // 用來進行數據通信的channelquit := make(chan bool) // 用來判斷是否退出的channelgo func() {for i := 0; i < 5; i++ {ch <- itime.Sleep(time.Second)}close(ch)quit <- true // 通知主go程退出runtime.Goexit()}()// 監聽channel,讀取數據for {select {case num := <-ch:fmt.Println("讀到的數據為:", num)case <-quit:return}} }斐波那契數列:
func main() {ch := make(chan int) // 用來進行數據通信的channelquit := make(chan bool) // 用來判斷是否退出的channelgo f(ch, quit)x, y := 1, 1for i := 0; i < 20; i++ {ch <- xx, y = y, x+y}quit <- true }func f(ch <-chan int, quit <-chan bool) {for {select {case num := <-ch:fmt.Print(num, " ")case <-quit://returnruntime.Goexit()}} }超時:
有時候會出現goroutine阻塞的情況,使用select我們如何避免整個程序進入阻塞的情況
演示:
func main() {ch := make(chan int)quit := make(chan bool)go func() {for {select {case v := <-ch:fmt.Println(v)// 設置5秒讀取不到數據就退出,避免阻塞case <-time.After(5 * time.Second):fmt.Println("timeout")quit <- truebreak}}}()ch <- 666 // 寫完5秒后退出<-ch }總結
以上是生活随笔為你收集整理的Golang并发——并发技术Goroutine和channel的使用、定时器、生产者消费者、条件变量、select的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Golang——递归的使用
- 下一篇: jquery 字符串去首尾空格_jque