容器中apscheduler不执行_APScheduler:定时任务框架
APScheduler:定時任務框架
安裝
文檔: https://apscheduler.readthedocs.io/en/stable/userguide.html
安裝
$?pip?install?apscheduler>>>?import?apscheduler
>>>?apscheduler.version
'3.6.3'
組件
APScheduler由一下四部分組成
- triggers:觸發器,指定定時任務執行的時機,每個任務都有自己的觸發器.
- job stores:存儲器,持久存儲,默認存儲在內存中.
- executors:執行器,在定時任務執行時,以進程或線程方式執行
- scheduler:調度器,包含BackgroundScheduler(后臺運行)和BlockingScheduler(阻塞運行).他會合理安排作業存儲器,執行器,觸發器進行工作.并進行添加和刪除任務等.調度器通常是只有一個的,開發人員很少直接操作觸發器,存儲器,執行器等.因為這些都由調度器自動來實現了.
觸發器(triggers)
1.date在特定時間執行
示例:
from?datetime?import?date,?datetimefrom?apscheduler.schedulers.blocking?import?BlockingScheduler
sched?=?BlockingScheduler()
def?my_job(text):
???print(text)
#?run_date?接受?date,?datetime?數據類型
sched.add_job(my_job,?'date',?run_date=datetime(2020,?11,?27,?19,?33,?30),?args=['text'])
sched.start()
更多:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html
2.interval間隔執行
在固定的時間間隔后觸發事件.參數如下:
| days | 一個月中的第幾天,整形 |
| hours | 時,整形 |
| minutes | 分,整形 |
| seconds | 秒,整形 |
| start_date | 起始時間 |
| end_date | 結束時間 |
| jitter | 觸發的時間誤差 |
from?datetime?import?datetime
sched?=?BlockingScheduler()
def?job_funciton():
???print('hello?world')
sched.add_job(job_funciton,?trigger='interval',seconds=5)
#?指定小時
#?sched.add_job(job_funciton,?trigger='interval',?hours=2)
#?指定開始,結束時間
#?sched.add_job(job_funciton,?trigger='interval',?start_date='2020-11-27?20:30:00',?end_date='2010-11-27?21:30:00)
sched.start()?
3.crontab
在某個確切的時間周期性觸發事件.
| week | 1-53周 | day_of_week | 一周中的第幾天(0/Monday) | hour | 0-23 |
| minute | 0-59 | second | 0-59 | start_date | datetime數據類型,或者字符串類型 |
| end_date | 結束時間 | timezone | 時區 | jitter | 觸發的誤差時間 |
也可以用表達式類型,可以用以下方式:
| * | 任何 | 在每個值都觸發 |
| */a | 任何 | 每隔 a觸發一次 |
| a-b | 任何 | 在 a-b區間內任何一個時間觸發( a必須小于 b) |
| a-b/c | 任何 | 在 a-b區間內每隔 c觸發一次 |
| xth y | day | 第 x個星期 y觸發 |
| lastx | day | 最后一個星期 x觸發 |
| last | day | 一個月中的最后一天觸發 |
| x,y,z | 任何 | 可以把上面的表達式進行組合 |
def?job_function():
???print?"Hello?World"
sched?=?BlockingScheduler()
#?Schedules?job_function?to?be?run?on?the?third?Friday
#?of?June,?July,?August,?November?and?December?at?00:00,?01:00,?02:00?and?03:00
sched.add_job(job_function,?'cron',?month='6-8,11-12',?day='3rd?fri',?hour='0-3')
sched.start()
調度器(schedulers)
任務存儲器(job stores)
有2中方式,一種是加載在內存中(默認配置),一種是使用數據庫.使用內存簡單高效,但是程序出現問題,從新運行,會把以前的任務從新再執行一次.數據庫則可以在中斷的地方恢復正常使用.
- MemoryJobStore:使用內存
- MongoDBJobStore:使用mongodb
- RedisJobStore:使用redis
- SQLAlchemy:使用SQLAlchemy框架
1.RedisJobStore
RedisJobStore(db=0,?jobs_key='apscheduler.jobs',?run_times_key='apscheduler.run_times',?pickle_protocol=pickle.HIGHEST_PROTOCOL,?**connect_args)有2種創建的方法:
- add_jobstore:需要指定redis的相關參數.
import?sys
import?os
from?apscheduler.schedulers.blocking?import?BlockingScheduler
from?apscheduler.jobstores.redis?import?RedisJobStore
def?alarm(time):
????print('Alarm!?This?alarm?was?scheduled?at?%s.'?%?time)
if?__name__?==?'__main__':
????scheduler?=?BlockingScheduler()
????scheduler.add_jobstore('redis',?jobs_key='example.jobs',?run_times_key='example.run_times',?host='192.168.0.101',?port=6379,?db=0)
????if?len(sys.argv)?>?1?and?sys.argv[1]?==?'--clear':
????????scheduler.remove_all_jobs()
????alarm_time?=?datetime.now()?+?timedelta(seconds=300)
????scheduler.add_job(alarm,?'date',?run_date=alarm_time,?args=[datetime.now()])
????print('To?clear?the?alarms,?run?this?example?with?the?--clear?argument.')
????print('Press?Ctrl+{0}?to?exit'.format('Break'?if?os.name?==?'nt'?else?'C'))
????try:
????????scheduler.start()
????except?(KeyboardInterrupt,?SystemExit):
????????pass
- RedisJobStore
from?apscheduler.jobstores.redis?import?RedisJobStore
from?datetime?import?datetime,?timedelta
jobstore?=?{
????'default'?:?RedisJobStore(db=0,?jobs_key='myfunc',?run_times_key='myfunc_time',?host='192.168.0.101',?port=6379)
}
def?my_func(t):
????print('hello?world,?%s'?%t)
if?__name__?==?'__main__':
????sched?=?BlockingScheduler(jobstores=jobstore)
????alarm_time?=?datetime.now()?+?timedelta(seconds=300)
????sched.add_job(my_func,run_date=alarm_time,?args=['ning'])
????sched.start()
均可在redis中查詢到數據.
2.SQLAlchemy
使用ORM框架,演示使用MySql
from?apscheduler.schedulers.blocking?import?BlockingSchedulerfrom?datetime?import?datetime,?timedelta
def?my_func(t):
???print('hello?%s'?%t)
if?__name__?==?"__main__":
???sched?=?BlockingScheduler()
???url?=?'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
???sched.add_jobstore('sqlalchemy',?url=url,tablename='api_job')
???alarm_time?=?datetime.now()?+?timedelta(seconds=300)
???sched.add_job(my_func,run_date=alarm_time,?args=['ning'])
???sched.start()
如果表不存在,會自動創建表.tablename用于指定表的名稱.
在數據庫中可以查看到表
select?*?from?api_job;10336
執行器executors
執行器取決于應用場景,默認是ThreadPoolExecutor,它可以滿足大部分需求.如果是CPU密集型計算,可以選擇ProcessPoolExecutor
class?apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)class?apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)#?max_worker?指定最多使用線程/進程from?apscheduler.schedulers.background?import?BackgroundScheduler
from?apscheduler.executors.pool?import?ThreadPoolExecutor
executors?=?{
???'default':?ThreadPoolExecutor(20),
}
conf?=?{?#?redis配置
???"host":127.0.0.1,
???"port":6379,
???"db":15,?#?連接15號數據庫
???"max_connections":10?#?redis最大支持300個連接數
}
scheduler?=?BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis',?**conf)?#?添加任務持久化存儲方式,如果未安裝redis可省略此步驟
任務操作
- add_job(func, id='xxx', args=None, kwargs=None)添加任務
#??tree?-L?2
#├──?func
#│???├──?add_func.py
#│???├──?__init__.py
#└──?jobs.py
#?add_func.py
def?add(x,y):
????print(x+y)
????
#?jobs.py
from?apscheduler.schedulers.blocking?import?BlockingScheduler
from?func.add_func?import?add
shced?=?BlockingScheduler()
if?__name__?==?"__main__":
????shced.add_job('func.add_func:add',?args=[1,2],?id='job1')
????shced.start()???
除去使用add_job(),還可以使用裝飾器函數scheduled_job來添加任務.
- remove_job(job_id):刪除任務,需要指定job_id
- pause_job(job_id):暫停任務
- resume_job(job_id):恢復任務
- modify_job(job_id, **changes):修改任務屬性
- print_jobs():作業信息
job?=?scheduler.add_job(myfunc,?'interval',?minutes=2)??#?添加任務
job.remove()??#?刪除任務
job.pause()?#?暫定任務
job.resume()??#?恢復任務
#?方法2
scheduler.add_job(myfunc,?'interval',?minutes=2,?id='my_job_id')??#?添加任務????
scheduler.remove_job('my_job_id')??#?刪除任務
scheduler.pause_job('my_job_id')??#?暫定任務
scheduler.resume_job('my_job_id')??#?恢復任務
示例
方法1
from?pytz?import?utcfrom?datetime?import?datetime
from?apscheduler.schedulers.background?import?BackgroundScheduler
from?apscheduler.jobstores.mongodb?import?MongoDBJobStore
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.executors.pool?import?ThreadPoolExecutor,?ProcessPoolExecutor
def?tick():
???print('Tick!?The?time?is:?%s'?%?datetime.now())
#?選擇MongoDB作為任務存儲數據庫
jobstores?=?{
???'mongo':?MongoDBJobStore(),
???'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
#?默認使用線程池
executors?=?{
???'default':?ThreadPoolExecutor(20),
???'processpool':?ProcessPoolExecutor(5)
}
#?默認參數配置
job_defaults?=?{
???'coalesce':?False,??#?積攢的任務是否只跑一次,是否合并所有錯過的Job
???'max_instances':?3,??#?默認同一時刻只能有一個實例運行,通過max_instances=3修改為3個。
???'misfire_grace_time':?30??#?30秒的任務超時容錯
}
scheduler?=?BackgroundScheduler(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)
scheduler.add_job(tick,?'interval',?seconds=3)
scheduler.start()
方法2
from?apscheduler.schedulers.background?import?BackgroundScheduler#?The?"apscheduler."?prefix?is?hard?coded
scheduler?=?BackgroundScheduler({
???'apscheduler.jobstores.mongo':?{
????????'type':?'mongodb'
???},
???'apscheduler.jobstores.default':?{
???????'type':?'sqlalchemy',
???????'url':?'sqlite:///jobs.sqlite'
???},
???'apscheduler.executors.default':?{
???????'class':?'apscheduler.executors.pool:ThreadPoolExecutor',
???????'max_workers':?'20'
???},
???'apscheduler.executors.processpool':?{
???????'type':?'processpool',
???????'max_workers':?'5'
???},
???'apscheduler.job_defaults.coalesce':?'false',
???'apscheduler.job_defaults.max_instances':?'3',
???'apscheduler.timezone':?'UTC',
})
方法3
from?pytz?import?utcfrom?apscheduler.schedulers.background?import?BackgroundScheduler
from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
from?apscheduler.executors.pool?import?ProcessPoolExecutor
jobstores?=?{
???'mongo':?{'type':?'mongodb'},
???'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors?=?{
???'default':?{'type':?'threadpool',?'max_workers':?20},
???'processpool':?ProcessPoolExecutor(max_workers=5)
}
job_defaults?=?{
???'coalesce':?False,
???'max_instances':?3
}
scheduler?=?BackgroundScheduler()
#?..這里可以添加任務
scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)
misfire_grace_time:如果一個job本來14:00有一次執行,但是由于某種原因沒有被調度上,現在14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這里是1分鐘),大于我們設置的30秒限制,那么這個運行實例不會被執行。合并:最常見的情形是scheduler被shutdown后重啟,某個任務會積攢了好幾次沒執行如5次,下次這個job被submit給executor時,執行5次。將coalesce=True后,只會執行一次
replace_existing: 如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,并且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味著任務的狀態不會保存。
總結
以上是生活随笔為你收集整理的容器中apscheduler不执行_APScheduler:定时任务框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 澳大利亚元属于外汇吗
- 下一篇: android 首页广告显示不出来的,a