celery基本使用
目錄一、celery介紹二、celery架構(gòu)消息中間件任務(wù)執(zhí)行單元任務(wù)結(jié)果存儲使用場景三、celery使用方式一:在一個文件夾內(nèi)的三個頁面方式二:worker單獨做一個項目文件,添加任務(wù)和獲取結(jié)果分離出來(執(zhí)行異步任務(wù))執(zhí)行延遲任務(wù)添加定時任務(wù)四、django中配置celery
一、celery介紹
Celery 官網(wǎng):http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/
異步任務(wù)框架
執(zhí)行異步任務(wù)
執(zhí)行延遲任務(wù)
執(zhí)行定時任務(wù)
二、celery架構(gòu)
# 如果 Celery對象:Celery(...) 是放在一個模塊下的
# 1)終端切換到該模塊所在文件夾位置:scripts
# 2)執(zhí)行啟動worker的命令:celery worker -A 模塊名 -l info -P eventlet
# 注:windows系統(tǒng)需要eventlet支持,Linux與MacOS直接執(zhí)行:celery worker -A 模塊名 -l info
# 注:模塊名隨意
# 如果 Celery對象:Celery(...) 是放在一個包下的
# 1)必須在這個包下建一個celery.py的文件,將Celery(...)產(chǎn)生對象的語句放在該文件中
# 2)執(zhí)行啟動worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系統(tǒng)需要eventlet支持,Linux與MacOS直接執(zhí)行:celery worker -A 模塊名 -l info
# 注:包名隨意
消息中間件
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等
任務(wù)執(zhí)行單元
Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中。
任務(wù)結(jié)果存儲
Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis等
使用場景
異步執(zhí)行:解決耗時任務(wù),將耗時操作任務(wù)提交給Celery去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等
延遲執(zhí)行:解決延遲任務(wù)
定時執(zhí)行:解決周期(周期)任務(wù),比如每天數(shù)據(jù)統(tǒng)計
三、celery使用
安裝:pip install celery
根據(jù)celery架構(gòu),我們可以看出,worker就像是一個工人,一直在工作,如果有工作的時候就會從broker(Redist)里去拿過來執(zhí)行,并放到pakend(redist)中存放結(jié)果
方式一:在一個文件夾內(nèi)的三個頁面
worker執(zhí)行頁面
import celery
# broker存儲的位置
broker = 'redis://127.0.0.1:6379/1'
# backend存儲的位置
backend ='redis://127.0.0.1:6379/2'
# 實例化的celery對象
app=celery.Celery(__name__,broker=broker,backend=backend)
# 需要添加的任務(wù)
@app.task
def add(x,y):
print(x*y)
return x+y
broker提交任務(wù)的頁面
from celery_test import add
# 執(zhí)行這個文件,就是把這個任務(wù)添加到數(shù)據(jù)庫中,只要worker在工作
# 就會把這個任務(wù)從數(shù)據(jù)庫1中拿出來執(zhí)行,并把結(jié)果放到數(shù)據(jù)庫2中
ret = add.delay(3,4)
# ret 是這個任務(wù)的uuid,用于獲取任務(wù)結(jié)果
backend獲取任務(wù)結(jié)果的頁面
from celery_test import app
from celery.result import AsyncResult
# 任務(wù)對象的唯一標(biāo)識:uuid
id = '19dc2faa-39f9-47b6-af77-e9d3a4d05d2e'
if __name__ == '__main__':
async1 = AsyncResult(id=id, app=app)
if async1.successful():
result = async1.get()
print(result)
elif async1.failed():
print('任務(wù)失敗')
elif async1.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif async1.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif async1.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')
方式二:worker單獨做一個項目文件,添加任務(wù)和獲取結(jié)果分離出來(執(zhí)行異步任務(wù))
創(chuàng)建一個celery項目(包),內(nèi)部必須含有名字為celery的py文件,在內(nèi)部創(chuàng)建celery對象
celery.py
import celery
broker = 'redis://127.0.0.1:6379/1'
backend ='redis://127.0.0.1:6379/2'
app=celery.Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1','celery_task.task2'])
編寫需要添加的任務(wù)也在這個包內(nèi),可創(chuàng)建不同的任務(wù)文件,可添加多個
task1.py
from .celery import app
@app.task
def add(x,y):
print(x,y)
return x+y
執(zhí)行延遲任務(wù)
添加任務(wù)頁面
# 執(zhí)行延遲任務(wù)就是多個一個時間參數(shù)
# 這里注意,時間參數(shù)是根據(jù)utc時間,并不是中國時間
from datetime import datetime, timedelta
# 時間對象必須和時間對象相加
eta=datetime.utcnow() + timedelta(seconds=10)
add.apply_async(args=(200, 50), eta=eta)
添加定時任務(wù)
celery頁面
# 時區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任務(wù)的定時配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'low-task': {
'task': 'celery_task.tasks.low',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
'args': (300, 150),
}
}
# 定時任務(wù)的添加必須要新啟動一個beat命令去工作
# celery beat -A celery_task -l info
四、django中配置celery
celery包最好放在根路徑下
添加定時任務(wù)
celery.py
import os
# 配置django配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
# 配置celery的worker環(huán)境
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 實例化worker對象app,用include添加定時任務(wù)
app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task2'])
# 時區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任務(wù)的定時配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'low-task': {
'task': 'celery_task.task2.update_banner',
'schedule': timedelta(seconds=10),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
# 'args': (),
}
}
# 定時任務(wù)的添加必須要新啟動一個beat命令去工作
# celery beat -A celery_task -l info
task定時任務(wù)
from .celery import app
from django.core.cache import cache
# 每三秒跟新一次緩存
# 添加一個任務(wù),celery內(nèi)設(shè)定間隔時間3秒
# 任務(wù)內(nèi)去cache跟新banner_list,從數(shù)據(jù)庫中拿到banner放到cache中的banner_list中
from home.models import Banner
# from luffyapi.settings import const
# [OrderedDict(
# [('image', 'http://127.0.0.1:8000/media/banner/banner1.png'), ('link', '/free-courses'), ('name', 'banner1')]),
# OrderedDict(
# [('image', 'http://127.0.0.1:8000/media/banner/banner2.png'), ('link', '/light-courses'), ('name', 'banner2')]),
# OrderedDict(
# [('image', 'http://127.0.0.1:8000/media/banner/banner3.png'), ('link', '/actual-courses'), ('name', 'banner3')])]
from home.sers import BannerSer
@app.task
def update_banner(*args, **kwargs):
queryset = Banner.objects.filter(is_delete=False)
banner_ser = BannerSer(queryset,many=True)
for banner in banner_ser.data:
banner['image'] = 'http://127.0.0.1:8000' + banner['image']
cache.set('banner_list', banner_ser.data)
print(banner_ser.data)
return '更新banner成功'
總結(jié)
以上是生活随笔為你收集整理的celery基本使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux xampp nginx,ng
- 下一篇: 电视机盒子“内存”与“闪存”有何区别,1