Python Multithreading: Downloading a Whole Novel with Multiple Threads
Introduction
Let's get straight to the point. In my own testing, a novel with over ten thousand chapters finished crawling in roughly four minutes, so credit to the novel site for holding up under that load, unlike my school's academic-affairs system (a sore subject, the less said the better).
Multithreading
Crawling is IO-bound work: each thread spends most of its time waiting on network responses, so multithreading is brought in to overlap those waits.
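A minimal sketch of why the thread pool helps (the example.com URLs are placeholders, not the novel site): requests.get blocks on the socket and the GIL is released during that wait, so fifteen workers can have fifteen requests in flight at once.

import requests
from concurrent.futures import ThreadPoolExecutor

# Placeholder URLs; substitute real chapter pages.
urls = ['http://example.com/?page={}'.format(i) for i in range(20)]

def fetch(url):
    # Blocks on the network; the GIL is released while waiting,
    # so other threads issue their requests in parallel.
    return requests.get(url, timeout=5).status_code

with ThreadPoolExecutor(max_workers=15) as pool:
    print(list(pool.map(fetch, urls)))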
redis
Redis handles task distribution and temporary data storage, so you'll need to install Redis to run this. An earlier version also used MongoDB, but to cut down on hassle it was trimmed to Redis only.
For the Windows install of Redis, redis-server needs to be left running; you can stop following the install guide once you reach the step that starts redis-server.
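The task hand-out itself is just a Redis list used as a queue: the producer rpushes JSON-encoded chapter tuples and each worker lpops one at a time, which is exactly what store_name_in_redis and get_url do in the code below. A minimal sketch (the '123.html' href is made up):

import json
import redis

client = redis.Redis()  # assumes redis-server is running on localhost:6379

# Producer side: one task per chapter.
client.rpush('url_info', json.dumps([0, '123.html', 'Chapter 1']))

# Worker side: pop tasks until the queue runs dry.
while True:
    raw = client.lpop('url_info')
    if raw is None:
        break
    order, after_link, name = json.loads(raw)
    print(order, after_link, name)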
Proxies
Use a proxy only if its quality is decent; skip the free ones (I tried a few free proxies and they weren't usable).
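For reference, requests expects proxies as a scheme-to-URL mapping, which is what the proxy property in the code below builds; a sketch with made-up addresses:

import random
import requests

proxy_pool = ['1.2.3.4:8080', '5.6.7.8:3128']  # made-up host:port pairs
proxies = {'http': 'http://' + random.choice(proxy_pool)}
requests.get('http://example.com', proxies=proxies, timeout=5)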
Code
'''
Install Redis and start redis-server before running.

Redis stages the downloaded chapters. With a lot of data, a single read
apparently can't fetch everything: on a novel with 10,000+ chapters, one
read only returned 9,000-odd entries, so results are now pulled in batches.

Two inputs are needed:
1. The URL of the novel's table-of-contents page.
2. The position of the first chapter in the contents list (default 0),
   since the first few entries may be "latest chapter" links.
Note: novel sites mostly share this page layout, so the script should work
on a handful of similar sites. Get in touch if something breaks.
'''
import random
import redis
# import pymongo
import requests
import json
import logging
import threading
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
# from Proxyer import Proxyer
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
class BookDownload:
    def __init__(self, chapter_link, start_place=0, abbr='http://', proxy=None):
        self.chapter_link = chapter_link  # URL of the table-of-contents page
        self.start_place = start_place  # position of the first real chapter in the contents list
        self.abbr_link = chapter_link  # link prefix; now derived from chapter_link, so abbr no longer needs to be passed
        self.redis_client = redis.Redis()
        self.event = threading.Event()
        self.redis_list = 'url_info'
        self.redis_failed_list = 'failed_url'
        self.redis_cache = 'download'
        self._proxy = proxy
        # self.mongo_collect = pymongo.MongoClient().chapter_3.test3  # MongoDB collection; configure your own
        self.all_chapter = 0
        self.successed_download = 0
        self.session = requests.session()
        self.header = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36'}
    @property
    def proxy(self):
        # Use proxies only if their quality is good; free ones are not
        # recommended (your own IP works, this test site rarely bans IPs).
        # Expects self._proxy to be a list or None.
        if self._proxy:
            choosed_proxy = random.choice(self._proxy)
            return {'http': 'http://' + choosed_proxy}
        return None

    @proxy.setter
    def proxy(self, value):
        if isinstance(value, list) or value is None:
            self._proxy = value
        else:
            raise ValueError('must be list type or None')
    def get_all_chapter(self):
        res = self.session.get(self.chapter_link, headers=self.header, timeout=5)
        if res.status_code != 200:
            raise Exception("can't access the website")
        soup = BeautifulSoup(res.content.decode(), 'lxml')
        name_list = soup.find('div', attrs={'id': 'list'})
        dl_list = name_list.find('dl')
        wanted_download = dl_list.find_all('dd')[self.start_place:]
        self.all_chapter = len(wanted_download)
        for order, value in enumerate(wanted_download):
            yield order, value.a.get('href').rsplit('/')[-1], value.a.text
    def store_name_in_redis(self):
        """
        Calls get_all_chapter directly and pushes each chapter's info
        onto the Redis task queue.
        :return:
        """
        for info in self.get_all_chapter():
            try:
                self.redis_client.rpush(self.redis_list, json.dumps(info))
            except Exception as e:
                logging.info(e)
    def requests_one_link(self, detail_link, timeout):
        """
        Fetches a chapter page and parses it in one step.
        :param detail_link:
        :return: the chapter body text
        """
        try:
            res = self.session.get(detail_link, proxies=self.proxy, headers=self.header, timeout=timeout)
            text = res.content.decode()
            soup = BeautifulSoup(text, 'lxml')
            # Swap the site's padding for newlines. '\xa0' (from &nbsp;)
            # is an assumption: the exact replacement string was garbled
            # in the original post.
            zhengwen = soup.find('div', attrs={'id': 'content'}).text.replace('\xa0', '\n')
            return zhengwen
        except Exception as e:
            # raise e
            return None
    def _clear_redis(self):
        """
        Clear the Redis keys left over from a previous run.
        :return:
        """
        try:
            self.redis_client.delete(self.redis_list)
            self.redis_client.delete(self.redis_failed_list)
            self.redis_client.delete(self.redis_cache)
            # self.redis_client.lpop(self.redis_cache)
        except Exception as e:
            pass

    def init_work(self):
        self._clear_redis()
    def get_url(self, name):
        """
        Pop one chapter's URL info from the given Redis list.
        :return:
        """
        burl_info = self.redis_client.lpop(name)
        if burl_info:
            url_info = json.loads(burl_info)
            order, after_link, name = url_info
            return order, after_link, name
        return None
    def handle(self, order, after_link, name, timeout=2):
        """
        On success, nothing more to do; on failure, return the chapter's
        info so it can be pushed onto the failed queue.
        :param order:
        :param after_link:
        :param name:
        :return:
        """
        content = self.requests_one_link(self.abbr_link + after_link, timeout)
        if content:
            keys = name + '\n' + content
            # Score by chapter order so the sorted set keeps reading order.
            self.redis_client.zadd(self.redis_cache, {keys: order})
            logging.info('success download {}'.format(name))
            self.successed_download += 1
            # self.mongo_collect.insert_one({'order': order, 'name': name, 'content': content})
            # logging.info('success download {}'.format(name))
            # self.successed_download += 1
            return None
        else:
            logging.info('failed to download {}'.format(name))
            return order, after_link, name
    def _callback(self, futures):
        """
        Done-callback for each future: failed chapters go onto the failed queue.
        :param futures:
        :return:
        """
        res = futures.result()
        if res:
            try:
                self.redis_client.rpush(self.redis_failed_list, json.dumps(res))
            except Exception as e:
                logging.info(e)
    def start_download(self, Pool: ThreadPoolExecutor):
        while True:
            info = self.get_url(self.redis_list)
            if info:
                futures = Pool.submit(self.handle, *info)
                futures.add_done_callback(self._callback)
            else:
                break
        # Wait for every first-pass download to finish before signalling the
        # retry thread, so it doesn't start draining the failed queue too early.
        Pool.shutdown()
        self.event.set()
    def failed_download(self):
        """
        Re-download chapters that failed on the first pass,
        trying each at most three times.
        :return:
        """
        if self.event.wait():
            while True:
                info = self.get_url(self.redis_failed_list)
                if info:
                    try_times = 3
                    while try_times:
                        # handle returns None on success, so stop retrying.
                        if not self.handle(*info, timeout=3):
                            break
                        try_times -= 1
                else:
                    break
            logging.info("=============end download==============")
            logging.info("===all chapter {}=== success download {}=====".format(self.all_chapter, self.successed_download))
    def start_failed_download(self):
        thread = threading.Thread(target=self.failed_download)
        thread.start()
        thread.join()

    def store_txt(self):
        txt = 'download.txt'
        # count = self.redis_client.zcard(self.redis_cache)
        while self.redis_client.zcard(self.redis_cache):
            content = ''
            for x in self.redis_client.zrange(self.redis_cache, 0, 1000):
                content += x.decode() + '\n'
            with open(txt, 'a+', encoding='utf8') as f:
                f.write(content)
            self.redis_client.zremrangebyrank(self.redis_cache, 0, 1000)
if __name__ == '__main__':
    Pool = ThreadPoolExecutor(15)
    bookdownload = BookDownload('http://www.xbiquge.la/54/54101/', 0)
    bookdownload.init_work()
    bookdownload.store_name_in_redis()
    logging.info('=======================start================================')
    bookdownload.start_download(Pool)
    bookdownload.start_failed_download()
    bookdownload.store_txt()
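A side note on the staging design: handle zadds each chapter with its index as the score, so zrange later returns chapters in reading order no matter which thread finished first. A minimal sketch of that property (the 'demo_cache' key is just for illustration):

import redis

client = redis.Redis()
client.delete('demo_cache')  # throwaway key used only for this sketch

# Threads may finish out of order...
client.zadd('demo_cache', {'Chapter 2\nbody': 2})
client.zadd('demo_cache', {'Chapter 1\nbody': 1})
client.zadd('demo_cache', {'Chapter 3\nbody': 3})

# ...but zrange sorts by score, restoring reading order.
for member in client.zrange('demo_cache', 0, -1):
    print(member.decode().splitlines()[0])  # Chapter 1, Chapter 2, Chapter 3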
Test results
Original post: https://blog.csdn.net/qq_45667109/article/details/106041255