Colly源码解析——结合例子分析底层实现
? ? ? ? 通過《Colly源碼解析——框架》分析,我們可以知道Colly執行的主要流程。本文將結合http://go-colly.org上的例子分析一些高級設置的底層實現。(轉載請指明出于breaksoftware的csdn博客)
遞歸深度
? ? ? ? 以下例子截取于Basic
c := colly.NewCollector(// Visit only domains: hackerspaces.org, wiki.hackerspaces.orgcolly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Printf("Link found: %q -> %s\n", e.Text, link)// Visit link found on page// Only those links are visited which are in AllowedDomainsc.Visit(e.Request.AbsoluteURL(link))})
? ? ? ? c是Collector指針,它的Visit方法給scrape傳遞的“深度”值是1。
func (c *Collector) Visit(URL string) error {return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
? ? ? ? 由于NewCollector構造的Collector.MaxDepth為0,而在scrape方法內部調用的requestCheck中,如果此值為0,則不會去做深度檢測
// requestCheck methodif c.MaxDepth > 0 && c.MaxDepth < depth {return ErrMaxDepth}
? ? ? ? 如果希望通過MaxDepth控制深度,則可以參見Max depth例子
c := colly.NewCollector(// MaxDepth is 1, so only the links on the scraped page// is visited, and no further links are followedcolly.MaxDepth(1),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Println(link)// Visit link found on pagee.Request.Visit(link)})
? ? ? ? 第4行將深度設置為1,這樣理論上只能訪問第一層的URL。
? ? ? ? 如果OnHTML中的代碼和Basic例子一樣,即使用Collector的Visit訪問URL,則由于其depth一直傳1,而導致requestCheck的深度檢測一直不滿足條件,從而會訪問超過1層的URL。
? ? ? ? 所以第13行,調用的是HTMLElement的Visit方法
func (r *Request) Visit(URL string) error {return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}
? ? ? ? 相較于Collector的Visit,HTMLElement的Visit方法將Depth增加了1,并且傳遞了請求的上下文(ctx)。由于depth有變化,所以之后的深度檢測會返回錯誤,從而只會訪問1層URL。
規則
? ? ? ? Collector的Limit方法用于設置各種規則。這些規則最終在Collector的httpBackend成員中執行。
? ? ? ? 一個Collector只有一個httpBackend結構體指針,而一個httpBackend結構體可以有一組規則
type httpBackend struct {LimitRules []*LimitRuleClient *http.Clientlock *sync.RWMutex
}
? ? ? ? 規則針對Domain來區分,我們可以通過設定不同的匹配規則,讓每組URL執行相應的操作。這些操作包括:
- 訪問并行數
- 訪問間隔延遲
? ? ? ? 參見Parallel例子。只截取其中關鍵一段
// Limit the maximum parallelism to 2// This is necessary if the goroutines are dynamically// created to control the limit of simultaneous requests.//// Parallelism can be controlled also by spawning fixed// number of go routines.c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})
? ? ? ? Collector的Limit最終會調用到httpBackend的Limit,它將規則加入到規則組后初始化該規則。
// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {waitChanSize := 1if r.Parallelism > 1 {waitChanSize = r.Parallelism}r.waitChan = make(chan bool, waitChanSize)hasPattern := falseif r.DomainRegexp != "" {c, err := regexp.Compile(r.DomainRegexp)if err != nil {return err}r.compiledRegexp = chasPattern = true}if r.DomainGlob != "" {c, err := glob.Compile(r.DomainGlob)if err != nil {return err}r.compiledGlob = chasPattern = true}if !hasPattern {return ErrNoPattern}return nil
}
? ? ? ? 第7行創建了一個可以承載waitChanSize個元素的channel。可以看到,如果我們在規則中沒有設置并行數,也會創建只有1個元素的channel。這個channel會被用于調節并行執行的任務數量。所以這也就意味著,一旦調用了Limit方法而沒設置Parallelism值,該Collector中針對符合規則的請求就會變成串行的。
? ? ? ? 第10和18行分別針對不同規則初始化一個編譯器。因為這個操作比較重,所以在初始化時執行,之后只是簡單使用這些編譯器即可。
? ? ? ? 當發起請求時,流程最終會走到httpBackend的Do方法
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {r := h.GetMatchingRule(request.URL.Host)if r != nil {r.waitChan <- truedefer func(r *LimitRule) {randomDelay := time.Duration(0)if r.RandomDelay != 0 {randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))}time.Sleep(r.Delay + randomDelay)<-r.waitChan}(r)}
? ? ? ? 第2行通過域名查找對應的規則,如果找到,則在第4行嘗試往channel中加入元素。這個操作相當于上鎖。如果channel此時是滿的,則該流程會被掛起。否則就執行之后的流程。在Do函數結束,命中規則的會執行上面的匿名函數,它在休眠規則配置的時間后,嘗試從channel中獲取數據。這個操作相當于釋放鎖。
? ? ? ? Colly就是通過channel的特性實現了并行控制。
并行
? ? ? ? 在“規則”一節,我們講到可以通過Parallelism控制并行goroutine的數量。httpBackend的Do方法最終將被Collector的fetch方法調用,而該方法可以被異步執行,即是一個goroutine。這就意味著承載Do邏輯的goroutine執行完畢后就會退出。而一種類似線程的技術在Colly也被支持,它更像一個生產者消費者模型。消費者線程執行完一個任務后不會退出,而在生產者生產出的物料池中取出未處理的任務加以處理。
? ? ? ? 以下代碼截取于Queue
q, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)……for i := 0; i < 5; i++ {// Add URLs to the queueq.AddURL(fmt.Sprintf("%s?n=%d", url, i))}// Consume URLsq.Run(c)
? ? ? ? 這次沒有調用Collector的Visit等函數,而是調用了Queue的Run。
? ? ? ? 第2行創建了一個具有2個消費者(goroutine)的Queue。第10行預先給這個Queue加入5個需要訪問的URL。
// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {u, err := url.Parse(URL)if err != nil {return err}r := &colly.Request{URL: u,Method: "GET",}d, err := r.Marshal()if err != nil {return err}return q.storage.AddRequest(d)
}
? ? ? ? AddUrl的第11行將請求序列化,在第15行將該序列化數據保存到“倉庫”中。
? ? ? ? 在Run方法中,Colly將啟動2個goroutine。注意它是使用for循環組織的,這意味著如果for內無break,它會一直循環執行下去——不退出。
func (q *Queue) Run(c *colly.Collector) error {wg := &sync.WaitGroup{}for i := 0; i < q.Threads; i++ {wg.Add(1)go func(c *colly.Collector, wg *sync.WaitGroup) {defer wg.Done()for {
? ? ? ? 如果隊列中沒有需要處理的request,則會嘗試退出
if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}
? ? ? ??activeThreadCount表示當前運行中的消費者goroutine數量。如果已經沒有消費者了,則直接跳出for循環,整個goroutine結束。
? ? ? ? 如果還有消費者,則創建一個channel,并將其加入到q.threadChans的channel切片中。然后在第9行等待該channel被寫入值。如果寫入的是true并且此時沒有需要處理的request,則退出goroutine。可以看到這段邏輯檢測了兩次是否有request,這個我們之后再討論。
? ? ? ? 如果還有request要處理,則遞增消費者數量(在finish中會遞減以抵消)。然后從“倉庫”中取出一個任務,在通過Request的Do方法發起請求,最后調用finish方法善后。
q.lock.Lock()atomic.AddInt32(&q.activeThreadCount, 1)q.lock.Unlock()rb, err := q.storage.GetRequest()if err != nil || rb == nil {q.finish()continue}r, err := c.UnmarshalRequest(rb)if err != nil || r == nil {q.finish()continue}r.Do()q.finish()}}(c, wg)}wg.Wait()return nil
}
? ? ? ??finish方法干了三件事:
- 遞減消費者數量,以抵消Run方法中的遞增。
- 將Queue的各個等待中的,其他goroutine創建的channel傳入true值,即告知他們可以退出了。
- 給Queue創建一個空的channel切片
func (q *Queue) finish() {q.lock.Lock()q.activeThreadCount--for _, c := range q.threadChans {c <- stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()
}
? ? ? ? 我們再看下怎么在請求的過程中給Queue增加任務
// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {d, err := r.Marshal()if err != nil {return err}if err := q.storage.AddRequest(d); err != nil {return err}q.lock.Lock()for _, c := range q.threadChans {c <- !stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()return nil
}
? ? ? ? 第3~9行,會將請求序列化后保存到“倉庫”中。
? ? ? ? 第10~15行,會將其他goroutine創建的channel傳入false,告知它們不要退出。然后再創建一個空的channel切片。
? ? ? ? finish和AddRequest都使用鎖鎖住了所有的邏輯,而且它們都會把其他goroutine創建的channel傳入值,然后將Queue的channel切片清空。這樣就保證這些channel只可能收到一種狀態。由于它自己創建的channel是在finish調用完之后才有機會創建出來,所以不會造成死鎖。
? ? ? ? 再回來看goroutine退出的邏輯
if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}
? ? ? ? 如果finish方法中遞減的activeThreadCount為0,這說明這是最后一個goroutine了,而且當前也沒request,所以退出。當然此時存在一種可能:在1行執行結束后,其他非消費者goroutine調用AddRequest新增了若干request。而執行第2行時,goroutine將退出,從而導致存在request沒有處理的可能。
? ? ? ? 如果還存在其他goroutine,則本goroutine將在第5行創建一個channel,并將這個channel加入到Queue的channel切片中。供其他goroutine調用finish往channel中傳入true,或者AddRequest傳入false,調控是否需要退出本過程。在第9行等待channel傳出數據前,可能存在如下幾種情況:
- 執行了finish
- 執行了AddRequest
- 執行了finish后執行了AddRequest
- 執行了AddRequest后執行了finish
? ? ? ? 如果是第1和4種,action將是false。第2和3種,action是true。但是這個情況下不能單純的通過action決定是否退出。因為第9和10行執行需要時間,這段時間其他goroutine可能還會執行AddRequest新增任務,或者GetRequest刪除任務。所以還要在第10行檢測下IsEmpty。
? ? ? ? 這段是我閱讀Colly中思考的最多的代碼,因為有goroutine和channel,導致整個邏輯比較復雜。也感慨下,雖然goroutine很方便,但是真的能把它寫對也是不容易的。
分布式
? ? ? ? 在Queue例子中,我們看到“倉庫”這個概念。回顧下Queue的例子,“倉庫”是InMemoryQueueStorage。顧名思義,它是一個內存型的倉庫,所以不存在分布式基礎。
// create a request queue with 2 consumer threadsq, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)
? ? ? ? 一個分布式的例子是Redis backend,截取一段
// create the redis storagestorage := &redisstorage.Storage{Address: "127.0.0.1:6379",Password: "",DB: 0,Prefix: "httpbin_test",}// add storage to the collectorerr := c.SetStorage(storage)if err != nil {panic(err)}// delete previous data from storageif err := storage.Clear(); err != nil {log.Fatal(err)}// close redis clientdefer storage.Client.Close()// create a new request queue with redis storage backendq, _ := queue.New(2, storage)
? ? ? ? 這兒創建了一個redis型的倉庫。不僅Collector的Storage是它,Queue的Storage也是它。這樣一個集群上的服務都往這個倉庫里存入和取出數據,從而實現分布式架構。
? ? ? ??redisstorage庫引自github.com/gocolly/redisstorage。我們查看其源碼,其實現了Collector的storage需要的接口
type Storage interface {// Init initializes the storageInit() error// Visited receives and stores a request ID that is visited by the CollectorVisited(requestID uint64) error// IsVisited returns true if the request was visited before IsVisited// is calledIsVisited(requestID uint64) (bool, error)// Cookies retrieves stored cookies for a given hostCookies(u *url.URL) string// SetCookies stores cookies for a given hostSetCookies(u *url.URL, cookies string)
}
? ? ? ? 以及Queue的storage需要的
// Storage is the interface of the queue's storage backend
type Storage interface {// Init initializes the storageInit() error// AddRequest adds a serialized request to the queueAddRequest([]byte) error// GetRequest pops the next request from the queue// or returns error if the queue is emptyGetRequest() ([]byte, error)// QueueSize returns with the size of the queueQueueSize() (int, error)
}
?
總結
以上是生活随笔為你收集整理的Colly源码解析——结合例子分析底层实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Colly源码解析——框架
- 下一篇: Gin源码解析和例子——路由