生活随笔
Implementing Concurrent Crawling in a Web Scraper
Single-threaded crawler, multi-threaded crawler, coroutine-based crawler
We speed up the crawler with multiple threads or coroutines, compare the strengths and weaknesses of each approach, and choose the one that fits the workload.
Target URL: https://wz.sun0769.com/political/index/politicsNewest?id=1&page=1
Threads
A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the part of the process that actually runs.
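A minimal illustration of this, not taken from the original article: start several `threading.Thread` workers, then `join()` to wait for all of them before reading the shared result list.

```python
import threading

results = []

def worker(n):
    # Each thread records its own id; CPython's GIL makes the
    # single list.append call here effectively atomic.
    results.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait until every worker has finished

print(sorted(results))  # [0, 1, 2, 3]
```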
Single-threaded implementation
```python
from lxml import etree
import requests
import json
import time

header = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'}
local_file = open('duanzi.json', 'a', encoding='utf-8')

def parse_html(html):
    text = etree.HTML(html)
    node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
    for node in node_list:
        try:
            id = node.xpath('./span[1]/text()')[0]
            state = node.xpath('./span[2]/text()')[0].strip()
            items = {'id': id, 'state': state}
            local_file.write(json.dumps(items) + '\n')
        except IndexError:
            # Skip list items that do not carry the expected spans.
            pass

def main():
    # Fetch pages 1..19 one after another; each request blocks the
    # whole program until it completes.
    for i in range(1, 20):
        url = f'https://wz.sun0769.com/political/index/politicsNewest?id=1&page={i}'
        html = requests.get(url=url, headers=header).text
        parse_html(html)

if __name__ == '__main__':
    t1 = time.time()
    main()
    print(time.time() - t1)
```
Multi-threaded implementation: workflow
- Use a queue `pageQueue` to hold the page numbers to visit.
- Start several crawl threads. Each one takes a page number from `pageQueue`, builds the URL, fetches the page, then goes back to the queue for the next page number, until every page has been visited. The crawl threads are kept in the list `threadCrawls`.
- Use a second queue `dataQueue` to hold the raw page source; every crawl thread puts the HTML it fetched into this queue.
- Start several parse threads. Each one takes a page source from `dataQueue`, extracts the target data, then picks up the next one, until everything has been parsed. The parse threads are kept in the list `threadParses`.
- Write the parsed JSON records to a local file.
```python
import json
import threading
from queue import Queue
from lxml import etree
import time
import random
import requests

CRAWL_EXIT = False   # set to True once pageQueue has been drained

class ThreadCrawl(threading.Thread):
    def __init__(self, threadName, pageQueue, dataQueue):
        threading.Thread.__init__(self)
        self.threadName = threadName
        self.pageQueue = pageQueue
        self.dataQueue = dataQueue
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'}

    def run(self):
        print("Starting " + self.threadName)
        while not CRAWL_EXIT:
            try:
                # Non-blocking get: raises queue.Empty once no pages are left.
                page = self.pageQueue.get(False)
                url = f'https://wz.sun0769.com/political/index/politicsNewest?id=1&page={page}'
                time.sleep(random.uniform(1, 3))
                content = requests.get(url, headers=self.headers).text
                self.dataQueue.put(content)
            except Exception:
                pass
        print("Exiting " + self.threadName)

PARSE_EXIT = False   # set to True once dataQueue has been drained

class ThreadParse(threading.Thread):
    def __init__(self, threadName, dataQueue, localFile, lock):
        super(ThreadParse, self).__init__()
        self.threadName = threadName
        self.dataQueue = dataQueue
        self.localFile = localFile
        self.lock = lock

    def run(self):
        print("Starting " + self.threadName)
        while not PARSE_EXIT:
            try:
                html = self.dataQueue.get(False)
                self.parse(html)
            except Exception:
                pass
        print("Exiting " + self.threadName)

    def parse(self, html):
        text = etree.HTML(html)
        node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
        for node in node_list:
            try:
                id = node.xpath('./span[1]/text()')[0]
                state = node.xpath('./span[2]/text()')[0].strip()
                items = {'id': id, 'state': state}
                # The lock keeps output from different threads from interleaving.
                with self.lock:
                    print(json.dumps(items))
                    self.localFile.write(json.dumps(items) + '\n')
            except IndexError:
                pass

def main():
    global CRAWL_EXIT, PARSE_EXIT
    pageQueue = Queue(20)
    for i in range(1, 21):
        pageQueue.put(i)
    dataQueue = Queue()
    localFile = open('多線程.json', 'a', encoding='utf-8')
    lock = threading.Lock()

    crawlList = ['采集線程1', '采集線程2', '采集線程3']
    threadCrawls = []
    for threadName in crawlList:
        thread = ThreadCrawl(threadName, pageQueue, dataQueue)
        thread.start()
        threadCrawls.append(thread)

    parseList = ['解析線程1', '解析線程2', '解析線程3']
    threadParses = []
    for threadName in parseList:
        thread = ThreadParse(threadName, dataQueue, localFile, lock)
        thread.start()
        threadParses.append(thread)

    # Busy-wait until every page number has been taken, then tell the
    # crawl threads to exit.
    while not pageQueue.empty():
        pass
    CRAWL_EXIT = True
    print("pageQueue is empty")
    for thread in threadCrawls:
        thread.join()

    # Likewise wait for the parse threads to drain dataQueue.
    while not dataQueue.empty():
        pass
    print("dataQueue is empty")
    PARSE_EXIT = True
    for thread in threadParses:
        thread.join()

    with lock:
        localFile.close()

if __name__ == '__main__':
    t1 = time.time()
    main()
    print(time.time() - t1)
```
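As an aside not covered in the original article, Python's standard-library `concurrent.futures` can replace most of the hand-rolled thread and queue management above. A sketch, with a stand-in `fetch` function (a made-up placeholder, so the example runs without network access) in place of the real `requests.get` call:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(page):
    # Placeholder for requests.get(...).text: returns a string built
    # from the page number instead of real HTML.
    return f'page {page}'

# map() distributes the pages across a fixed pool of worker threads
# and yields the results in input order.
with ThreadPoolExecutor(max_workers=3) as pool:
    pages = list(pool.map(fetch, range(1, 6)))

print(pages)  # ['page 1', 'page 2', 'page 3', 'page 4', 'page 5']
```

The executor owns the work queue, the worker threads, and the shutdown logic, so no exit flags or busy-wait loops are needed.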
Coroutines
- A coroutine is an execution unit smaller than a thread, also called a micro-thread (a user-space thread). A single thread can host many coroutines, but only one runs at a time: when the running coroutine blocks, control switches to the next task. This keeps the CPU busy, suits IO-bound workloads, and avoids both the overhead of creating many threads and the time wasted on thread switches.
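A tiny sketch of that cooperative switching, using the gevent library that this article introduces further down (the task names 'a' and 'b' are made up for illustration):

```python
import gevent

order = []

def task(name):
    for _ in range(2):
        order.append(name)
        gevent.sleep(0)  # simulate blocking: yield control to the other greenlet

# Two greenlets run in one OS thread, handing control back and forth.
gevent.joinall([gevent.spawn(task, 'a'), gevent.spawn(task, 'b')])
print(order)  # ['a', 'b', 'a', 'b']
```

Each `gevent.sleep(0)` stands in for a blocking IO call; the interleaved output shows that the switch happens at the block point, not on a timer.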
Coroutine crawler: workflow analysis
Because switching between coroutines is far cheaper than thread scheduling, there is no need to cap their number strictly.
- Store the URLs to crawl in a list: each URL gets its own coroutine, so a list of pending URLs must be prepared up front.
- Create and start one coroutine per URL. The coroutines run in turn, fetching their pages; if one hits network IO or another blocking condition, control switches immediately to the next. Since the switch stays inside one thread, it costs little, so the number of coroutines need not be tightly limited (within reason).
- Each coroutine fetches its page and extracts the target data from it.
- Collect the extracted records in a shared container, then iterate over it and write the records to a local file.
gevent
gevent is a third-party, coroutine-based Python networking library.
Installation
pip install gevent
Coroutine implementation: workflow
- Define a Spider class that owns all of the crawling work.
- Use a queue `dataQueue` to collect the extracted records.
- Create one coroutine task per page; each builds the full URL from its page number, fetches the page, extracts the useful data, and puts it in the data queue.
- Write the contents of `dataQueue` to a local file.
```python
import json
import time
import gevent
from gevent import monkey
# Patch the standard library so the blocking socket calls inside
# requests yield to other greenlets instead of stalling the thread.
monkey.patch_all()
from lxml import etree
import requests
from queue import Queue

class Spider(object):
    def __init__(self):
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'}
        self.url = 'https://wz.sun0769.com/political/index/politicsNewest?id=1&page='
        self.dataQueue = Queue()
        self.count = 0

    def send_request(self, url):
        print("Crawling " + url)
        html = requests.get(url, headers=self.headers).text
        time.sleep(1)
        self.parse_page(html)

    def parse_page(self, html):
        text = etree.HTML(html)
        node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
        for node in node_list:
            try:
                id = node.xpath('./span[1]/text()')[0]
                state = node.xpath('./span[2]/text()')[0].strip()
                items = {'id': id, 'state': state}
                self.count += 1
                self.dataQueue.put(items)
            except IndexError:
                pass

    def start_work(self):
        # One greenlet per page; gevent switches to another greenlet
        # whenever a request blocks on the network.
        jobs = []
        for page in range(1, 20):
            url = self.url + str(page)
            jobs.append(gevent.spawn(self.send_request, url))
        gevent.joinall(jobs)

        local_file = open('協程.json', 'a', encoding='utf-8')
        while not self.dataQueue.empty():
            content = self.dataQueue.get()
            local_file.write(json.dumps(content, ensure_ascii=False) + '\n')
        local_file.close()
        print(self.count)

if __name__ == '__main__':
    t1 = time.time()
    spider = Spider()
    spider.start_work()
    print(time.time() - t1)
```

Two fixes versus the version as published: the typo `send_raquest` is corrected, and the headers are passed as `headers=self.headers` (the original passed them positionally, where `requests.get` would treat them as query parameters).
Summary
Of the three versions above, the single-threaded crawler is the simplest but the slowest, since every request blocks the whole program. The multi-threaded version overlaps the waiting time of the requests at the cost of locks, queues, and exit-flag plumbing. The gevent version achieves similar overlap with much cheaper user-space switching and no locking, which makes it a good fit for IO-bound crawling like this.