Go嵌套并发实现EDM,附坑点分析#1
看著身邊優(yōu)秀的小伙伴們早就開始寫博客,自己深感落后,還好遲做總比不做好,勉勵自己見賢思齊。趁著年前最后一個周末,陽光正好,寫下第一篇博客,為2019年開個頭,以期完成今年為自己立下的flags。
從PHPer轉Gopher,很大一個原因就是業(yè)務對性能和并發(fā)的持續(xù)需求,另一個主要原因就是Go語言原生的并發(fā)特性,可以在提供同等高可用的能力下,使用更少的機器資源,節(jié)約可觀的成本。因此本文就結合自己在學習Go并發(fā)的實戰(zhàn)demo中,把遇到的一些坑點寫下來,共享進步。
1. 在Go語言中實現(xiàn)并發(fā)控制,目前主要有三種方式:
a) Channel - 分為無緩沖、有緩沖通道;
b) WaitGroup - sync包提供的goroutine間的同步機制;
c) Context - 在調(diào)用鏈不同goroutine間傳遞和共享數(shù)據(jù);
本文demo中主要用到了前兩種,基本使用請查看官方文檔。
2. Demo需求與分析:
需求:實現(xiàn)一個EDM的高效郵件發(fā)送:需要支持多個國家(可以看成是多個任務),需要記錄每條任務發(fā)送的狀態(tài)(當前成功、失敗條數(shù)),需要支持可暫停(stop)、重新發(fā)送(run)操作。
分析:從需求可以看出,在郵件發(fā)送中可以通過并發(fā)實現(xiàn)多個國家(多個任務)并發(fā)、單個任務分批次并發(fā)實現(xiàn)快速、高效EDM需求。
3. Demo實戰(zhàn)源碼:
3.1 main.go
package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 20wg sync.WaitGroupfinish = make(chan bool) )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執(zhí)行完成fmt.Println(<-finish)fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批//wg.Wait()}wg.Wait()finish <- truereturn retErr } 復制代碼注意上面wg.Wait()的位置(下面有討論),在finish channel之前,目的是為了等待子goroutine運行完,再通過一個無緩沖通道finish通知main goroutine,然后main運行結束。
func ReadLines()讀取指定行數(shù)據(jù):
// 讀取指定行數(shù)據(jù) func ReadLines(filename string, start, length int) (line int, retErr error) {fmt.Println("current file:", filename)fileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發(fā)執(zhí)行go SendEmail(line)if startLine == endLine {break}}startLine++}return startLine, retErr }// 模擬郵件發(fā)送 func SendEmail(email string) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼運行上面main.go,3個任務在1s內(nèi)并發(fā)完成所有郵件(./task/edm1.txt中一行表示一個郵箱)發(fā)送。
truefinished all tasks.Total cost(ms): 1001 復制代碼那么問題來了:沒有實現(xiàn)分批每次并發(fā)batchLength = 20,因為如果不分批發(fā)送,只要其中某個任務或某一封郵件出錯了,那下次重新run的時候,會不知道哪些用戶已經(jīng)發(fā)送過了,出現(xiàn)重復發(fā)送。而分批發(fā)送即使中途出錯了,下一次重新run可從上次出錯的end行開始,最多是[start - end]一個batchLength 發(fā)送失敗,可以接受。
于是,將倒數(shù)第5行wg.Wait()注釋掉,倒數(shù)第8行注釋打開,如下:
// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批wg.Wait()}//wg.Wait()finish <- truereturn retErr } 復制代碼運行就報錯:
panic: sync: WaitGroup is reused before previous Wait has returned 復制代碼提示W(wǎng)aitGroup在goroutine之間重用了,雖然是全局變量,看起來是使用不當。怎么調(diào)整呢?
3.2 main.go
package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 10outerWg sync.WaitGroup )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60outerWg.Add(1)go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執(zhí)行完成outerWg.Wait()fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {isFinish := make(chan bool)readLine, err := ReadLines(filename, start, length, isFinish)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}// 等待一批完成才進入下一批fmt.Println("current line:", readLine)start += length<-isFinish// 關閉channel,釋放資源close(isFinish)}outerWg.Done()return retErr } 復制代碼從上面可以看出:調(diào)整的思路是外層用WaitGroup控制,里層用channel 控制,執(zhí)行又報錯 : (
fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70)/usr/local/go/src/sync/waitgroup.go:131 +0x72main.main()/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1abgoroutine 5 [chan send]:main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0) 復制代碼仔細檢查,發(fā)現(xiàn)上面代碼中定義的isFinish 是一個無緩沖channel,在發(fā)郵件SendMail() 子協(xié)程沒有完成時,讀取一個無數(shù)據(jù)的無緩沖通道將阻塞當前goroutine,其他goroutine也是一樣的都被阻塞,這樣就出現(xiàn)了all goroutines are asleep - deadlock!
于是將上面代碼改為有緩沖繼續(xù)嘗試:
isFinish := make(chan bool, 1) // 讀取指定行數(shù)據(jù) func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發(fā)完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發(fā)執(zhí)行go SendEmail(line, wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發(fā)送 func SendEmail(email string, wg sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼運行,又報錯了 : (
fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70) 復制代碼這次提示有點不一樣,看起來是里層的WaitGroup 導致了死鎖,繼續(xù)檢查發(fā)現(xiàn)里層wg 是值傳遞,應該使用指針傳引用。
// go并發(fā)執(zhí)行 go SendEmail(line, wg) 復制代碼最后修改代碼如下:
// 讀取指定行數(shù)據(jù) func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發(fā)完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發(fā)執(zhí)行go SendEmail(line, &wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發(fā)送 func SendEmail(email string, wg *sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼趕緊運行一下,這次終于成功啦 : )
current line: 100current file: ./task/edm2.txtRead EOF: ./task/edm2.txtRead EOF: ./task/edm2.txtfinished all tasks.Total cost(ms): 4003 復制代碼每個任務模擬的是100行,從第60行開始運行,四個任務并發(fā)執(zhí)行,每個任務分批內(nèi)再次并發(fā),并且控制了每一批次完成后再進行下一批,所以總運行時間約4s,符合期望值。完整源碼請閱讀原文或移步GitHub:github.com/astraw99/ed…
4. 小結:
本文通過兩層嵌套Go 并發(fā),模擬實現(xiàn)了高性能并發(fā)EDM,具體的一些出錯行控制、任務中斷與再次執(zhí)行將在下次繼續(xù)討論,主要邏輯已跑通,幾個坑點小結如下:
a) WaitGroup 一般用于main 主協(xié)程等待全部子協(xié)程退出后,再優(yōu)雅退出主協(xié)程;嵌套使用時注意wg.Wait()放的位置;
b) 合理使用channel,無緩沖chan將阻塞當前goroutine,有緩沖chan在cap未滿的情況下不會阻塞當前goroutine,使用完記得釋放chan資源;
c) 注意函數(shù)間傳值或傳引用(本質上還是傳值,傳的指針的指針內(nèi)存值)的合理使用;
后記:第一篇博客寫到這里差不多算完成了,一不小心一個下午就過去了,寫的邏輯、可讀性可能不太好請見諒,歡迎留言批評指正。感謝您的閱讀。
轉載于:https://juejin.im/post/5c4da16f5188253a317b7637
總結
以上是生活随笔為你收集整理的Go嵌套并发实现EDM,附坑点分析#1的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 理解bootstrap的列偏移offse
- 下一篇: Mongodb在使用过程中有什么问题