第21讲:IP代理池的搭建和使用
我們在上一課時了解了利用代理可以解決目標網站封 IP 的問題,但是如何實時高效地獲取到大量可用的代理又是一個問題。
首先在互聯網上有大量公開的免費代理,當然我們也可以購買付費的代理 IP,但是代理不論是免費的還是付費的,都不能保證是可用的,因為可能此 IP 已被其他人使用來爬取同樣的目標站點而被封禁,或者代理服務器突然發生故障或網絡繁忙。一旦我們選用了一個不可用的代理,這勢必會影響爬蟲的工作效率。
所以,我們需要提前做篩選,將不可用的代理剔除掉,保留可用代理。那么這個怎么來實現呢?這里就需要借助于一個叫作代理池的東西了。
接下來本課時我們就介紹一下如何搭建一個高效易用的代理池。
1.準備工作
在這里代理池的存儲我們需要借助于 Redis,因此這個需要額外安裝。總體來說,本課時需要的環境如下:
安裝并成功運行和連接一個 Redis 數據庫,安裝方法見:https://cuiqingcai.com/5219.html。
- 安裝好 Python3(至少為 Python 3.6 版本),并能成功運行 Python 程序。
- 安裝好一些必要的庫,包括 aiohttp、requests、redis-py、pyquery、Flask 等。
建議使用 Python 虛擬環境安裝,參考安裝命令如下:
pip3 install - r https://raw.githubusercontent.com/Python3WebSpider/ProxyPool/master/requirements.txt做好了如上準備工作,我們便可以開始實現或運行本課時所講的代理池了。
2.代理池的目標
我們需要做到下面的幾個目標,來實現易用高效的代理池。
- 基本模塊分為 4 塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。
- 存儲模塊:負責存儲抓取下來的代理。首先要保證代理不重復,要標識代理的可用情況,還要動態實時處理每個代理,所以一種比較高效和方便的存儲方式就是使用 Redis 的 Sorted Set,即有序集合。
- 獲取模塊:需要定時在各大代理網站抓取代理。代理可以是免費公開代理也可以是付費代理,代理的形式都是 IP 加端口,此模塊盡量從不同來源獲取,盡量抓取高匿代理,抓取成功之后將可用代理保存到數據庫中。
- 檢測模塊:需要定時檢測數據庫中的代理。這里需要設置一個檢測鏈接,最好是爬取哪個網站就檢測哪個網站,這樣更加有針對性,如果要做一個通用型的代理,那可以設置百度等鏈接來檢測。另外,我們需要標識每一個代理的狀態,如設置分數標識,100 分代表可用,分數越少代表越不可用。檢測一次,如果代理可用,我們可以將分數標識立即設置為 100 滿分,也可以在原基礎上加 1 分;如果代理不可用,可以將分數標識減 1 分,當分數減到一定閾值后,代理就直接從數據庫移除。通過這樣的標識分數,我們就可以辨別代理的可用情況,選用的時候會更有針對性。
- 接口模塊:需要用 API 來提供對外服務的接口。其實我們可以直接連接數據庫來獲取對應的數據,但是這樣就需要知道數據庫的連接信息,并且要配置連接,而比較安全和方便的方式就是提供一個 Web API 接口,我們通過訪問接口即可拿到可用代理。另外,由于可用代理可能有多個,那么我們可以設置一個隨機返回某個可用代理的接口,這樣就能保證每個可用代理都可以取到,實現負載均衡。
以上內容是設計代理的一些基本思路。接下來我們設計整體的架構,然后用代碼實現代理池。
3.代理池的架構
根據上文的描述,代理池的架構如圖所示。
代理池分為 4 個模塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。
- 存儲模塊使用 Redis 的有序集合,用來做代理的去重和狀態標識,同時它也是中心模塊和基礎模塊,將其他模塊串聯起來。
- 獲取模塊定時從代理網站獲取代理,將獲取的代理傳遞給存儲模塊,并保存到數據庫。
- 檢測模塊定時通過存儲模塊獲取所有代理,并對代理進行檢測,根據不同的檢測結果對代理設置不同的標識。
- 接口模塊通過 Web API 提供服務接口,接口通過連接數據庫并通過 Web 形式返回可用的代理。
4.代理池的實現
接下來我們分別用代碼來實現一下這四個模塊。
注:完整的代理池代碼量較大,因此本課時的代碼不必一步步跟著編寫,最后去了解源碼即可。
5.存儲模塊
這里我們使用 Redis 的有序集合,集合的每一個元素都是不重復的,對于代理池來說,集合的元素就變成了一個個代理,也就是 IP 加端口的形式,如 60.207.237.111:8888,這樣的一個代理就是集合的一個元素。另外,有序集合的每一個元素都有一個分數字段,分數是可以重復的,可以是浮點數類型,也可以是整數類型。該集合會根據每一個元素的分數對集合進行排序,數值小的排在前面,數值大的排在后面,這樣就可以實現集合元素的排序了。
對于代理池來說,這個分數可以作為判斷一個代理是否可用的標志,100 為最高分,代表最可用,0 為最低分,代表最不可用。如果要獲取可用代理,可以從代理池中隨機獲取分數最高的代理,注意是隨機,這樣可以保證每個可用代理都會被調用到。
分數是我們判斷代理穩定性的重要標準,設置分數規則如下所示。
- 分數 100 為可用,檢測器會定時循環檢測每個代理可用情況,一旦檢測到有可用的代理就立即置為 100,檢測到不可用就將分數減 1,分數減至 0 后代理移除。
- 新獲取的代理的分數為 10,如果測試可行,分數立即置為 100,不可行則分數減 1,分數減至 0 后代理移除。
這只是一種解決方案,當然可能還有更合理的方案。之所以設置此方案有如下幾個原因。
- 在檢測到代理可用時,分數立即置為 100,這樣可以保證所有可用代理有更大的機會被獲取到。你可能會問,為什么不將分數加 1 而是直接設為最高 100 呢?設想一下,有的代理是從各大免費公開代理網站獲取的,常常一個代理并沒有那么穩定,平均 5 次請求可能有兩次成功,3 次失敗,如果按照這種方式來設置分數,那么這個代理幾乎不可能達到一個高的分數,也就是說即便它有時是可用的,但是篩選的分數最高,那這樣的代理幾乎不可能被取到。如果想追求代理穩定性,可以用上述方法,這種方法可確保分數最高的代理一定是最穩定可用的。所以,這里我們采取 “可用即設置 100” 的方法,確保只要可用的代理都可以被獲取到。
- 在檢測到代理不可用時,分數減 1,分數減至 0 后,代理移除。這樣一個有效代理如果要被移除需要連續不斷失敗 100 次,也就是說當一個可用代理如果嘗試了 100 次都失敗了,就一直減分直到移除,一旦成功就重新置回 100。嘗試機會越多,則這個代理拯救回來的機會越多,這樣就不容易將曾經的一個可用代理丟棄,因為代理不可用的原因很可能是網絡繁忙或者其他人用此代理請求太過頻繁,所以在這里將分數為 100。
- 新獲取的代理的分數設置為 10,代理如果不可用,分數就減 1,分數減到 0,代理就移除,如果代理可用,分數就置為 100。由于很多代理是從免費網站獲取的,所以新獲取的代理無效的比例非常高,可能可用的代理不足 10%。所以在這里我們將分數設置為 10,檢測的機會沒有可用代理的 100 次那么多,這也可以適當減少開銷。
上述代理分數的設置思路不一定是最優思路,但據個人實測,它的實用性還是比較強的。
在這里首先給出存儲模塊的實現代碼,見:https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/storages,建議直接對照源碼閱讀。
在代碼中,我們定義了一個類來操作數據庫的有序集合,定義一些方法來實現分數的設置、代理的獲取等。其核心實現代碼實現如下所示:
import redis from proxypool.exceptions import PoolEmptyException from proxypool.schemas.proxy import Proxy from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \PROXY_SCORE_INIT from random import choice from typing import List from loguru import logger from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies REDIS_CLIENT_VERSION = redis.__version__ IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.') class RedisClient(object):"""redis connection client of proxypool"""def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, **kwargs):"""init redis client:param host: redis host:param port: redis port:param password: redis password"""self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True, **kwargs)def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:"""add proxy and set it to init score:param proxy: proxy, ip:port, like 8.8.8.8:88:param score: int score:return: result"""if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):logger.info(f'invalid proxy {proxy}, throw it')returnif not self.exists(proxy):if IS_REDIS_VERSION_2:return self.db.zadd(REDIS_KEY, score, proxy.string())return self.db.zadd(REDIS_KEY, {proxy.string(): score})def random(self) -> Proxy:"""get random proxyfirstly try to get proxy with max scoreif not exists, try to get proxy by rankif not exists, raise error:return: proxy, like 8.8.8.8:8"""# try to get proxy with max scoreproxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)if len(proxies):return convert_proxy_or_proxies(choice(proxies))# else get proxy by rankproxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)if len(proxies):return convert_proxy_or_proxies(choice(proxies))# else raise errorraise PoolEmptyExceptiondef decrease(self, proxy: Proxy) -> int:"""decrease score of proxy, if small than PROXY_SCORE_MIN, delete it:param proxy: proxy:return: new score"""score = self.db.zscore(REDIS_KEY, proxy.string())# current score is larger than PROXY_SCORE_MINif score and score > PROXY_SCORE_MIN:logger.info(f'{proxy.string()} current score {score}, decrease 1')if IS_REDIS_VERSION_2:return self.db.zincrby(REDIS_KEY, proxy.string(), -1)return self.db.zincrby(REDIS_KEY, -1, proxy.string())# otherwise delete proxyelse:logger.info(f'{proxy.string()} current score {score}, remove')return self.db.zrem(REDIS_KEY, proxy.string())def exists(self, proxy: Proxy) -> bool:"""if proxy exists:param proxy: proxy:return: if exists, bool"""return not self.db.zscore(REDIS_KEY, proxy.string()) is Nonedef max(self, proxy: Proxy) -> int:"""set proxy to max score:param proxy: proxy:return: new score"""logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')if IS_REDIS_VERSION_2:return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})def count(self) -> int:"""get count of proxies:return: count, int"""return self.db.zcard(REDIS_KEY)def all(self) -> List[Proxy]:"""get all proxies:return: list of proxies"""return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))def batch(self, start, end) -> List[Proxy]:"""get batch of proxies:param start: start index:param end: end index:return: list of proxies"""return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1)) if __name__ == '__main__':conn = RedisClient()result = conn.random()print(result)首先我們定義了一些常量,如 PROXY_SCORE_MAX、PROXY_SCORE_MIN、PROXY_SCORE_INIT 分別代表最大分數、最小分數、初始分數。REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 分別代表了 Redis 的連接信息,即地址、端口、密碼。REDIS_KEY 是有序集合的鍵名,我們可以通過它來獲取代理存儲所使用的有序集合。
RedisClient 這個類可以用來操作 Redis 的有序集合,其中定義了一些方法來對集合中的元素進行處理,它的主要功能如下所示。
- init 方法是初始化的方法,其參數是 Redis 的連接信息,默認的連接信息已經定義為常量,在 init 方法中初始化了一個 StrictRedis 的類,建立 Redis 連接。
- add 方法向數據庫添加代理并設置分數,默認的分數是 PROXY_SCORE_INIT 也就是 10,返回結果是添加的結果。
- random 方法是隨機獲取代理的方法,首先獲取 100 分的代理,然后隨機選擇一個返回。如果不存在 100 分的代理,則此方法按照排名來獲取,選取前 100 名,然后隨機選擇一個返回,否則拋出異常。
- decrease 方法是在代理檢測無效的時候設置分數減 1 的方法,代理傳入后,此方法將代理的分數減 1,如果分數達到最低值,那么代理就刪除。
- exists 方法可判斷代理是否存在集合中。
- max 方法將代理的分數設置為 PROXY_SCORE_MAX,即 100,也就是當代理有效時的設置。
- count 方法返回當前集合的元素個數。
- all 方法返回所有的代理列表,供檢測使用。
定義好了這些方法,我們可以在后續的模塊中調用此類來連接和操作數據庫。如想要獲取隨機可用的代理,只需要調用 random 方法即可,得到的就是隨機的可用代理。
6.獲取模塊
獲取模塊主要是為了從各大網站抓取代理并調用存儲模塊進行保存,代碼實現見:https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/crawlers。
獲取模塊的邏輯相對簡單,比如我們可以定義一些抓取代理的方法,示例如下:
from proxypool.crawlers.base import BaseCrawler from proxypool.schemas.proxy import Proxy import re MAX_PAGE = 5 BASE_URL = 'http://www.ip3366.net/free/?stype=1&page={page}' class IP3366Crawler(BaseCrawler):"""ip3366 crawler, http://www.ip3366.net/"""urls = [BASE_URL.format(page=i) for i in range(1, 8)]def parse(self, html):"""parse html file to get proxies:return:"""ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')# \s * 匹配空格,起到換行作用re_ip_address = ip_address.findall(html)for address, port in re_ip_address:proxy = Proxy(host=address.strip(), port=int(port.strip()))yield proxy我們在這里定義了一個代理 Crawler 類,用來抓取某一網站的代理,這里是抓取的 IP3366 的公開代理,通過 parse 方法來解析頁面的源碼并構造一個個 Proxy 對象返回即可。
另外在其父類 BaseCrawler 里面定義了通用的頁面抓取方法,它可以讀取子類里面定義的 urls 全局變量并進行爬取,然后調用子類的 parse 方法來解析頁面,代碼實現如下:
from retrying import retry import requests from loguru import logger class BaseCrawler(object):urls = []@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)def fetch(self, url, **kwargs):try:response = requests.get(url, **kwargs)if response.status_code == 200:return response.textexcept requests.ConnectionError:return@logger.catchdef crawl(self):"""crawl main method"""for url in self.urls:logger.info(f'fetching {url}')html = self.fetch(url)for proxy in self.parse(html):logger.info(f'fetched proxy {proxy.string()} from {url}')yield proxy所以,我們如果要擴展一個代理的 Crawler,只需要繼承 BaseCrawler 并實現 parse 方法即可,擴展性較好。
因此,這一個個的 Crawler 就可以針對各個不同的代理網站進行代理的抓取。最后有一個統一的方法將 Crawler 匯總起來,遍歷調用即可。
如何匯總呢?在這里我們可以檢測代碼只要定義有 BaseCrawler 的子類就算一個有效的代理 Crawler,可以直接通過遍歷 Python 文件包的方式來獲取,代碼實現如下:
import pkgutil from .base import BaseCrawler import inspect # load classes subclass of BaseCrawler classes = [] for loader, name, is_pkg in pkgutil.walk_packages(__path__):module = loader.find_module(name).load_module(name)for name, value in inspect.getmembers(module):globals()[name] = valueif inspect.isclass(value) and issubclass(value, BaseCrawler) and value is not BaseCrawler:classes.append(value) __all__ = __ALL__ = classes在這里我們調用了 walk_packages 方法,遍歷了整個 crawlers 模塊下的類,并判斷了它是 BaseCrawler 的子類,那就將其添加到結果中,并返回。
最后只要將 classes 遍歷并依次實例化,調用其 crawl 方法即可完成代理的爬取和提取,代碼實現見:https://github.com/Python3WebSpider/ProxyPool/blob/master/proxypool/processors/getter.py。
7.檢測模塊
我們已經成功將各個網站的代理獲取下來了,現在就需要一個檢測模塊來對所有代理進行多輪檢測。代理檢測可用,分數就設置為 100,代理不可用,分數減 1,這樣就可以實時改變每個代理的可用情況。如要獲取有效代理只需要獲取分數高的代理即可。
由于代理的數量非常多,為了提高代理的檢測效率,我們在這里使用異步請求庫 aiohttp 來進行檢測。
requests 作為一個同步請求庫,我們在發出一個請求之后,程序需要等待網頁加載完成之后才能繼續執行。也就是這個過程會阻塞等待響應,如果服務器響應非常慢,比如一個請求等待十幾秒,那么我們使用 requests 完成一個請求就會需要十幾秒的時間,程序也不會繼續往下執行,而在這十幾秒的時間里程序其實完全可以去做其他的事情,比如調度其他的請求或者進行網頁解析等。
對于響應速度比較快的網站來說,requests 同步請求和 aiohttp 異步請求的效果差距沒那么大。可對于檢測代理來說,檢測一個代理一般需要十多秒甚至幾十秒的時間,這時候使用 aiohttp 異步請求庫的優勢就大大體現出來了,效率可能會提高幾十倍不止。
所以,我們的代理檢測使用異步請求庫 aiohttp,實現示例如下所示:
import asyncio import aiohttp from loguru import logger from proxypool.schemas import Proxy from proxypool.storages.redis import RedisClient from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError from asyncio import TimeoutError EXCEPTIONS = (ClientProxyConnectionError,ConnectionRefusedError,TimeoutError,ServerDisconnectedError,ClientOSError,ClientHttpProxyError ) class Tester(object):"""tester for testing proxies in queue"""def __init__(self):"""init redis"""self.redis = RedisClient()self.loop = asyncio.get_event_loop()async def test(self, proxy: Proxy):"""test single proxy:param proxy: Proxy object:return:"""async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:try:logger.debug(f'testing {proxy.string()}')async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,allow_redirects=False) as response:if response.status in TEST_VALID_STATUS:self.redis.max(proxy)logger.debug(f'proxy {proxy.string()} is valid, set max score')else:self.redis.decrease(proxy)logger.debug(f'proxy {proxy.string()} is invalid, decrease score')except EXCEPTIONS:self.redis.decrease(proxy)logger.debug(f'proxy {proxy.string()} is invalid, decrease score')@logger.catchdef run(self):"""test main method:return:"""# event loop of aiohttplogger.info('stating tester...')count = self.redis.count()logger.debug(f'{count} proxies to test')for i in range(0, count, TEST_BATCH):# start end end offsetstart, end = i, min(i + TEST_BATCH, count)logger.debug(f'testing proxies from {start} to {end} indices')proxies = self.redis.batch(start, end)tasks = [self.test(proxy) for proxy in proxies]# run tasks using event loopself.loop.run_until_complete(asyncio.wait(tasks)) if __name__ == '__main__':tester = Tester()tester.run()這里定義了一個類 Tester,init 方法中建立了一個 RedisClient 對象,供該對象中其他方法使用。接下來定義了一個 test 方法,這個方法用來檢測單個代理的可用情況,其參數就是被檢測的代理。注意,test 方法前面加了 async 關鍵詞,代表這個方法是異步的。方法內部首先創建了 aiohttp 的 ClientSession 對象,可以直接調用該對象的 get 方法來訪問頁面。
測試的鏈接在這里定義為常量 TEST_URL。如果針對某個網站有抓取需求,建議將 TEST_URL 設置為目標網站的地址,因為在抓取的過程中,代理本身可能是可用的,但是該代理的 IP 已經被目標網站封掉了。例如,某些代理可以正常訪問百度等頁面,但是對知乎來說可能就被封了,所以我們可以將 TEST_URL 設置為知乎的某個頁面的鏈接,當請求失敗、代理被封時,分數自然會減下來,失效的代理就不會被取到了。
如果想做一個通用的代理池,則不需要專門設置 TEST_URL,可以將其設置為一個不會封 IP 的網站,也可以設置為百度這類響應穩定的網站。
我們還定義了 TEST_VALID_STATUS 變量,這個變量是一個列表形式,包含了正常的狀態碼,如可以定義成 [200]。當然某些目標網站可能會出現其他的狀態碼,你可以自行配置。
程序在獲取 Response 后需要判斷響應的狀態,如果狀態碼在 TEST_VALID_STATUS 列表里,則代表代理可用,可以調用 RedisClient 的 max 方法將代理分數設為 100,否則調用 decrease 方法將代理分數減 1,如果出現異常也同樣將代理分數減 1。
另外,我們設置了批量測試的最大值為 TEST_BATCH,也就是一批測試最多 TEST_BATCH 個,這可以避免代理池過大時一次性測試全部代理導致內存開銷過大的問題。當然也可以用信號量來實現并發控制。
隨后,在 run 方法里面獲取了所有的代理列表,使用 aiohttp 分配任務,啟動運行。這樣在不斷的運行過程中,代理池中無效的代理的分數會一直被減 1,直至被清除,有效的代理則會一直保持 100 分,供隨時取用。
這樣,測試模塊的邏輯就完成了。
8.接口模塊
通過上述 3 個模塊,我們已經可以做到代理的獲取、檢測和更新,數據庫就會以有序集合的形式存儲各個代理及其對應的分數,分數 100 代表可用,分數越小代表越不可用。
但是我們怎樣方便地獲取可用代理呢?可以用 RedisClient 類直接連接 Redis,然后調用 random 方法。這樣做沒問題,效率很高,但是會有幾個弊端。
- 如果其他人使用這個代理池,他需要知道 Redis 連接的用戶名和密碼信息,這樣很不安全。
- 如果代理池需要部署在遠程服務器上運行,而遠程服務器的 Redis 只允許本地連接,那么我們就不能遠程直連 Redis 來獲取代理。
- 如果爬蟲所在的主機沒有連接 Redis 模塊,或者爬蟲不是由 Python 語言編寫的,那么我們就無法使用 RedisClient 來獲取代理。
- 如果 RedisClient 類或者數據庫結構有更新,那么爬蟲端必須同步這些更新,這樣非常麻煩。
綜上考慮,為了使代理池可以作為一個獨立服務運行,我們最好增加一個接口模塊,并以 Web API 的形式暴露可用代理。
這樣一來,獲取代理只需要請求接口即可,以上的幾個缺點弊端也可以避免。
我們使用一個比較輕量級的庫 Flask 來實現這個接口模塊,實現示例如下所示:
from flask import Flask, g from proxypool.storages.redis import RedisClient from proxypool.setting import API_HOST, API_PORT, API_THREADED __all__ = ['app'] app = Flask(__name__) def get_conn():"""get redis client object:return:"""if not hasattr(g, 'redis'):g.redis = RedisClient()return g.redis @app.route('/') def index():"""get home page, you can define your own templates:return:"""return '<h2>Welcome to Proxy Pool System</h2>' @app.route('/random') def get_proxy():"""get a random proxy:return: get a random proxy"""conn = get_conn()return conn.random().string() @app.route('/count') def get_count():"""get the count of proxies:return: count, int"""conn = get_conn()return str(conn.count()) if __name__ == '__main__':app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)在這里,我們聲明了一個 Flask 對象,定義了 3 個接口,分別是首頁、隨機代理頁、獲取數量頁。
運行之后,Flask 會啟動一個 Web 服務,我們只需要訪問對應的接口即可獲取到可用代理。
9.調度模塊
調度模塊就是調用以上所定義的 3 個模塊,將這 3 個模塊通過多進程的形式運行起來,示例如下所示:
import time import multiprocessing from proxypool.processors.server import app from proxypool.processors.getter import Getter from proxypool.processors.tester import Tester from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS from loguru import logger if IS_WINDOWS:multiprocessing.freeze_support() tester_process, getter_process, server_process = None, None, None class Scheduler():"""scheduler"""def run_tester(self, cycle=CYCLE_TESTER):"""run tester"""if not ENABLE_TESTER:logger.info('tester not enabled, exit')returntester = Tester()loop = 0while True:logger.debug(f'tester loop {loop} start...')tester.run()loop += 1time.sleep(cycle)def run_getter(self, cycle=CYCLE_GETTER):"""run getter"""if not ENABLE_GETTER:logger.info('getter not enabled, exit')returngetter = Getter()loop = 0while True:logger.debug(f'getter loop {loop} start...')getter.run()loop += 1time.sleep(cycle)def run_server(self):"""run server for api"""if not ENABLE_SERVER:logger.info('server not enabled, exit')returnapp.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)def run(self):global tester_process, getter_process, server_processtry:logger.info('starting proxypool...')if ENABLE_TESTER:tester_process = multiprocessing.Process(target=self.run_tester)logger.info(f'starting tester, pid {tester_process.pid}...')tester_process.start()if ENABLE_GETTER:getter_process = multiprocessing.Process(target=self.run_getter)logger.info(f'starting getter, pid{getter_process.pid}...')getter_process.start()if ENABLE_SERVER:server_process = multiprocessing.Process(target=self.run_server)logger.info(f'starting server, pid{server_process.pid}...')server_process.start()tester_process.join()getter_process.join()server_process.join()except KeyboardInterrupt:logger.info('received keyboard interrupt signal')tester_process.terminate()getter_process.terminate()server_process.terminate()finally:# must call join method before calling is_alivetester_process.join()getter_process.join()server_process.join()logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')logger.info('proxy terminated') if __name__ == '__main__':scheduler = Scheduler()scheduler.run()3 個常量 ENABLE_TESTER、ENABLE_GETTER、ENABLE_SERVER 都是布爾類型,表示測試模塊、獲取模塊、接口模塊的開關,如果都為 True,則代表模塊開啟。
啟動入口是 run 方法,這個方法分別判斷 3 個模塊的開關。如果開關開啟,啟動時程序就新建一個 Process 進程,設置好啟動目標,然后調用 start 方法運行,這樣 3 個進程就可以并行執行,互不干擾。
3 個調度方法結構也非常清晰。比如,run_tester 方法用來調度測試模塊,首先聲明一個 Tester 對象,然后進入死循環不斷循環調用其 run 方法,執行完一輪之后就休眠一段時間,休眠結束之后重新再執行。在這里,休眠時間也定義為一個常量,如 20 秒,即每隔 20 秒進行一次代理檢測。
最后,只需要調用 Scheduler 的 run 方法即可啟動整個代理池。
以上內容便是整個代理池的架構和相應實現邏輯。
10.運行
接下來我們將代碼整合一下,將代理運行起來,運行之后的輸出結果如下所示:
2020-04-13 02:52:06.510 | INFO | proxypool.storages.redis:decrease:73 - 60.186.146.193:9000 current score 10.0, decrease 1 2020-04-13 02:52:06.517 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.146.193:9000 is invalid, decrease score 2020-04-13 02:52:06.524 | INFO | proxypool.storages.redis:decrease:73 - 60.186.151.147:9000 current score 10.0, decrease 1 2020-04-13 02:52:06.532 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.151.147:9000 is invalid, decrease score 2020-04-13 02:52:07.159 | INFO | proxypool.storages.redis:max:96 - 60.191.11.246:3128 is valid, set to 100 2020-04-13 02:52:07.167 | DEBUG | proxypool.processors.tester:test:46 - proxy 60.191.11.246:3128 is valid, set max score 2020-04-13 02:52:17.271 | INFO | proxypool.storages.redis:decrease:73 - 59.62.7.130:9000 current score 10.0, decrease 1 2020-04-13 02:52:17.280 | DEBUG | proxypool.processors.tester:test:52 - proxy 59.62.7.130:9000 is invalid, decrease score 2020-04-13 02:52:17.288 | INFO | proxypool.storages.redis:decrease:73 - 60.167.103.74:1133 current score 10.0, decrease 1 2020-04-13 02:52:17.295 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.167.103.74:1133 is invalid, decrease score 2020-04-13 02:52:17.302 | INFO | proxypool.storages.redis:decrease:73 - 60.162.71.113:9000 current score 10.0, decrease 1 2020-04-13 02:52:17.309 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.162.71.113:9000 is invalid, decrease score以上是代理池的控制臺輸出,可以看到可用代理設置為 100,不可用代理分數減 1。
接下來我們再打開瀏覽器,當前配置了運行在 5555 端口,所以打開:http://127.0.0.1:5555,即可看到其首頁,如圖所示。
我們只需要訪問此接口即可獲取一個隨機可用代理,這非常方便。
獲取代理的代碼如下所示:
import requestsPROXY_POOL_URL = 'http://localhost:5555/random'def get_proxy():try:response = requests.get(PROXY_POOL_URL)if response.status_code == 200:return response.textexcept ConnectionError:return None這樣便可以獲取到一個隨機代理了,它是字符串類型,此代理可以按照上一課時所示的方法設置,如 requests 的使用方法如下所示:
import requestsproxy = get_proxy() proxies = {'http': 'http://' + proxy,'https': 'https://' + proxy, } try:response = requests.get('http://httpbin.org/get', proxies=proxies)print(response.text) except requests.exceptions.ConnectionError as e:print('Error', e.args)有了代理池之后,我們再取出代理即可有效防止 IP 被封禁的情況。
11.總結
本課時代碼地址為:https://github.com/Python3WebSpider/ProxyPool,代碼量相比之前的案例復雜了很多,邏輯也相對完善。另外代碼庫中還提供了 Docker 和 Kubernetes 的運行和部署操作,可以幫助我們更加快捷地運行代理池,如果你感興趣可以了解下。
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的第21讲:IP代理池的搭建和使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第14讲:Selenium 的基本使用
- 下一篇: 第33讲:可见即可爬,Appium 的使