分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery
生活随笔
收集整理的這篇文章主要介紹了
分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 1、定義
- 2、Celery異步任務框架特點
- 3、Celery架構
- 4、使用場景
- 5、Celery的安裝配置
- 6、基本使用
- 7、celery多任務結構
- 8、延時任務
- 8.1、方式一
- 8.2、方式二
- 9、定時任務
- 10、django中使用celery()
1、定義
python中的一個分布式異步任務框架 Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統 專注于實時處理的異步任務隊列 同時也支持任務調度 (1) 執行異步任務(對立: 同步任務),解決耗時任務,將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等 (2) 執行延時任務(比如5分鐘后干一件事): 解決延遲任務 (3) 執行定時任務: 每天隔幾分鐘干什么事,解決周期(周期)任務,比如每天數據統計Celery 官網: http://www.celeryproject.org/ Celery 官方文檔英文版: http://docs.celeryproject.org/en/latest/index.html Celery 官方文檔中文版: http://docs.jinkan.org/docs/celery/2、Celery異步任務框架特點
(1) 可以不依賴任何服務器,通過自身命令,啟動服務(內部支持socket) (2) celery服務為為其他項目服務提供異步解決任務需求的注: 會有兩個服務同時運行,一個是項目服務,一個是celery服務,項目服務將需要異步處理的任務交給celery服務,celery就會在需要時異步完成項目的需求 人是一個獨立運行的服務 | 醫院也是一個獨立運行的服務 正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與,但當人生病時,就會被醫院接收,解決人生病問題 人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立運行,人生病時,醫院就來解決人生病的需求3、Celery架構
Celery的架構由三部分組成,消息中間件(message broker)、任務執行單元(worker)和任務執行結果存儲(task result store)組成
4、使用場景
異步執行: 解決耗時任務,將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等 延遲執行: 解決延遲任務 定時執行: 解決周期(周期)任務,比如每天數據統計關于秒殺系統可以使用celery不能秒超,使用鎖機制(mysql悲觀鎖,樂觀鎖),redis鎖提高并發量 ---> 把同步做成異步 ---> 使用celery前端點擊秒殺按鈕,向后端發送秒殺請求 ---> 同步操作同步操作請求來到后端,判斷數量是否夠,如果夠要生成訂單(mysql),訂單狀態是待支付狀態 請求返回,告訴前端秒殺成功異步操作請求來到后端,提交一個celery任務 ---> celery任務異步的執行判斷數量是否夠,如果夠,要生成訂單(mysql)秒殺是否成功的結果還沒有,直接返回了(返回任務id)前端啟動一個定時任務,每隔5s,向后臺發送一個查詢請求,查詢秒殺任務是否執行完成(帶著任務id查)如果是未執行狀態或者執行中 ---> 返回給前端,前端不處理,定時任務繼續執行又隔了5s,發送查詢,查詢到秒殺成功的結果,返回給前端,秒殺成功5、Celery的安裝配置
pip install celery 消息中間件: RabbitMQ/Redis app=Celery(‘任務名’, broker=’xxx’, backend=’xxx’)6、基本使用
1、定義一個py文件(t_celery.py) import celery # 消息中間件(redis) broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 這個db # 結果存儲(redis) backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 這個db# 實例化得到對象,指定中間件和結果存儲 app=celery.Celery('test',broker=broker,backend=backend)# 定義任務(可以有多個) @app.task def add(a,b):return a+b@app.task def mul(a,b):return a*b2、提交任務(在其它文件中,task.py) from t_celery import add res=add.delay(100,4) print(res) # 任務id號3、啟動worker 非windows平臺: celery worker -A t_celery -l info windows需要裝eventlet模塊: celery worker -A t_celery -l info -P eventlet4、查看執行結果 from t_celery import app from celery.result import AsyncResult # 關鍵字,變量不能定義為關鍵字 id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00' # 定義任務的id號,第二步中res的值 if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任務失敗')elif res.status == 'PENDING':print('任務等待中被執行')elif res.status == 'RETRY':print('任務異常后正在重試')elif res.status == 'STARTED':print('任務已經開始被執行')app參數
celery配置文件參數 # 有些情況可以防止死鎖 CELERYD_FORCE_EXECV=True # 設置并發worker數量 CELERYD_CONCURRENCY=4 # 允許重試 CELERY_ACKS_LATE=True # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超時時間 CELERYD_TASK_TIME_LIMIT=12*307、celery多任務結構
目錄結構:package_celery: # 項目名celery_task # celery包名__init__.py celery.py # celery 的app,必須叫celeryorder_task.py # 任務user_task.py # 任務result.py # 結果查詢submit_tast.py # 提交任務celery.py
import celerybroker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2'app = celery.Celery(broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task' # 一定要記得注冊一下 ])order_task.py
from .celery import app@app.task def cannel_order(name):return '用戶{}取消訂單'.format(name)user_task.py
from .celery import app@app.task def send_msg(phone):return '{}發送短信成功'.format(phone)result.py
from celery_task.celery import appfrom celery.result import AsyncResult# 關鍵字,變量不能定義為關鍵字 # 發短信任務: 39744a3f-02ec-4b8b-b855-111025e4abe4 # 取消訂單任務:6d1e5e91-236a-449c-ad32-eac093b240bdid = '6d1e5e91-236a-449c-ad32-eac093b240bd' if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任務失敗')elif res.status == 'PENDING':print('任務等待中被執行')elif res.status == 'RETRY':print('任務異常后正在重試')elif res.status == 'STARTED':print('任務已經開始被執行')submit_tast.py
from celery_task import order_task, user_taskres = order_task.cannel_order.delay('allen') print(res) # 返回任務id res = user_task.send_msg.delay('13666666666') print(res) # 返回任務id運行
# 運行worker(在package_celery目錄下執行) celery worker -A celery_task -l info -P eventlet8、延時任務
8.1、方式一
# 發送短信為例: 2021年1月7日21點58分55秒發送短信 from datetime import datetime from celery_task import order_task, user_task# eta: 延遲多長時間執行,eta需要傳時間對象,并且是utc時間 v1 = datetime(2021, 1, 7, 21, 58, 55) v2 = datetime.utcfromtimestamp(v1.timestamp()) # 轉成utc時間,比正常時間慢8個小時 print(v2) res = user_task.send_msg.apply_async(args=['13666666666', ], eta=v2)8.2、方式二
# 發送短信為例: 以當前時間為基準,過10秒后發送短信(隔幾秒后執行) from datetime import datetime, timedeltactime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay res = user_task.send_msg.apply_async(args=['13666666666', ], eta=task_time) print(res)9、定時任務
# 在celery.py中配置 # 時區 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False# 任務的定時配置 from datetime import timedelta from celery.schedules import crontabapp.conf.beat_schedule = {'send-msg':{'task': 'celery_task.user_task.send_msg','schedule': timedelta(seconds=5), # 這里是timedelta# 'schedule': timedelta(hours=24*10), # # 這里是timedelta# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點,這里是crontab'schedule': crontab(hour=8, day_of_month=1), # 每月一號早八點,這里是crontab'args': ('13666666666',),} }# 添加任務: 自動添加任務,所以要啟動一個添加任務的服務 # 啟動beat,負責每隔3s提交一個任務 celery beat -A celery_task -l info# 啟動worker celery worker -A celery_task -l info -P eventlet10、django中使用celery()
1、celery是獨立的,跟框架沒有關系 2、django-celery第三方模塊,兼容性不好不采用,使用通用方式 3、目錄結構celery_task__init__.pycelery.pyhome_task.pyorder_task.pyuser_task.py celery框架django項目工作流程 (1) 加載django配置環境 (2) 創建Celery框架對象app,配置broker和backend,得到的app就是worker (3) 給worker對應的app添加可處理的任務函數,用include配置給worker的app (4) 完成提供的任務的定時配置app.conf.beat_schedule (5) 啟動celery服務,運行worker,執行任務 (6) 啟動beat服務,運行beat,添加任務重點: 由于采用了django的反射機制,使用celery.py所在的celery_task包必須放置項目的根目錄下路由
path('test_celery', views.test_celery),視圖函數
def test_celery(request):from celery_task.celery import appfrom celery_task.user_task import send_msgfrom celery.result import AsyncResultid = request.GET.get('id')if id:res = AsyncResult(id=id, app=app)if res.successful():result = res.get()return HttpResponse(result)id = send_msg.delay('13666666666')print(id)return HttpResponse('發送短信成功')總結
以上是生活随笔為你收集整理的分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux卸载mono,CentOS7安
- 下一篇: MySQL InnoDB Cluster