Python基础入门教程:使用 Python 3 协程快速获得一个代理池
Python基礎入門教程:使用 Python 3 協程快速獲得一個代理池
前言
在執行 IO 密集型任務的時候,程序會因為等待 IO 而阻塞。比如我們使用 requests 庫來進行網絡爬蟲請求的話,如果網站響應速度過慢,程序會一直等待網站響應,最終導致其爬取效率十分低下。本文以爬取 IP 代理池為例,演示 Python 中如何利用異步協程來加速網絡爬蟲。
注:本文示例代碼,需要 Python 3.7 及以上版本。
協程
協程(Coroutine),又稱微線程,纖程,協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存,在調度回來的時候,恢復先前保存的寄存器上下文和棧。因此協程能保留上一次調用時的狀態,即所有局部狀態的一個特定組合。
協程本質上是個單進程,協程相對于多進程來說,無需進程間上下文切換的開銷,無需原子操作鎖定及同步的開銷,編程模型也非常簡單。
我們可以使用協程來實現異步操作,比如在網絡爬蟲場景下,在發出一個請求之后,需要等待一定的時間才能得到響應。其實在這個等待過程中,程序可以干許多其他的事情,等到響應返回之后再切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是異步協程的優勢。
Python 中的協程
從 Python 3.4 開始,Python 中加入了協程的概念,這個版本的協程是通過生成器對象來實現的,在 Python 3.5 中增加了 asyncio 庫和 async、await 關鍵字,使得協程的實現更加方便。
asyncio 庫
首先我們先來看一個不使用協程的程序,代碼如下:
import time ? ? def job(t):print('Start job {}'.format(t))time.sleep(t) # 等待 t 秒print('Job {0} takes {0}s'.format(t)) ? ? def main():[job(t) for t in range(1, 3)] ? ? start = time.time() main() print("total time: {}".format(time.time() - start))復制代碼運行結果:
Start job 1 Job 1 takes 1s Start job 2 Job 2 takes 2s total time: 3.001577138900757復制代碼從運行結果可以看出,我們的 job 是按順序執行的。必須執行完 job 1 才能開始執行 job 2, job 1 需要 1 秒的執行時間,job 2 需要 2 秒的執行時間,所以總時間是 3 秒多。
如果我們使用協程的方式,job 1 在等待 time.sleep(t) 執行結束的時候(可以看做是等待一個網頁的下載成功),是可以切換到 job 2 執行的。
我們再來看一下使用協程改造后的代碼:
import time import asyncio ? ? async def job(t): # 使用 async 關鍵字將一個函數定義為協程print('Start job {}'.format(t))await asyncio.sleep(t) # 等待 t 秒, 期間切換執行其他任務print('Job {0} takes {0}s'.format(t)) ? ? async def main(loop): # 使用 async 關鍵字將一個函數定義為協程tasks = [loop.create_task(job(t)) for t in range(1, 3)] # 創建任務, 不立即執行await asyncio.wait(tasks) # 執行并等待所有任務完成 ? ? start = time.time() loop = asyncio.get_event_loop() # 建立 loop loop.run_until_complete(main(loop)) # 執行 loop loop.close() # 關閉 loop print("total time: {}".format(time.time() - start))復制代碼運行結果:
Start job 1 Start job 2 Job 1 takes 1s Job 2 takes 2s total time: 2.0033459663391113復制代碼從運行結果可以看出,我們沒有等待 job 1 執行結束再開始執行 job 2,而是 job 1 觸發 await 的時候切換到了 job 2 。 這時 job 1 和 job 2 同時在執行 await asyncio.sleep(t),所以最終程序的執行時間取決于執行時間最長的那個 job,也就是 job 2 的執行時間:2 秒。
aiohttp 庫
在對 asyncio 庫做了簡單了解之后,我們來看一下如何通過協程來改造我們的爬蟲程序。
安裝 aiohttp 庫:
pip install aiohttp復制代碼我們先來看一下使用 reqeusts 庫實現一個網頁的爬取:
import time ? import requests ? def fetch(url):r = requests.get(url)return r.url ? ? def main():results = [fetch('http://www.baidu.com') for _ in range(2)]print(results) ? ? start = time.time() main() print("total time: {}".format(time.time() - start))復制代碼運行結果:
['http://www.baidu.com/', 'http://www.baidu.com/'] total time: 1.5445010662078857復制代碼使用 requests 庫,訪問兩次 www.baidu.com,共耗時 1.5 秒
我們用 aiohttp 庫來改造上面的代碼:
import time import asyncio ? import aiohttp ? ? async def fetch(session, url):response = await session.get(url) # await 等待網絡 IO 并切換協程return str(response.url) ? ? async def main(loop):async with aiohttp.ClientSession() as session:tasks = [loop.create_task(fetch(session, 'http://www.baidu.com'))for _ in range(2)]done, pending = await asyncio.wait(tasks) # 執行并等待所有任務完成results = [r.result() for r in done] # 獲取所有返回結果print(results) ? ? start = time.time() loop = asyncio.get_event_loop() # 建立 事件循環 loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程 loop.close() # 關閉 事件循環 print("total time: {}".format(time.time() - start))復制代碼運行結果:
['http://www.baidu.com', 'http://www.baidu.com'] total time: 0.10848307609558105復制代碼使用 aiohttp 的代碼執行時間較使用 reqeusts 的代碼有大幅的提升。
上例中,我們使用官方推薦的方式創建 session,并通過 session 執行 get 操作。aiohttp 官方建議一個 application 中共享使用一個 session,不要為每個請求都創建 session。
使用 asyncio 和 aiohttp 快速獲得一個代理池
通過爬蟲解析免費的代理發布網站頁面,來生成代理池。
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @author: qfedu.com """ import os import re import time import asyncio ? import aiohttp ? HEADERS = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15' } ? OUTPUT_FILE = 'proxies.txt' # 代理池輸出文件 SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top'] # 代理發布網站 CHECK_URL = 'http://www.baidu.com' LOCAL_PROXY = None # 在本地發起請求時的代理 ? ? # http get 協程 async def fetch(session, url, proxy=None):proxy_headers = HEADERS if proxy else Nonetry:async with session.get(url, headers=HEADERS, proxy=proxy,proxy_headers=proxy_headers,timeout=aiohttp.ClientTimeout(total=5)) as response:if response.status == 200:return await response.text()else:return ''except:return '' ? ? # 從代理發布網站獲取代理發布頁面鏈接 async def get_page_links(loop, session):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in SITES] # 創建協程任務done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成htmls = [f.result() for f in done] # 獲取所有返回結果 ?# 解析出 html 頁面中的代理發布鏈接def parse(html):return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html) ?results = map(parse, htmls) # 逐個解析 html 頁面 ?return [y for x in results for y in x] ? ? # 從代理發布頁面獲取代理 IP async def get_proxies(loop, session, page_links):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in page_links] # 創建協程任務done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成htmls = [f.result() for f in done] # 獲取所有返回結果 ?# 解析出 html 頁面中的代理 IPdef parse(html):return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html) ?results = map(parse, htmls) # 逐個解析 html 頁面 ?return list(set([y for x in results for y in x])) ? ? # 驗證代理 IP async def check_proxy(session, proxy):html = await fetch(session, CHECK_URL, proxy=proxy) ?# 如果返回通過代理 IP 訪問的頁面,則說明代理 IP 有效return proxy if html else '' ? ? # 通過協程批量驗證代理 IP,每次同時發起 200 個驗證請求 async def check_proxies(loop, session, proxies):checked_proxies = []for i in range(0, len(proxies), 200):_proxies = [proxy.strip() if proxy.strip().startswith('http://')else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]tasks = [loop.create_task(check_proxy(session, proxy))for proxy in _proxies]done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成checked = [f.result() for f in done] # 獲取所有返回結果checked_proxies += [p for p in checked if p] # 獲取不為空的返回值,即驗證成功的代理 IP ?return checked_proxies ? ? # 將代理 IP 逐行保存到文件 def save_proxies(proxies):# 創建新文件,如果文件已存在,則清空文件內容with open(OUTPUT_FILE, 'w') as f:f.write('') ?# 通過追加寫模式,逐行寫入文件with open(OUTPUT_FILE, 'a') as f:for proxy in proxies:f.write(proxy + '\n') ? ? async def main(loop):async with aiohttp.ClientSession() as session:page_links = await get_page_links(loop, session) # 獲得代理發布頁面鏈接# 從代理發布頁面獲得代理 IPproxies = await get_proxies(loop, session, page_links)print('total proxy: {}'.format(len(proxies))) # 解析出的代理 IP 總量proxies = await check_proxies(loop, session, proxies) # 驗證代理 IP ?print('total checked proxy: {}'.format(len(proxies))) # 驗證后的代理 IP 總量save_proxies(proxies) # 保存代理 IP 到文件 ? ? start = time.time() loop = asyncio.get_event_loop() # 建立 事件循環 loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程 loop.close() # 關閉 事件循環 total_time = time.time() - start print(f'total time: {total_time}')復制代碼運行結果:
total proxy: 15675 total checked proxy: 4503 total time: 487.2807550430298復制代碼更加高效的爬蟲
在爬蟲程序中,通常有網絡請求任務、頁面解析任務、數據清洗任務和數據入庫任務。
網絡請求任務、數據入庫任務屬于 IO 密集型任務,在 Python 中通常使用多線程模型來提高這類任務的性能,現在還可以通過 aiohttp,Motor(MongoDB 的異步 Python 驅動)等異步框架將性能進一步提升。
頁面解析任務、數據清洗任務這類 CPU 密集型的任務我們該如何來提高性能?在 Python 中針對 CPU 密集型任務可以通過 multiprocessing 模塊來提升性能,通過 multiprocessing 模塊可以使程序運行在多核 CPU 中,增加 CPU 的利用率以提升計算性能。
給代理池爬蟲示例增加多核計算支持:
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @author: qfedu.com """ import os import re import time import asyncio from multiprocessing import Pool ? import aiohttp ? HEADERS = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15' } ? OUTPUT_FILE = 'proxies.txt' # 代理池輸出文件 SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top'] # 代理發布網站 CHECK_URL = 'http://www.baidu.com' LOCAL_PROXY = 'http://127.0.0.1:1087' # ?在本地發起請求時的代理 ? ? # http get 協程 async def fetch(session, url, proxy=None):proxy_headers = HEADERS if proxy else Nonetry:async with session.get(url, headers=HEADERS, proxy=proxy,proxy_headers=proxy_headers,timeout=aiohttp.ClientTimeout(total=5)) as response:if response.status == 200:return await response.text()else:return ''except:return '' ? # 解析出 html 頁面中的代理發布鏈接 ? ? def parse_page_link(html):return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html) ? # 從代理發布網站獲取代理發布頁面鏈接 ? ? async def get_page_links(loop, session):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in SITES] # 創建協程任務done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成htmls = [f.result() for f in done] # 獲取所有返回結果 ?# 利用多核 CPU 的計算能力提升頁面解析性能with Pool(processes=os.cpu_count() * 2) as pool:results = pool.map(parse_page_link, htmls) ?return [y for x in results for y in x] ? # 解析出 html 頁面中的代理 IP ? ? def parse_proxy(html):return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html) ? # 從代理發布頁面獲取代理 IP ? ? async def get_proxies(loop, session, page_links):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in page_links] # 創建協程任務done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成htmls = [f.result() for f in done] # 獲取所有返回結果 ?# 利用多核 CPU 的計算能力提升頁面解析性能with Pool(processes=os.cpu_count() * 2) as pool:results = pool.map(parse_proxy, htmls) ?return list(set([y for x in results for y in x])) ? ? # 驗證代理 IP async def check_proxy(session, proxy):html = await fetch(session, CHECK_URL, proxy=proxy) ?# 如果返回通過代理 IP 訪問的頁面,則說明代理 IP 有效return proxy if html else '' ? ? # 通過協程批量驗證代理 IP,每次同時發起 200 個驗證請求 async def check_proxies(loop, session, proxies):checked_proxies = []for i in range(0, len(proxies), 200):_proxies = [proxy.strip() if proxy.strip().startswith('http://')else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]tasks = [loop.create_task(check_proxy(session, proxy))for proxy in _proxies]done, _ = await asyncio.wait(tasks) # 執行并等待所有任務完成checked = [f.result() for f in done] # 獲取所有返回結果checked_proxies += [p for p in checked if p] # 獲取不為空的返回值,即驗證成功的代理 IP ?return checked_proxies ? ? # 將代理 IP 逐行保存到文件 def save_proxies(proxies):# 創建新文件,如果文件已存在,則清空文件內容with open(OUTPUT_FILE, 'w') as f:f.write('') ?# 通過追加寫模式,逐行寫入文件with open(OUTPUT_FILE, 'a') as f:for proxy in proxies:f.write(proxy + '\n') ? ? async def main(loop):async with aiohttp.ClientSession() as session:page_links = await get_page_links(loop, session) # 獲得代理發布頁面鏈接# 從代理發布頁面獲得代理 IPproxies = await get_proxies(loop, session, page_links)print('total proxy: {}'.format(len(proxies))) # 解析出的代理 IP 總量proxies = await check_proxies(loop, session, proxies) # 驗證代理 IP ?print('total checked proxy: {}'.format(len(proxies))) # 驗證后的代理 IP 總量save_proxies(proxies) # 保存代理 IP 到文件 ? ? start = time.time() loop = asyncio.get_event_loop() # 建立 事件循環 loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程 loop.close() # 關閉 事件循環 total_time = time.time() - start print(f'total time: {total_time}')復制代碼進程間的調度及上下文切換是非常消耗資源的。上面例子中解析任務比較簡單,解析量也非常少,增加多核計算支持后,性能幾乎沒有提升還有可能降低。在實際爬蟲項目中需要根據實際情況來衡量和選擇。
總結
以上是生活随笔為你收集整理的Python基础入门教程:使用 Python 3 协程快速获得一个代理池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CentOS7援救模式下更改密码
- 下一篇: mariadb(mysql)的安装