5分钟快速掌握 Python 定时任务框架
APScheduler 簡介
在實際開發中我們經常會碰上一些重復性或周期性的任務,比如像每天定時爬取某個網站的數據、一定周期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。
在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關于 crontab 的相關操作。但手動管理并不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那么每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。
所以將這些定時任務的調度代碼化才是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。
在 Python 生態中對于定時任務的一些操作主要有那么幾個:
schedule:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用于復雜時間的調度
APScheduler:第三方定時任務框架,是對 Java 第三方定時任務框架?Quartz?的模仿與移植,能提供比?schedule?更復雜的應用場景,并且各種組件都是模塊化,易于使用與二次開發。
Celery Beat:屬于?celery?這分布式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。
所以為了滿足能夠相對復雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇?APScheduler?來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關?APScheduler?的使用。
APScheduler 概念與組件
雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用?APScheduler?之前,我們需要對這個框架的一些概念簡單了解,主要有那么以下幾個:
-
觸發器(trigger)
-
任務持久化(job stores)
-
執行器(executor)
-
調度器(scheduler)
觸發器(trigger)
所謂的觸發器就是用以觸發定時任務的組件,在?APScheduler中主要是指時間觸發器,并且主要有三類時間觸發器可供使用:
-
date:日期觸發器。日期觸發器主要是在某一日期時間點上運行任務時調用,是?APScheduler?里面最簡單的一種觸發器。所以通常也適用于一次性的任務或作業調度。
-
interval:間隔觸發器。間隔觸發器是在日期觸發器基礎上擴展了對時間部分,比如時、分、秒、天、周這幾個部分的設定。是我們用以對重復性任務進行設定或調度的一個常用調度器。設定了時間部分之后,從起始日期開始(默認是當前)會按照設定的時間去執行任務。
-
cron:cron?表達式觸發器。cron?表達式觸發器就等價于我們 Linux 上的 crontab,它主要用于更復雜的日期時間進行設定。但需要注意的是,APScheduler?不支持?6 位及以上的 cron 表達式,最多只支持到 5 位。
任務持久化(job stores)
任務持久化主要是用于將設定好的調度任務進行存儲,即便是程序因為意外情況,如斷電、電腦或服務器重啟時,只要重新運行程序時,APScheduler?就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。
APScheduler?為我們提供了多種持久化任務的途徑,默認是使用memory?也就是內存的形式,但內存并不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。
APScheduler?支持的且常用的數據庫主要有:
-
sqlalchemy?形式的數據庫,這里就主要是指各種傳統的關系型數據庫,如 MySQL、PostgreSQL、SQLite 等。
-
mongodb?非結構化的 Mongodb 數據庫,該類型數據庫經常用于對非結構化或版結構化數據的存儲或操作,如 JSON。
-
redis?內存數據庫,通常用作數據緩存來使用,當然通過一些主從復制等方式也能實現當中數據的持久化或保存。
通常我們可以在創建?Scheduler?實例時創建,或是單獨為任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。
執行器(executor)
執行器顧名思義就是執行我們任務的對象,在計算機內通常要么是 CPU 調度任務,要么是單獨維護一個線程來運行任務。所以?APScheduler?里的執行器通常就是?ThreadPoolExecutor?或?ProcessPoolExecutor?這樣的線程池和進程池兩種。
當然如果是和協程或異步相關的任務調度,還可以使用對應的?AsyncIOExecutor、TwistedExecutor?和?GeventExecutor?三種執行器。
調度器(scheduler)
調度器的選擇主要取決于你當前的程序環境以及?APScheduler的用途。根據用途的不同,APScheduler?又提供了以下幾種調度器:
-
BlockingScheduler:阻塞調度器,當程序中沒有任何存在主進程之中運行東西時,就則使用該調度器。
-
BackgroundScheduler:后臺調度器,在不使用后面任何的調度器且希望在應用程序內部運行時的后臺啟動時才進行使用,如當前你已經開啟了一個 Django 或 Flask 服務。
-
AsyncIOScheduler:AsyncIO?調度器,如果代碼是通過?asyncio?模塊進行異步操作,使用該調度器。
-
GeventScheduler:Gevent?調度器,如果代碼是通過?gevent?模塊進行協程操作,使用該調度器
-
TornadoScheduler:Tornado?調度器,在?Tornado?框架中使用
-
TwistedScheduler:Twisted?調度器,在基于?Twisted的框架或應用程序中使用
-
QtScheduler:Qt?調度器,在構建?Qt?應用中進行使用。
通常情況下如果不是和 Web 項目或應用集成共存,那么往往都首選?BlockingScheduler?調度器來進行操作,它會在當前進程中啟動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那么需要選擇?BackgroundScheduler?調度器,因為它不會干擾當前應用的線程或進程狀況。
基于對以上的概念和組件認識,我們就能基本上摸清?APScheduler?的運行流程:
設定調度器(scheduler)用以對任務的調度與安排進行全局統籌
對相應的函數或方法上設定相應的觸發器(trigger),并添加到調度器中
如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務
當觸發器被觸發時,就將任務交由執行器(executor)進行執行
APScheduler 快速上手
雖然?APScheduler?里面的概念和組件看起來有點多,但在使用上并不算很復雜,我們可以通過本節的示例就能夠很快使用。
選擇對應的 scheduler
在使用之前我們需要先實例化一個?scheduler?對象,所有的?scheduler?對象都被放在了?apscheduler.schedulers?模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補全的提示來獲取相應的?scheduler?對象。
這里我直接選取了最基礎的?BlockingScheduler:
#?main.pyfrom?apscheduler.schedulers.blocking?import?BlockingSchedulerscheduler?=?BlockingScheduler()配置 scheduler
對于?scheduler?的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之后再進行配置。
實例化時進行參數配置:
#?main.py from?datetime?import?datetimefrom?apscheduler.executors.pool?import?ThreadPoolExecutor from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore from?apscheduler.schedulers.blocking?import?BlockingScheduler#?任務持久化?使用?SQLite jobstores?=?{'default':?SQLAlchemyJobStore(url?=?'sqlite:///jobs.db') } #?執行器配置 executors?=?{'default':?ThreadPoolExecutor(20), } #?關于?Job?的相關配置,見官方文檔?API job_defaults?=?{'coalesce':?False,'next_run_time':?datetime.now() } scheduler?=?BlockingScheduler(jobstores?=?jobstores,executors?=?executors,job_defaults?=?job_defaults,timezone?=?'Asia/Shanghai' )或是通過?scheduler.configure?方法進行同樣的操作:
scheduler?=?BlockingScheduler() scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone='Asia/Shanghai')添加并執行你的任務
創建?scheduler?對象之后,我們需要調用其下的?add_job()或是?scheduled_job()?方法來將我們需要執行的函數進行注冊。前者是以傳參的形式指定對應的函數名,而后者則是以裝飾器的形式直接對我們要執行的函數進行修飾。
比如我現在有一個輸出此時此刻時間的函數?now():
from?datetime?import?datetimedef?now(trigger):print(f"trigger:{trigger}?->?{datetime.now()}")然后我打算每 5 秒的時候運行一次,那我們使用?add_job()?可以這樣寫:
if?__name__?==?'__main__':scheduler.add_job(now,?trigger?=?"interval",?args?=?("interval",),?seconds?=?5)scheduler.start()在調用?start()?方法之后調度器就會開始執行,并在控制臺上看到對應的結果了:
trigger:interval?->?2021-01-16?21:19:43.356674 trigger:interval?->?2021-01-16?21:19:46.679849 trigger:interval?->?2021-01-16?21:19:48.356595當然使用?@scheduled_job?的方式來裝飾我們的任務或許會更加自由一些,于是上面的例子就可以寫成這樣:
@scheduler.scheduled_job(trigger?=?"interval",?args?=?("interval",),?seconds?=?5) def?now(trigger):print(f"trigger:{trigger}?->?{datetime.now()}")if?__name__?==?'__main__':scheduler.start()運行之后就會在控制臺看到同樣的結果了。
不過需要注意的是,添加任務一定要在?start()?方法執行前調用,否則會找不到任務或是拋出異常。
將 APScheduler 集成到 Web 項目中
如果你是正在做有關的 Web 項目且存在一些定時任務,那么得益于?APScheduler?由于多樣的調度器,我們能夠將其和我們的項目結合到一起。
如果你正在使用?Flask,那么?Flask-APScheduler?這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你了解了前面我所介紹的有關于?APScheduler?的概念和組件,你就能很輕易地看懂這個第三方庫倉庫里的示例代碼。
如果你使用的不是 Flask 框架,那么?APScheduler?本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。
這里我使用的是?FastAPI?這一目前流行的 Web 框架。demo 項目結構如下:
temp-scheduler ├──?config.py???????#?配置項 ├──?main.py?????????#?API?文件 └──?scheduler.py????#?APScheduler?相關設置安裝依賴
這里我們需要的依賴不多,只需要簡單幾個即可:
pip?install?fastapi?apscheduler?sqlalchemy?uvicorn配置項
如果項目中模塊過多,那么使用一個文件或模塊來進行統一管理是最好的選擇。這里的?config.py?我們主要像 Flask 的配置那樣簡單設定:
from?apscheduler.executors.pool?import?ThreadPoolExecutor from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore from?apscheduler.schedulers.blocking?import?BlockingSchedulerclass?SchedulerConfig:JOBSTORES?=?{"default":?SQLAlchemyJobStore(url="sqlite:///job.db")}EXECUTORS?=?{"default":?ThreadPoolExecutor(20)}JOB_DEFAULTS?=?{"coalesce":?False}@classmethoddef?to_dict(cls):return?{"jobstores":?cls.JOBSTORES,"executors":?cls.EXECUTORS,"job_defaults":?cls.JOB_DEFAULTS,}在?SchedulerConfig?配置項中我們可以自己實現一個?to_dict()?類方法,以便我們后續傳參時通過解包的方式直接傳入配置參數即可。
Scheduler 相關設置
scheduler.py?模塊的設定也比較簡單,即設定對應的?scheduler?調度器即可。由于是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:
import?logging from?datetime?import?datetimefrom?apscheduler.schedulers.background?import?BackgroundSchedulerfrom?config?import?SchedulerConfigscheduler?=?BackgroundScheduler() logger?=?logging.getLogger(__name__)def?init_scheduler()?->?None:#?config?schedulerscheduler.configure(**SchedulerConfig.to_dict())logger.info("scheduler?is?running...")#?schedule?testscheduler.add_job(func=mytask,trigger="date",args=("APScheduler?Initialize.",),next_run_time=datetime.now(),)scheduler.start()def?mytask(message:?str)?->?None:print(f"[{datetime.now()}]?message:?{message}")在這一部分中:
-
init_scheduler()?方法主要用于在 API 服務啟動時被調用,然后對?scheduler?對象的配置以及測試
-
mytask()?則是我們要定期執行的任務,后續我們可以通過 APScheduler 提供的方法來自行添加任務
API 設置
在?main.py?模塊就主要存放著我們由 FastAPI 所構建的相關 API。如果在后續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似于 Flask 的藍圖模式。
import?logging import?uuid from?datetime?import?datetime from?typing?import?Any,?Dict,?Optional,?Sequence,?Unionfrom?fastapi?import?FastAPI from?pydantic?import?BaseModelfrom?scheduler?import?init_scheduler,?mytask,?schedulerlogger?=?logging.getLogger(__name__)app?=?FastAPI(title="APScheduler?API") app.add_event_handler("startup",?init_scheduler)class?Job(BaseModel):id:?Union[int,?str,?uuid.UUID]name:?Optional[str]?=?Nonefunc:?Optional[str]?=?Noneargs:?Optional[Sequence[Optional[str]]]?=?Nonekwargs:?Optional[Dict[str,?Any]]?=?Noneexecutor:?Optional[str]?=?Nonemisfire_grace_time:?Optional[str]?=?Nonecoalesce:?Optional[bool]?=?Nonemax_instances:?Optional[int]?=?Nonenext_run_time:?Optional[Union[str,?datetime]]?=?None@app.post("/add") def?add_job(message:?str,trigger:?str,trigger_args:?Optional[dict],id:?Union[str,?int,?uuid.UUID], ):try:scheduler.add_job(func=mytask,trigger=trigger,kwargs={"message":?message},id=id,**trigger_args,)except?Exception?as?e:logger.exception(e.args)return?{"status_code":?0,?"message":?"添加失敗"}return?{"status_code":?1,?"message":?"添加成功"}@app.delete("/delete/{id}") def?delete_job(id:?Union[str,?int,?uuid.UUID]):"""delete?exist?job?by?id"""try:scheduler.remove_job(job_id=id)except?Exception:return?dict(message="刪除失敗",status_code=0,)return?dict(message="刪除成功",status_code=1,)@app.put("/reschedule/{id}") def?reschedule_job(id:?Union[str,?int,?uuid.UUID],?trigger:?str,?trigger_args:?Optional[dict] ):try:scheduler.reschedule_job(job_id=id,?trigger=trigger,?**trigger_args)except?Exception?as?e:logger.exception(e.args)return?dict(message="修改失敗",status_code=0,)return?dict(message="修改成功",status_code=1,)@app.get("/job") def?get_all_jobs():jobs?=?Nonetry:job_list?=?scheduler.get_jobs()if?job_list:jobs?=?[Job(**task.__getstate__())?for?task?in?job_list]except?Exception?as?e:logger.exception(e.args)return?dict(message="查詢失敗",status_code=0,jobs=jobs,)return?dict(message="查詢成功",status_code=1,jobs=jobs,)@app.get("/job/{id}") def?get_job_by_id(id:?Union[int,?str,?uuid.UUID]):jobs?=?[]try:job?=?scheduler.get_job(job_id=id)if?job:jobs?=?[Job(**job.__getstate__())]except?Exception?as?e:logger.exception(e.args)return?dict(message="查詢失敗",status_code=0,jobs=jobs,)return?dict(message="查詢成功",status_code=1,jobs=jobs,)以上代碼看起來很多,其實核心的就那么幾點:
FastAPI 對象?app?的初始化。這里用到的?add_event_handler()?方法就有點像 Flask 中的?before_first_request,會在 Web 服務請求伊始進行操作,理解為初始化相關的操作即可。
API 接口路由。路由通過?app?對象下的對應 HTTP 方法來實現,如?GET、POST、PUT?等。這里的裝飾器用法其實也和 Flask 很類似,就不多贅述。
scheduler?對象的增刪改查。從?scheduler.py?模塊中引入我們創建好的?scheduler?對象之后就可以直接用來做增刪改查的操作:
增:使用?add_job()?方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等
刪:使用?delete_job()?方法,我們需要傳入一個對應任務的?id?參數,用以能夠查找到對應的任務
改:使用?reschedule_job()?方法,這里也需要一個對應任務的?id?參數,以及需要重新修改的觸發器及其參數
查:使用?get_jobs()?和?get_job()?兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了?APScheduler.job.Job?對象的列表,而后者是通過?id?參數來查找對應的任務對象;這里我通過底層源碼使用?__getstate__()?來獲取到任務的相關信息,這些信息我們通過事先設定好的?Job?對象來對其進行序列化,最后將信息從接口中返回。
總結
以上是生活随笔為你收集整理的5分钟快速掌握 Python 定时任务框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 处理项目重大质量问题的思路和原则
- 下一篇: 如何做好技术 Team Leader