Scrapy-redis实现分布式爬取的过程与原理
Scrapy is a handy Python crawling framework: you only need to write a few components to scrape data from web pages. But once the number of pages to crawl gets very large, a single host can no longer keep up, in terms of both processing speed and the number of concurrent network requests. That is where a distributed crawler shows its advantage.
Scrapy-Redis is a Redis-based distributed component for Scrapy. It uses Redis to store and schedule the requests to be crawled, and to store the items produced by the crawl for later processing. scrapy-redis rewrites a few key pieces of Scrapy, turning it into a distributed crawler that can run on several hosts at the same time.
The architecture of vanilla Scrapy looks like this:
With Scrapy-Redis added, the architecture becomes:
The official scrapy-redis documentation is fairly terse and says little about how it actually works, so if you want a full picture of how the distributed crawler operates you have to read the scrapy-redis source. Fortunately there is not much of it, it is easy to follow, and you can get through it quickly.
The bulk of the scrapy-redis project is still the redis and scrapy libraries; the project itself implements relatively little. It acts like glue, binding the two together.
Which components does scrapy-redis provide?
scrapy-redis implements two kinds of distribution: distributed crawling and distributed item processing, handled by the scheduler module and the pipelines module respectively.
connection.py
Responsible for instantiating the Redis connection according to the settings. It is called by the dupefilter and the scheduler; in short, anything that reads from or writes to Redis goes through this module.
```python
import redis
import six

from scrapy.utils.misc import load_object


DEFAULT_REDIS_CLS = redis.StrictRedis

# Sane connection defaults.
DEFAULT_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
}

# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``DEFAULT_PARAMS`` global as defaults values for the parameters. You can
    override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = DEFAULT_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', DEFAULT_REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)
```
The connection file imports the redis module, the Python client library for Redis, so its main job is to establish connections to the Redis database (it returns a Redis or StrictRedis object from the redis library, either of which can be used directly for data operations). These connection helpers are used throughout the other files. As you can see, connecting to Redis is much like connecting to any other database: you need an IP address, a port, an optional password, and an integer database number; you can also configure socket timeouts, connection timeouts, and so on in the Scrapy project's settings file.
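To make this concrete, here is a minimal sketch of what the relevant block in a project's settings.py might look like. Only the setting names (REDIS_HOST, REDIS_PORT, REDIS_URL, REDIS_PARAMS) come from the code above; the host, port, and password values are placeholders for your own Redis deployment.

```python
# settings.py -- hypothetical values; only the setting names are from scrapy-redis.
REDIS_HOST = '192.168.1.100'   # placeholder Redis host
REDIS_PORT = 6379

# Alternatively, a single connection URL; when this is set, get_redis()
# builds the client with redis_cls.from_url():
# REDIS_URL = 'redis://:password@192.168.1.100:6379/0'

# Extra client parameters are merged over DEFAULT_PARAMS:
REDIS_PARAMS = {
    'socket_timeout': 30,
    'retry_on_timeout': True,
}
```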
dupefilter.py
Responsible for request deduplication; the implementation is quite clever, using Redis's set data structure. Note, however, that the scheduler does not use the dupefilter key implemented in this module to schedule requests; scheduling uses the queues implemented in queue.py. When a request is not a duplicate, it is stored in the queue and popped out at scheduling time.
```python
import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from .connection import get_redis_from_settings


DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.

        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
```
This file looks fairly involved; it reimplements the request deduplication that Scrapy already provides. When Scrapy runs on a single machine, it only needs to consult the in-memory request queue, or the request queue persisted to disk, to decide whether the request URL about to be issued has already been requested or is currently scheduled; everything is a local lookup. In a distributed run, however, the schedulers on every host must consult the same request pool in the same database to decide whether a given request is a duplicate.
Here, Redis-based deduplication is implemented by subclassing BaseDupeFilter and overriding its methods. Looking at the source, scrapy-redis relies on a fingerprint interface from Scrapy itself, request_fingerprint. The interface is interesting: according to the Scrapy documentation it hashes a request to decide whether two URLs are the same (identical URLs produce identical hashes), and even when two URLs have the same address and the same GET parameters in a different order, they still produce the same hash (which is rather neat). So scrapy-redis simply uses the request fingerprint to decide whether a request has been seen before. The class connects to Redis and inserts fingerprints into a Redis set under a single key. That key is the same for every instance of a given spider: Redis is a key-value store, so the same key always reaches the same value, and using the spider name plus "dupefilter" as the key means that crawler instances on different hosts, as long as they belong to the same spider, all hit the same set — that set is their shared URL deduplication pool. If SADD returns 0, the fingerprint already exists in the set (sets hold no duplicates), so request_seen returns True and the request is treated as a duplicate; if SADD returns 1, a new fingerprint was just added, so the request has not been seen, request_seen returns False, and the new fingerprint is conveniently already stored. The dupefilter is used inside the scheduler class: every request is checked before it enters scheduling, and duplicates are simply discarded instead of wasting resources.
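Below is a small standalone sketch (not part of scrapy-redis itself) showing the two ingredients request_seen() combines: Scrapy's request_fingerprint and Redis's SADD. It assumes a Redis instance on localhost and uses a made-up key name.

```python
import redis
from scrapy import Request
from scrapy.utils.request import request_fingerprint

r = redis.StrictRedis()  # assumes a local Redis on the default port

# The same URL with its GET parameters in a different order
# produces the same fingerprint.
fp1 = request_fingerprint(Request('http://example.com/page?a=1&b=2'))
fp2 = request_fingerprint(Request('http://example.com/page?b=2&a=1'))
assert fp1 == fp2

# SADD returns 1 when the member is new and 0 when it already exists,
# which is exactly the check RFPDupeFilter.request_seen() builds on.
print(r.sadd('myspider:dupefilter', fp1))  # 1 -> first time seen
print(r.sadd('myspider:dupefilter', fp2))  # 0 -> duplicate
```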
queue.py
Its role is as described under dupefilter.py, but this file implements three kinds of queue: the FIFO SpiderQueue, the SpiderPriorityQueue, and the LIFO SpiderStack. The second is the default, which explains the behaviour analysed in an earlier article (link).
```python
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider queue/stack base class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters:
            server -- redis connection
            spider -- spider instance
            key -- key for this queue (e.g. "%(spider)s:queue")

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class SpiderQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class SpiderPriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class SpiderStack(Base):
    """Per-spider stack"""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
        if data:
            return self._decode_request(data)


__all__ = ['SpiderQueue', 'SpiderPriorityQueue', 'SpiderStack']
```
This file implements several container classes. They talk to Redis heavily and use the serializer defined in picklecompat below. The containers are largely the same; one is a queue, one a stack, one a priority queue. They are later instantiated by the scheduler object to drive request scheduling: with SpiderQueue as the scheduling queue, requests are dispatched first-in-first-out; with SpiderStack, last-in-first-out.
Look more closely at SpiderQueue. Its push method works like the other containers': the pushed request is first turned into a dict by Scrapy's request_to_dict helper (a Request object is quite complex, with methods as well as attributes, and is awkward to serialize directly), then serialized into a string by the picklecompat serializer, and finally stored in Redis under a specific key (the same key for every instance of a given spider). When pop is called, the value stored under that key — a Redis list — is read from the other end, returning the element that was pushed earliest, which is what gives first-in-first-out behaviour.
These container classes all serve as the scheduler's request containers. One scheduler is instantiated on every host, paired one-to-one with a spider, so in a distributed run there are multiple instances of the same spider and multiple scheduler instances spread across different hosts. But because every scheduler uses the same container classes, all of which connect to the same Redis server and all of which read and write data under a key built from the spider name (by default "<spider>:requests"), crawler instances on different hosts share a single request scheduling pool. That is what gives the distributed crawlers unified scheduling.
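If you want to see this shared pool with your own eyes, a quick inspection sketch like the one below works. It assumes a spider named "myspider" and the default key templates shown in the code ("%(spider)s:requests" for the queue, "%(spider)s:dupefilter" for the filter, "%(spider)s:items" for the pipeline).

```python
import redis

r = redis.StrictRedis()  # assumes the shared Redis server is local

print(r.zcard('myspider:requests'))    # pending requests (sorted set used by SpiderPriorityQueue)
print(r.scard('myspider:dupefilter'))  # fingerprints already seen
print(r.llen('myspider:items'))        # items pushed by RedisPipeline
```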
picklecompat.py
```python
"""A pickle wrapper module with protocol=-1 by default."""

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)
```
This implements loads and dumps — in other words, a serializer. Redis cannot store arbitrary Python objects (values can only be strings, lists of strings, sets of strings, hashes, and so on, and keys must be strings), so everything has to be serialized before it is stored. The serializer here is Python's pickle module, wrapped so that it works on both Python 2 and Python 3. It is mainly used by the scheduler to store request objects. Why not JSON? I'm not sure; the item pipeline's serialization does default to JSON.
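As a quick sanity check, a round trip through this serializer looks like the sketch below. The dict content is made up for illustration; the scheduler actually stores the dict produced by request_to_dict.

```python
from scrapy_redis import picklecompat

obj = {'url': 'http://example.com', 'priority': 0, 'meta': {'depth': 1}}
blob = picklecompat.dumps(obj)        # bytes, safe to store as a Redis value
assert picklecompat.loads(blob) == obj
```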
pipelines.py
This is where distributed item processing comes in: items are stored in Redis so they can be processed elsewhere. Note that although this is just another pipeline, the code here differs from the situation analysed in the earlier article; because it needs to read the configuration, it uses the from_crawler() hook.
```python
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue"""

    def __init__(self, server,
                 key='%(spider)s:items',
                 serialize_func=default_serialize):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
```
The pipelines file implements an item pipeline class, the same kind of object as a regular Scrapy item pipeline. It takes the REDIS_ITEMS_KEY configured in settings as the key, serializes each item, and stores it under the corresponding value in Redis (the value is a list, and each item becomes one element of that list). Persisting the extracted items this way makes it easy to post-process the data later.
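The pipeline is enabled the usual way, via ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300} in settings.py (the 300 is an arbitrary priority). A separate consumer process can then drain the list later; the sketch below is one hedged way to do that, assuming a spider named "myspider" and the default "%(spider)s:items" key.

```python
import json
import redis

r = redis.StrictRedis()

while True:
    raw = r.blpop('myspider:items', timeout=30)  # block until an item arrives
    if raw is None:
        break                                    # nothing new for 30s, stop
    _key, data = raw
    item = json.loads(data)                      # items were serialized with ScrapyJSONEncoder
    print(item)                                  # replace with real storage logic
```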
scheduler.py
This extension replaces Scrapy's built-in scheduler (pointed to by the SCHEDULER setting) and is what implements distributed scheduling of the crawl. The data structures it relies on are the ones implemented in queue.py.
The two kinds of distribution scrapy-redis implements — distributed crawling and distributed item processing — are provided by the scheduler module and the pipelines module; the other modules described above are supporting pieces for these two.
```python
import importlib
import six

from scrapy.utils.misc import load_object

from . import connection


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler"""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key='%(spider)s:requests',
                 queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
                 dupefilter_key='%(spider)s:dupefilter',
                 dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0
```
This file rewrites the scheduler class to replace the original scheduler in scrapy.core.scheduler. The logic does not actually change much; the main difference is that Redis becomes the storage medium, so that all crawlers are scheduled through one place.
The scheduler dispatches each spider's requests. When it is initialized it reads the queue and dupefilter classes from the settings file (usually just the defaults above) and configures the keys they use (usually the spider name plus "requests" or "dupefilter", so that different instances of the same spider share the same data). Whenever a request is to be scheduled, enqueue_request is called: the scheduler asks the dupefilter whether the request is a duplicate, and if not, pushes it into the queue container (FIFO, LIFO, or priority — configurable in settings). When the engine asks for the next request, next_request is called: the scheduler pops a request from the queue container and hands it to the spider to crawl.
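In practice, switching a project over to this scheduler is a matter of a few settings; the sketch below uses only class paths and setting names that appear in from_settings() above.

```python
# settings.py
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

# FIFO, LIFO, or priority scheduling; SpiderPriorityQueue is the default.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'

# Keep the request queue and dupefilter in Redis when the spider closes,
# so an interrupted crawl can be resumed.
SCHEDULER_PERSIST = True
```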
spider.py
The spider provided here reads the URLs to crawl from Redis and crawls them; if the crawl yields more URLs, it keeps going until all of those requests are done, then goes back to Redis for more URLs, and the cycle repeats.
```python
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None  # If empty, uses default '<spider>:start_urls'.

    # Fetch this amount of start urls when idle.
    redis_batch_size = 100

    # Redis client instance.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwrads
            # compatibility.
            # XXX: Raise a deprecation warning.
            assert self.crawler, "crawler not set"
            crawler = self.crawler

        if not self.redis_key:
            self.redis_key = '%s:start_urls' % self.name

        self.log("Reading URLs from redis key '%s'" % self.redis_key)
        self.redis_batch_size = self.settings.getint(
            'REDIS_START_URLS_BATCH_SIZE',
            self.redis_batch_size,
        )

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET')
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            yield self.make_request_from_data(data)
            found += 1

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        # By default, data is an URL.
        if '://' in data:
            return self.make_requests_from_url(data)
        else:
            self.logger.error("Unexpected URL from '%s': %r", self.redis_key, data)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler):
        obj = super(RedisSpider, self).from_crawler(crawler)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler)
        obj.setup_redis(crawler)
        return obj
```
The changes to the spider are not large either. The main addition is that the connect interface binds the spider_idle signal to the spider. When the spider is initialized, setup_redis establishes the Redis connection; afterwards, next_requests pulls start URLs out of Redis using the spider's redis_key (by default "<spider>:start_urls"), with the REDIS_START_URLS_AS_SET setting deciding whether that key is read as a set or as a list. Note that this start-URL pool is not the same thing as the queue pool discussed above: the queue pool is used for scheduling, while the start-URL pool holds the entry URLs. Both live in Redis, but under different keys — think of them as different tables. From a handful of start URLs the spider can discover many new URLs, which go to the scheduler for deduplication and scheduling. When the spider finds no URLs left in the scheduling pool, the spider_idle signal fires, which triggers the spider's next_requests again to read another batch of URLs from the start-URL pool in Redis.
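A minimal distributed spider, then, looks roughly like the sketch below (the spider name, key, and parse logic are illustrative). You start the same spider process on several machines and seed the start-URL key from anywhere.

```python
from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = 'myspider'
    redis_key = 'myspider:start_urls'  # same as the default '<name>:start_urls'

    def parse(self, response):
        # Illustrative extraction logic; replace with your own.
        yield {'url': response.url, 'title': response.css('title::text').get()}


# Seeding the crawl from any machine, e.g. in a Python shell:
#   import redis
#   redis.StrictRedis().lpush('myspider:start_urls', 'http://example.com')
```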
How the components fit together
To sum up the overall idea of scrapy-redis: by rewriting the scheduler and spider classes, it wires scheduling and spider start-up into Redis; by providing new dupefilter and queue classes, it moves deduplication and the scheduling container into Redis. Because every crawler process on every host talks to the same Redis database, scheduling and deduplication are managed in one place, which is what makes the crawler distributed.
When a spider is initialized, a matching scheduler object is initialized with it; the scheduler reads the settings and sets up its scheduling container (queue) and its deduplication tool (dupefilter). Whenever the spider produces a request, the Scrapy engine hands it to that spider's scheduler, which checks it against Redis for duplicates and, if it is new, adds it to the scheduling pool in Redis. When the scheduling conditions are met, the scheduler pops a request from the Redis pool and sends it to the spider to crawl. Once the spider has crawled all the URLs currently available and the scheduler finds the spider's Redis scheduling pool empty, the spider_idle signal fires; on receiving it, the spider connects to Redis, reads the start-URL pool, fetches a new batch of entry URLs, and the whole cycle repeats.
Why are these components needed?
Let's start with Scrapy's "pending-request queue" and its Scheduler. Anyone who has played with crawlers knows that one of the central data structures during a crawl is the queue of requests waiting to be crawled, together with the scheduler that operates on it. The official Scrapy documentation says very little about either.
What data structure does Scrapy use to hold the requests waiting to be crawled? Nothing fancy: a (customised) collections.deque that ships with Python. The question, then, is how to let two or more spiders share that deque.
scrapy-redis's answer is to swap the deque for a Redis database: pending requests are stored on one shared Redis server, so multiple spiders can read from the same database. That solves the main problem of going distributed.
But a new question follows: now that Redis holds the queue, does Scrapy magically become distributed? The part of Scrapy directly tied to the pending-request queue is the Scheduler, which enqueues new requests (pushing onto the deque) and pops the next request to crawl (pulling from the deque), among other things. Scrapy's Scheduler does not just use the deque crudely as-is; it organises things at a higher level, keeping a dict of pending queues bucketed by priority, for example:
```
{
    priority0: queue0,
    priority1: queue1,
    priority2: queue2,
}
```
A request's priority attribute then decides which queue it goes into, and on the way out, requests are popped in priority order. To manage this fairly sophisticated dict of queues, the Scheduler needs a whole set of methods — and once you replace the queue with Redis, Scrapy's own Scheduler no longer works, so you have to write your own. Hence scrapy-redis's dedicated scheduler.
Now that Redis is the main data store, can the other key modules that rely on built-in data structures be swapped out too? Another important piece of the crawl is request deduplication. How does Scrapy implement it? With a set: the fingerprints of requests already sent are kept in a set, and the fingerprint of each new request is checked against it. If the fingerprint is already in the set, the request has been sent before; if not, we carry on.
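Stripped of Scrapy, the idea is just this toy in-memory sketch (not Scrapy's actual code): keep a set of fingerprints and skip any request whose fingerprint is already in it.

```python
seen = set()

def should_schedule(fingerprint):
    """Return True the first time a fingerprint shows up, False afterwards."""
    if fingerprint in seen:
        return False
    seen.add(fingerprint)
    return True

assert should_schedule('fp-abc') is True
assert should_schedule('fp-abc') is False  # duplicate, would be dropped
```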
To go distributed, this set has to be replaced as well — again with Redis — and the dedup class has to be swapped accordingly. Hence scrapy-redis's dupefilter. By the same reasoning it is easy to guess why the remaining components (Pipeline and Spider) also had to be modified.
References:
- https://github.com/younghz/scrapy-redis
- https://github.com/younghz/sr-chn
- https://github.com/KDF5000/RSpider
Reposted from: https://www.cnblogs.com/zxtceq/p/8985622.html