python apscheduler执行_Python下定时任务框架APScheduler的使用
今天準備實現一個功能需要用到定時執行任務,所以就看到了Python的一個定時任務框架APScheduler,試了一下感覺還不錯。
1.APScheduler簡介:
APScheduler是Python的一個定時任務框架,可以很方便的滿足用戶定時執行或者周期執行任務的需求,它提供了基于日期date、固定時間間隔interval 、以及類似于Linux上的定時任務crontab類型的定時任務。并且該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中,實現任務的持久化,所以使用起來非常方便。
2.APScheduler安裝:
APScheduler的安裝相對來說也非常簡單,可以直接利用pip安裝,如果沒有pip可以下載源碼,利用源碼安裝。
1).利用pip安裝:(推薦)#?pip?install?apscheduler#?python?setup.py?install
3.基本概念
APScheduler有四種組件及相關說明:
1) triggers(觸發器):觸發器包含調度邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運行,除了他們自己初始化配置外,觸發器完全是無狀態的。
2)job stores(作業存儲):用來存儲被調度的作業,默認的作業存儲器是簡單地把作業任務保存在內存中,其它作業存儲器可以將任務作業保存到各種數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的數據將被序列化,重新讀取作業時在反序列化。
3) executors(執行器):執行器用來執行定時任務,只是將需要執行的任務放在新的線程或者線程池中運行。當作業任務完成時,執行器將會通知調度器。對于執行器,默認情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。
4) schedulers(調度器):調度器是將其它部分聯系在一起,一般在應用程序中只有一個調度器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。通過調度器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業。
APScheduler提供了多種調度器,可以根據具體需求來選擇合適的調度器,常用的調度器有:
BlockingScheduler:適合于只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用。
BackgroundScheduler: 適合于要求任何在程序后臺運行的情況,當希望調度器在應用后臺執行時使用。
AsyncIOScheduler:適合于使用asyncio框架的情況
GeventScheduler: 適合于使用gevent框架的情況
TornadoScheduler: 適合于使用Tornado框架的應用
TwistedScheduler: 適合使用Twisted框架的應用
QtScheduler: 適合使用QT的情況
4.配置調度器
APScheduler提供了許多不同的方式來配置調度器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先創建調度器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。
1)下面一個簡單的示例:import?time
from?apscheduler.schedulers.blocking?import?BlockingScheduler
def?test_job():
print?time.strftime('%Y-%m-%d?%H:%M:%S',?time.localtime(time.time()))
scheduler?=?BlockingScheduler()
'''
#該示例代碼生成了一個BlockingScheduler調度器,使用了默認的默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,并且最大線程數為10。
'''
scheduler.add_job(test_job,?'interval',?seconds=5,?id='test_job')
'''
#該示例中的定時任務采用固定時間間隔(interval)的方式,每隔5秒鐘執行一次。
#并且還為該任務設置了一個任務id
'''
scheduler.start()
2)如果想執行一些復雜任務,如上邊所說的同時使用兩種執行器,或者使用多種任務存儲方式,并且需要根據具體情況對任務的一些默認參數進行調整??梢詤⒖枷旅娴姆绞?。(http://apscheduler.readthedocs.io/en/latest/userguide.html)
第一種方式:from?pytz?import?utc
from?apscheduler.schedulers.background?import?BackgroundScheduler
from?apscheduler.jobstores.mongodb?import?MongoDBJobStore
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.executors.pool?import?ThreadPoolExecutor,?ProcessPoolExecutor
jobstores?=?{
'mongo':?MongoDBJobStore(),
'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors?=?{
'default':?ThreadPoolExecutor(20),
'processpool':?ProcessPoolExecutor(5)
}
job_defaults?=?{
'coalesce':?False,
'max_instances':?3
}
scheduler?=?BackgroundScheduler(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)
第二種方式:from?apscheduler.schedulers.background?import?BackgroundScheduler
#?The?"apscheduler."?prefix?is?hard?coded
scheduler?=?BackgroundScheduler({
'apscheduler.jobstores.mongo':?{
'type':?'mongodb'
},
'apscheduler.jobstores.default':?{
'type':?'sqlalchemy',
'url':?'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default':?{
'class':?'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers':?'20'
},
'apscheduler.executors.processpool':?{
'type':?'processpool',
'max_workers':?'5'
},
'apscheduler.job_defaults.coalesce':?'false',
'apscheduler.job_defaults.max_instances':?'3',
'apscheduler.timezone':?'UTC',
})
第三種方式:from?pytz?import?utc
from?apscheduler.schedulers.background?import?BackgroundScheduler
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.executors.pool?import?ProcessPoolExecutor
jobstores?=?{
'mongo':?{'type':?'mongodb'},
'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors?=?{
'default':?{'type':?'threadpool',?'max_workers':?20},
'processpool':?ProcessPoolExecutor(max_workers=5)
}
job_defaults?=?{
'coalesce':?False,
'max_instances':?3
}
scheduler?=?BackgroundScheduler()
#?..?do?something?else?here,?maybe?add?jobs?etc.
scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)
5.對任務作業的基本操作:
1).添加作業有兩種方式:第一種可以直接調用add_job(),第二種使用scheduled_job()修飾器。
而add_job()是使用最多的,它可以返回一個apscheduler.job.Job實例,因而可以對它進行修改或者刪除,而使用修飾器添加的任務添加之后就不能進行修改。
例如使用add_job()添加作業:#!/usr/bin/env?python
#-*-?coding:UTF-8
import?time
import?datetime
from?apscheduler.schedulers.blocking?import?BlockingScheduler
def?job1(f):
print?time.strftime('%Y-%m-%d?%H:%M:%S',?time.localtime(time.time())),?f
def?job2(arg1,?args2,?f):
print?f,?args1,?args2
def?job3(**args):
print?args
'''
APScheduler支持以下三種定時任務:
cron:?crontab類型任務
interval:?固定時間間隔任務
date:?基于日期時間的一次性任務
'''
scheduler?=?BlockingScheduler()
#循環任務示例
scheduler.add_job(job1,?'interval',?seconds=5,?args=('循環',),?id='test_job1')
#定時任務示例
scheduler.add_job(job1,?'cron',?second='*/5',?args=('定時',),?id='test_job2')
#一次性任務示例
scheduler.add_job(job1,?next_run_time=(datetime.datetime.now()?+?datetime.timedelta(seconds=10)),?args=('一次',),?id='test_job3')
'''
傳遞參數的方式有元組(tuple)、列表(list)、字典(dict)
注意:不過需要注意采用元組傳遞參數時后邊需要多加一個逗號
'''
#基于list
scheduler.add_job(job2,?'interval',?seconds=5,?args=['a','b','list'],?id='test_job4')
#基于tuple
scheduler.add_job(job2,?'interval',?seconds=5,?args=('a','b','tuple',),?id='test_job5')
#基于dict
scheduler.add_job(job3,?'interval',?seconds=5,?kwargs={'f':'dict',?'a':1,'b':2},?id='test_job7')
print?scheduler.get_jobs()
scheduler.start()
#帶有參數的示例
scheduler.add_job(job2,?'interval',?seconds=5,?args=['a','b'],?id='test_job4')
scheduler.add_job(job2,?'interval',?seconds=5,?args=('a','b',),?id='test_job5')
scheduler.add_job(job3,?'interval',?seconds=5,?kwargs={'a':1,'b':2},?id='test_job6')
print?scheduler.get_jobs()
scheduler.start()
或者使用scheduled_job()修飾器來添加作業:@sched.scheduled_job('cron',?second='*/5'?,id='my_job_id',)
def?test_task():
print("Hello?world!")
2).獲得任務列表:
可以通過get_jobs方法來獲取當前的任務列表,也可以通過get_job()來根據job_id來獲得某個任務的信息。并且apscheduler還提供了一個print_jobs()方法來打印格式化的任務列表。
例如:scheduler.add_job(my_job,?'interval',?seconds=5,?id='my_job_id'?name='test_job')
print?scheduler.get_job('my_job_id')
print?scheduler.get_jobs()
3).修改任務:
修改任務任務的屬性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何屬性。
例如:job?=?scheduler.add_job(my_job,?'interval',?seconds=5,?id='my_job'?name='test_job')
job.modify(max_instances=5,?name='my_job')
4).刪除任務:
刪除調度器中的任務有可以用remove_job()根據job ID來刪除指定任務或者使用remove(),如果使用remove()需要事先保存在添加任務時返回的實例對象,任務刪除后就不會在執行。
注意:通過scheduled_job()添加的任務只能使用remove_job()進行刪除。
例如:job?=?scheduler.add_job(my_job,?'interval',?seconds=5,?id='my_job_id'?name='test_job')
job.remove()
或者scheduler.add_job(my_job,?'interval',?seconds=5,?id='my_job_id'?name='test_job')
scheduler.remove_job('my_job')
5).暫停與恢復任務:
暫停與恢復任務可以直接操作任務實例或者調度器來實現。當任務暫停時,它的運行時間會被重置,暫停期間不會計算時間。
暫停任務:apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復任務apscheduler.job.Job.resume()
apscheduler.schedulers.BaseScheduler.resume_job()
6).啟動調度器
可以使用start()方法啟動調度器,BlockingScheduler需要在初始化之后才能執行start(),對于其他的Scheduler,調用start()方法都會直接返回,然后可以繼續執行后面的初始化操作。
例如:from?apscheduler.schedulers.blocking?import?BlockingScheduler
def?my_job():
print?"Hello?world!"
scheduler?=?BlockingScheduler()
scheduler.add_job(my_job,?'interval',?seconds=5)
scheduler.start()
7).關閉調度器:
使用下邊方法關閉調度器:scheduler.shutdown()
默認情況下調度器會關閉它的任務存儲和執行器,并等待所有正在執行的任務完成,如果不想等待,可以進行如下操作:scheduler.shutdown(wait=False)
注意:
當出現No handlers could be found for logger “apscheduler.scheduler”次錯誤信息時,說明沒有
logging模塊的logger存在,所以需要添加上,對應新增內容如下所示(僅供參考):import?logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s?%(filename)s[line:%(lineno)d]?%(levelname)s?%(message)s',
datafmt='%a,?%d?%b?%Y?%H:%M:%S',
filename='/var/log/aaa.txt',
filemode='a')
總結
以上是生活随笔為你收集整理的python apscheduler执行_Python下定时任务框架APScheduler的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ps如何给证件照换底色(PS如何给证件照
- 下一篇: python画二次函数图像的顶点_画二次