多线程与多进程爬虫
多線程與多進(jìn)程爬蟲
- threading
- Thread類與線程函數(shù)
- Thread 類與線程對(duì)象
- 從Tread 類繼承
- 線程鎖
- 信號(hào)量
- 信號(hào)量與鎖結(jié)合
- 生產(chǎn)者--消費(fèi)者問題與queue模塊
- 爬取豆瓣電影詳情
- 網(wǎng)頁(yè)分析
threading
Thread類與線程函數(shù)
如果使用Thread類處理線程就方便得多了,可以直接使用Thread對(duì)象的join方法等待線程函數(shù)執(zhí)行完畢再往下執(zhí)行,也就是說,在主線程(main函數(shù))中調(diào)用Thread對(duì)象的join方法,并且Thread對(duì)象的線程函數(shù)沒有執(zhí)行完畢,主線程會(huì)處于阻塞狀態(tài)。
使用Thread類也很簡(jiǎn)單,首先需要?jiǎng)?chuàng)建Thread類的實(shí)例,通過Thread類構(gòu)造方法的target關(guān)鍵字參數(shù)執(zhí)行線程函數(shù),通過args關(guān)鍵字參數(shù)指定傳給線程函數(shù)的參數(shù)。然后調(diào)用Thread對(duì)象的start方法啟動(dòng)線程。
樣例:
import threading from time import sleep, ctime # 線程函數(shù),index表示整數(shù)類型的索引,sec表示休眠時(shí)間,單位:秒 def fun(index, sec):print('開始執(zhí)行', index, ' 時(shí)間:', ctime())# 休眠sec秒sleep(sec)print('結(jié)束執(zhí)行', index, '時(shí)間:', ctime()) def main():# 創(chuàng)建第1個(gè)Thread對(duì)象,通過target關(guān)鍵字參數(shù)指定線程函數(shù)fun,傳入索引10和休眠時(shí)間(4秒)thread1 = threading.Thread(target=fun,args=(10, 4))# 啟動(dòng)第1個(gè)線程thread1.start()# 創(chuàng)建第2個(gè)Thread對(duì)象,通過target關(guān)鍵字參數(shù)指定線程函數(shù)fun,傳入索引20和休眠時(shí)間(2秒)thread2 = threading.Thread(target=fun,args=(20, 2))# 啟動(dòng)第2個(gè)線程thread2.start()# 等待第1個(gè)線程函數(shù)執(zhí)行完畢thread1.join()# 等待第2個(gè)線程函數(shù)執(zhí)行完畢thread2.join()if __name__ == '__main__':main()Thread 類與線程對(duì)象
Thread類構(gòu)造方法的target關(guān)鍵字參數(shù)不僅可以是一個(gè)函數(shù),還可以是一個(gè)對(duì)象,可以稱這個(gè)對(duì)象為線程對(duì)象。其實(shí)線程調(diào)用的仍然是函數(shù),只是這個(gè)函數(shù)用對(duì)象進(jìn)行了封裝。這么做的好處是可以將與線程函數(shù)相關(guān)的代碼都放在對(duì)象對(duì)應(yīng)的類中,這樣更能體現(xiàn)面向?qū)ο蟮姆庋b性。
線程對(duì)象對(duì)應(yīng)的類需要有一個(gè)可以傳入線程函數(shù)和參數(shù)的構(gòu)造方法,而且在類中還必須有一個(gè)名為“_ call _” 的方法。當(dāng)線程啟動(dòng)時(shí),會(huì)自動(dòng)調(diào)用線程對(duì)象的“_ call _”方法,然后在該方法中調(diào)用線程函數(shù)。
代碼:
import threading from time import sleep, ctime # 線程對(duì)象對(duì)應(yīng)的類 class MyThread(object):# func表示線程函數(shù),args表示線程函數(shù)的參數(shù)def __init__(self, func, args):# 將線程函數(shù)與線程函數(shù)的參數(shù)賦給當(dāng)前類的成員變量self.func = funcself.args = args# 線程啟動(dòng)時(shí)會(huì)調(diào)用該方法def __call__(self):# 調(diào)用線程函數(shù),并將元組類型的參數(shù)值分解為單個(gè)的參數(shù)值傳入線程函數(shù)self.func(*self.args) # 線程函數(shù) def fun(index, sec):print('開始執(zhí)行', index, ' 時(shí)間:', ctime())# 延遲sec秒sleep(sec)print('結(jié)束執(zhí)行', index, '時(shí)間:', ctime()) def main():print('執(zhí)行開始時(shí)間:', ctime())# 創(chuàng)建第1個(gè)線程,通過target關(guān)鍵字參數(shù)指定了線程對(duì)象(MyThread),延遲4秒thread1 = threading.Thread(target = MyThread(fun,(10, 4)))# 啟動(dòng)第1個(gè)線程thread1.start()# 創(chuàng)建第2個(gè)線程,通過target關(guān)鍵字參數(shù)指定了線程對(duì)象(MyThread),延遲2秒thread2 = threading.Thread(target = MyThread(fun,(20, 2)))# 啟動(dòng)第2個(gè)線程thread2.start()# 創(chuàng)建第3個(gè)線程,通過target關(guān)鍵字參數(shù)指定了線程對(duì)象(MyThread),延遲1秒thread3 = threading.Thread(target = MyThread(fun,(30, 1)))# 啟動(dòng)第3個(gè)線程thread3.start()# 等待第1個(gè)線程函數(shù)執(zhí)行完畢thread1.join()# 等待第2個(gè)線程函數(shù)執(zhí)行完畢thread2.join()# 等待第3個(gè)線程函數(shù)執(zhí)行完畢thread3.join()print('所有的線程函數(shù)已經(jīng)執(zhí)行完畢:', ctime()) if __name__ == '__main__':main()從Tread 類繼承
為了更好地對(duì)與線程有關(guān)的代碼進(jìn)行封裝,可以從Thread類派生一個(gè)子類。然后將與線程有關(guān)的代碼都放到這個(gè)類中。Thread類的子類的使用方法與Thread相同。從Thread類繼承最簡(jiǎn)單的方式是在子類的構(gòu)造方法中通過
super( )函數(shù)調(diào)用父類的構(gòu)造方法,并傳入相應(yīng)的參數(shù)值。
示例:
import threading from time import sleep, ctime# 從Thread類派生的子類 class MyThread(threading.Thread):# 重寫父類的構(gòu)造方法,其中func是線程函數(shù),args是傳入線程函數(shù)的參數(shù),name是線程名def __init__(self, func, args, name=''):# 調(diào)用父類的構(gòu)造方法,并傳入相應(yīng)的參數(shù)值super().__init__(target=func, name=name,args=args)# 重寫父類的run方法def run(self):self._target(*self._args)# 線程函數(shù) def fun(index, sec):print('開始執(zhí)行', index, '時(shí)間:', ctime())# 休眠sec秒sleep(sec)print('執(zhí)行完畢', index, '時(shí)間:', ctime())def main():print('開始:', ctime())# 創(chuàng)建第1個(gè)線程,并指定線程名為“線程1”thread1 = MyThread(fun, (10, 4), '線程1')# 創(chuàng)建第2個(gè)線程,并指定線程名為“線程2”thread2 = MyThread(fun, (20, 2), '線程2')# 開啟第1個(gè)線程thread1.start()# 開啟第2個(gè)線程thread2.start()# 輸出第1個(gè)線程的名字print(thread1.name)# 輸出第2個(gè)線程的名字print(thread2.name)# 等待第1個(gè)線程結(jié)束thread1.join()# 等待第2個(gè)線程結(jié)束thread2.join()print('結(jié)束:', ctime())if __name__ == '__main__':main()線程鎖
線程鎖的目的是將一段代碼鎖住,一旦獲得了鎖權(quán)限,除非釋放線程鎖,否則其他任何代碼都無法再次獲得鎖權(quán)限。為了使用線程鎖,首先需要?jiǎng)?chuàng)建Lock類的實(shí)例,然后通過Lock對(duì)象的acquire方法獲取鎖權(quán)限,當(dāng)需要完成原子操作的代碼段執(zhí)行完后,再使用Lock對(duì)象的release方法釋放鎖,這樣其代碼就可以再次獲得這個(gè)鎖權(quán)限了。
要注意的是,鎖對(duì)象要放到線程函數(shù)的外面作為一個(gè)全局變量,這樣所有的線程函數(shù)實(shí)例都可以共享這個(gè)變量,如果將鎖對(duì)象放到線程函數(shù)內(nèi)部,那么這個(gè)鎖對(duì)象就變成局部變量了,多個(gè)線程函數(shù)實(shí)例使用的是不同的鎖對(duì)象,所以仍然不能有效保護(hù)原子操作的代碼。
示例:
from atexit import register import random from threading import Thread,Lock,currentThread from time import sleep,ctime#創(chuàng)建線程鎖對(duì)象 lock = Lock()def fun():#獲取線程鎖權(quán)限lock.acquire()#for循環(huán)已經(jīng)變成了原子操作for i in range(5):print('Thread Name','=',currentThread().name,'i','=',i)# 休眠一段時(shí)間1~4sleep(random.randint(1,5))#釋放線程鎖lock.release()def main():for i in range(3):Thread(target=fun).start()#當(dāng)線程結(jié)束時(shí)調(diào)用這個(gè)函數(shù) @register #路由def exit():print('線程執(zhí)行完畢:',ctime())if __name__ == "__main__":main()信號(hào)量
信號(hào)量是最古老的同步原語(yǔ)之一,它是一個(gè)計(jì)數(shù)器,用于記錄資源消耗情況。當(dāng)資源消耗時(shí)遞減,當(dāng)資源釋放時(shí)遞增。可以認(rèn)為信號(hào)量代表資源是否可用。消耗資源使計(jì)數(shù)器遞減的操作習(xí)慣上稱為P,當(dāng)一個(gè)線程對(duì)一個(gè)資源完成操作時(shí),該資源需要返回資源池,這個(gè)操作一般稱為V。
Python語(yǔ)言統(tǒng)一了所有的命名,使用與線程鎖同樣的方法名消耗和釋放資源。acquire方法用于消耗資源,調(diào)用該方法計(jì)數(shù)器會(huì)減1,release方法用于釋放資源,調(diào)用該方法計(jì)數(shù)器會(huì)加 1。
使用信號(hào)量首先要?jiǎng)?chuàng)建Bounded Semaphore類的實(shí)例,并且通過該類的構(gòu)造方法傳入計(jì)數(shù)器的最大值,然后就可以使用BoundedSemphor對(duì)象的acquire方法和release方法獲取資源(計(jì)數(shù)器減1)和釋放資源(計(jì)數(shù)器加1)了。
示例:
from threading import BoundedSemaphore MAX = 3 # 創(chuàng)建信號(hào)量對(duì)象,并設(shè)置了計(jì)數(shù)器的最大值(也是資源的最大值),計(jì)數(shù)器不能超過這個(gè)值 semaphore = BoundedSemaphore(MAX) # 輸出當(dāng)前計(jì)數(shù)器的值,輸出結(jié)果:3 print(semaphore._value) # 獲取資源,計(jì)數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:2 print(semaphore._value) # 獲取資源,計(jì)數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:1 print(semaphore._value) # 獲取資源,計(jì)數(shù)器減1 semaphore.acquire() # 輸出結(jié)果:0 print(semaphore._value) # 當(dāng)計(jì)數(shù)器為0時(shí),不能再獲取資源,所以acquire方法會(huì)返回False # 輸出結(jié)果:False print(semaphore.acquire(False)) # 輸出結(jié)果:0 print(semaphore._value) # 釋放資源,計(jì)數(shù)器加1 semaphore.release() # 輸出結(jié)果:1 print(semaphore._value) # 釋放資源,計(jì)數(shù)器加1 semaphore.release() # 輸出結(jié)果:2 print(semaphore._value) # 釋放資源,計(jì)數(shù)器加1 semaphore.release() # 輸出結(jié)果:3 print(semaphore._value) # 拋出異常,當(dāng)計(jì)數(shù)器達(dá)到最大值時(shí),不能再次釋放資源,否則會(huì)拋出異常 semaphore.release()要注意的是信號(hào)量對(duì)象的acquire方法與release方法。當(dāng)資源枯竭(計(jì)數(shù)器為0)時(shí)調(diào)用acquinte方法會(huì)有兩種結(jié)果。
第1種是acquire方法的參數(shù)值為True或不指定參數(shù)時(shí), acquire方法會(huì)處于阻塞狀態(tài),直到使用release釋放資源后,acquire方法才會(huì)往下執(zhí)行。
第2種acquire方法的參數(shù)值為False,當(dāng)計(jì)數(shù)器為0時(shí)調(diào)用acquire方法并不會(huì)阻塞,而是直接返回False,表示未獲得資源,如果成功獲得資源,會(huì)返回True。
release方法在釋放資源時(shí),如果計(jì)數(shù)器已經(jīng)達(dá)到了最大值(本例是3),會(huì)直接拋出異常,表示已經(jīng)沒有資源釋放了。
信號(hào)量與鎖結(jié)合
示例:
from atexit import register from random import randrange from threading import BoundedSemaphore, Lock, Thread from time import sleep, ctime # 創(chuàng)建線程鎖 lock = Lock() # 定義糖果機(jī)的槽數(shù),也是信號(hào)量計(jì)數(shù)器的最大值 MAX = 5 # 創(chuàng)建信號(hào)量對(duì)象,并指定計(jì)數(shù)器的最大值 candytray = BoundedSemaphore(MAX) # 給糖果機(jī)的槽補(bǔ)充新的糖果(每次只補(bǔ)充一個(gè)槽) def refill():# 獲取線程鎖,將補(bǔ)充糖果的操作變成原子操作lock.acquire()print('重新添加糖果...', end=' ')try:# 為糖果機(jī)的槽補(bǔ)充糖果(計(jì)數(shù)器加1)candytray.release()except ValueError:print('糖果機(jī)都滿了,無法添加')else:print('成功添加糖果')# 釋放線程鎖lock.release() # 顧客購(gòu)買糖果 def buy():# 獲取線程鎖,將購(gòu)買糖果的操作變成原子操作lock.acquire()print('購(gòu)買糖果...', end=' ')# 顧客購(gòu)買糖果(計(jì)數(shù)器減1),如果購(gòu)買失敗(5個(gè)槽都沒有糖果了),返回Falseif candytray.acquire(False):print('成功購(gòu)買糖果')else:print('糖果機(jī)為空,無法購(gòu)買糖果')# 釋放線程鎖lock.release() # 產(chǎn)生多個(gè)補(bǔ)充糖果的動(dòng)作 def producer(loops):for i in range(loops):refill()sleep(randrange(3)) # 產(chǎn)生多個(gè)購(gòu)買糖果的動(dòng)作 def consumer(loops):for i in range(loops):buy()sleep(randrange(3))def main():print('開始:', ctime())# 參數(shù)一個(gè)2到5的隨機(jī)數(shù)nloops = randrange(2, 6)print('糖果機(jī)共有%d個(gè)槽!' % MAX)# 開始一個(gè)線程,用于執(zhí)行consumer函數(shù)Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()# 開始一個(gè)線程,用于執(zhí)行producer函數(shù)Thread(target=producer, args=(nloops,)).start()@register def exit():print('程序執(zhí)行完畢:', ctime())if __name__ == '__main__':main()運(yùn)行結(jié)果:
生產(chǎn)者–消費(fèi)者問題與queue模塊
本節(jié)使用線程鎖以及隊(duì)列來模擬一個(gè)典型的案例:生產(chǎn)者一消費(fèi)者模型。在這個(gè)場(chǎng)景下,商品或服務(wù)的生產(chǎn)者生產(chǎn)商品、然后將其放到類似隊(duì)列的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)商品的時(shí)間是不確定的.同樣消費(fèi)者消費(fèi)生產(chǎn)者生產(chǎn)的商品的時(shí)間也是不確定的。
這里使用queue模塊來提供線程間通信的機(jī)制,也就是說,生產(chǎn)者和消費(fèi)者共享一個(gè)隊(duì)列。生產(chǎn)者生產(chǎn)商品后,會(huì)將商品添加到隊(duì)列中。消費(fèi)者消費(fèi)商品,會(huì)從隊(duì)列中取一個(gè)商品。由于向隊(duì)列中添加商品和從隊(duì)列中獲取商品都不是原子操作,所以需要使用線程鎖將這兩個(gè)操作鎖住。
代碼:
```python from random import randrange from time import sleep,time,ctime from threading import Lock,Thread from queue import Queue# 創(chuàng)建線程鎖對(duì)象 lock = Lock()# 從Therad 派生的子類 class MyTherad(Thread):def __init__(self,func,args):super().__init__(target= func , args= args)# 向隊(duì)列添加商品 def writeQ(queue):# 獲取線程鎖lock.acquire()print('生產(chǎn)了一個(gè)對(duì)象,并將其添加到隊(duì)列中', end=' ')# 向隊(duì)列中添加商品queue.put('商品')print("隊(duì)列尺寸", queue.qsize())# 釋放線程鎖lock.release()# 從隊(duì)列中獲取商品 def readQ(queue):# 獲取線程鎖lock.acquire()# 從隊(duì)列中獲取商品val = queue.get(1)print('消費(fèi)了一個(gè)對(duì)象,隊(duì)列尺寸:', queue.qsize())# 釋放線程鎖lock.release()#生產(chǎn)若干個(gè)生產(chǎn)者者 def writer(queue,loops):for i in range(loops):writeQ(queue)sleep(randrange(1,4))# 生產(chǎn)若干個(gè)消費(fèi)者def reader(queue,loops):for i in range(loops):readQ(queue)sleep(randrange(2,4))funcs =[writer,reader] nfuncs = range(len(funcs))def main():nloops = randrange(2,6)q = Queue(32)threads = []#創(chuàng)建2個(gè)線程運(yùn)行writer 函數(shù)與reder函數(shù)for i in nfuncs:t = MyTherad(funcs[i],(q,nloops))threads.append(t)# 開始線程for i in nfuncs:threads[i].start()#等待兩個(gè)線程結(jié)束for i in nfuncs:threads[i].join()print('所以工作已經(jīng)完成')if __name__ =='__main__':main() 效果: # 多進(jìn)程 盡管多線程可以實(shí)現(xiàn)并發(fā)執(zhí)行,不過多個(gè)線程之間是共享當(dāng)前進(jìn)程的內(nèi)存的,也就是說,線程可以申請(qǐng)到的資源有限。要想進(jìn)一步發(fā)揮并發(fā)的作用,可以考慮使用多進(jìn)程。如果建立的進(jìn)程比較多,可以使用`multiprocessing模塊的進(jìn)程池(Pool類)`,通過Pool類構(gòu)造方法的processes函數(shù),可以指定創(chuàng)建的進(jìn)程數(shù)。Pool類有一個(gè)map方法,用于將回調(diào)函數(shù)與要給回調(diào) 函數(shù)傳遞的數(shù)據(jù)管理起來,代碼如下:```python pool = Pool(processes=4) pool.map(callback_fun,values)上面的代碼利用Pool對(duì)象創(chuàng)建了4個(gè)進(jìn)程,并通過map方法指定了進(jìn)程回調(diào)函數(shù),當(dāng)進(jìn)程執(zhí)行時(shí),就會(huì)調(diào)用這個(gè)函數(shù),values是一個(gè)可迭代對(duì)象,每次進(jìn)程運(yùn)行時(shí),就會(huì)從values中取一個(gè)值傳遞給callback _ fun,也就是說,callback fun函數(shù)至少要有一個(gè)參數(shù)接收values中的值。
示例:
from multiprocessing import Pool import time# 進(jìn)程回調(diào)函數(shù) def get_value(value):i = 0while i <3:#休眠一秒time.sleep(1)print(value,i)i+=1if __name__ =='__main__':#產(chǎn)生5個(gè)值,供多線程獲取values =['value{}'.format(str(i)) for i in range(0,5)]# 創(chuàng)建4個(gè)進(jìn)程pool = Pool(processes=4)#將進(jìn)程回調(diào)函數(shù)與values關(guān)聯(lián)pool.map(get_value,values)爬取豆瓣電影詳情
網(wǎng)頁(yè)分析
因?yàn)殡娪胺诸惿系臄?shù)據(jù)是異步的所以我們,在XHR中找到真實(shí)的網(wǎng)址
https://movie.douban.com/j/chart/top_list?type=11&interval_id=100%3A90&action=&start=20&limit=20發(fā)現(xiàn)每一個(gè)分類中的網(wǎng)址只有兩個(gè)地方是不一樣的
- type=11
- start=20
而 type = 11 ,這個(gè)11是和分類這個(gè)連接中的type是一樣的,
start=20 是什么意思呢,通過分析,這個(gè)是每一次會(huì)獲取20個(gè)電影信息,就是說每一次下滑,會(huì)一次性返回20個(gè);
每一個(gè)對(duì)應(yīng)一個(gè)電影的數(shù)據(jù),是json格式。需要轉(zhuǎn)換。
代碼:
import json, threading import re, requests from lxml import etree from queue import Queueclass DouBan(threading.Thread):#重寫父類的構(gòu)造函數(shù)def __init__(self, q=None):super().__init__()self.base_url = 'https://movie.douban.com/chart'self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36','Referer': 'https://movie.douban.com/explore'}self.q = qself.ajax_url = 'https://movie.douban.com/j/chart/top_list?type={}&interval_id=100%3A90&action=&start={}&limit=20'# 獲取網(wǎng)頁(yè)的源碼def get_content(self, url, headers):response = requests.get(url, headers=headers)return response.text# 獲取電影指定信息def get_movie_info(self, text):# 將json格式轉(zhuǎn)換為Python的字典text = json.loads(text)item = {}for data in text:score = data['score']image = data['cover_url']title = data['title']actors = data['actors']detail_url = data['url']vote_count = data['vote_count']types = data['types']item['評(píng)分'] = scoreitem['圖片'] = imageitem['電影名'] = titleitem['演員'] = actorsitem['詳情頁(yè)鏈接'] = detail_urlitem['評(píng)價(jià)數(shù)'] = vote_countitem['電影類別'] = typesprint(item)# 獲取電影api數(shù)據(jù)的def get_movie(self):headers = {'X-Requested-With': 'XMLHttpRequest','User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',}# 獲取api數(shù)據(jù),并判斷分頁(yè)while True:if self.q.empty():breakn = 0while True:# 拼接成一個(gè)完整的網(wǎng)址text = self.get_content(self.ajax_url.format(self.q.get(), n), headers=headers)if text == '[]':breakself.get_movie_info(text)n += 20# 獲取所有類型的type——iddef get_types(self):html_str = self.get_content(self.base_url, headers=self.headers) # 分類頁(yè)首頁(yè)html = etree.HTML(html_str)types = html.xpath('//div[@class="types"]/span/a/@href') # 獲得每個(gè)分類的連接,但是切割type# print(types)type_list = []for i in types:p = re.compile('type=(.*?)&interval_id=') # 篩選id,拼接到api接口的路由type = p.search(i).group(1)type_list.append(type)return type_listdef run(self):self.get_movie()if __name__ == '__main__':# 創(chuàng)建消息隊(duì)列q = Queue()# 將任務(wù)隊(duì)列初始化,將我們的type放到消息隊(duì)列中t = DouBan()types = t.get_types()for tp in types:q.put(tp[0])# 創(chuàng)建一個(gè)列表,列表的數(shù)量就是開啟線程的樹木crawl_list = [1, 2, 3, 4]for crawl in crawl_list:# 實(shí)例化對(duì)象movie = DouBan(q=q)movie.start()解釋:
效果:
總結(jié)
- 上一篇: 抓取异步数据(AJAX)笔记
- 下一篇: 邻接表建立图(c语言)