flask + celery实现定时任务和异步
參考資料:?
Celery 官網:http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/
Celery簡介
除Celery是一個異步任務的調度工具。 Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。
Broker
在 Python 中定義 Celery 的時候,我們要引入 Broker(消息中間件),中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。
Backend
這種模式注定了整個系統(tǒng)會是個開環(huán)系統(tǒng),工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執(zhí)行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執(zhí)行失敗了。
Celery應用場景
1.你想對100臺機器執(zhí)行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執(zhí)行結果, 在任務執(zhí)行ing進行時,你可以繼續(xù)做其它的事情。
2.你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發(fā)現今天 是客戶的生日,就給他發(fā)個短信祝福
Celery的特點
1.簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
2.高可用:當任務執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery 會自動嘗試重新執(zhí)行任務
3.快速:一個單進程的celery每分鐘可處理上百萬個任務
3.靈活: 幾乎celery的各個組件都可以被擴展及自定制
Celery工作基本流程
?
我們的項目
項目目錄:
proj/celery.py
from __future__ import absolute_import, unicode_literals from celery import Celeryapp = Celery('proj',broker = 'amqp://',backend = 'amqp://',include = ['proj.tasks'])app.conf.update(result_expires = 3600 )if __name__ == '__main__':app.start()在這個模塊中創(chuàng)建了Celery實例(通常稱為app)
要在項目中使用Celery只需要通過import導入該實例就行了
- broker參數指定要使用的中間件的URL
- backend參數指定使用的result backend
用來跟蹤任務狀態(tài)和結果,雖然默認狀態(tài)下結果不可用。以上例子中使用RPC result backend。當然,不同的result backend都有自己的好處和壞處,根據自己實際情況進行選擇,如果不需要最好禁用。通過設置@task(ignore_result=True)選項來禁用耽擱任務)
- include參數是當worker啟動時導入的模塊列表需要在這里添加自己的任務莫夸這樣worker就可以找到任務
proj/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app@app.task def add(x, y):return x + y@app.task def mul(x, y):return x * y@app.task def xsum(numbers):return sum(numbers)
?
啟動worker
Celery程序可以用來啟動worker:
celery -A proj worker -l info -------------- celery@centos6 v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-2.6.32-696.el6.x86_64-x86_64-with-centos-6.9-Final 2018-03-26 12:27:49 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: task:0x7fe5cfbd20d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues].> celery exchange=celery(direct) key=celery[tasks][2018-03-26 12:27:49,921: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2018-03-26 12:27:49,926: INFO/MainProcess] mingle: searching for neighbors [2018-03-26 12:27:49,499: INFO/MainProcess] mingle: sync with 1 nodes [2018-03-26 12:27:50,950: INFO/MainProcess] mingle: sync complete [2018-03-26 12:27:50,957: INFO/MainProcess] celery@centos6 ready.- broker是在celery模塊中指定的中間件參數的url,也可以在命令行中通過-b選項指定不同的中間件
- Concurrent是用于并行處理的任務的預創(chuàng)建worker進程數量,當所有的任務都在忙于工作時,新的任務必須等待之前的執(zhí)行完成才能處理
默認的并發(fā)數是機器上CPU的數量,可以通過celery worker -c選項指定自定義數量。沒有推薦值,最佳數量取決于很多因素,但是如果你的任務主要是I/O相關的,就可以增加這個數量。實驗表明,增加超過兩倍CPU數量效果很差,而且可能會降低性能
除了prefork pool,Celery還支持Eventlet、Gevent并且還能在單線程上運行
- Event是一個可選項,當啟用的時候,Celery會發(fā)送監(jiān)控(消息)來反映worker的操作,也可以被用來監(jiān)視像celery、events和Flower(實時Celery監(jiān)控)這樣的程序。
- Queues是worker將使用的任務的隊列的集合,worker可以一次接受幾個隊列,它用來將消息路由到特定的工作者以作為服務質量、關注點分離、和優(yōu)化的一種方式
可以通過命令行獲取完整的列表————celery worker --help
停止worker
ctrl-c
后臺
生產環(huán)境中一般將worker放到后臺,后臺腳本使用celery multi命令后臺啟動一個或多個worker
celery multi start w1 -A proj -l info控制臺打印
celery multi v4.1.0 (latentcall) > Starting nodes...> w1@centos6: OK Stale pidfile exists - Removing it.也可以重啟:
celery multi restart w1 -A proj -l info celery multi v4.1.0 (latentcall) > Stopping nodes...> w1@centos6: TERM -> 23620 > Waiting for 1 node -> 23620.....> w1@centos6: OK > Restarting node w1@centos6: OK > Waiting for 1 node -> None...停止:
celery multi stop w1 -A proj -l infostop命令是異步的所以它不會等待worker關閉,可以使用stopwait命令來確保當前執(zhí)行都任務在退出前都已執(zhí)行完畢
celery multi stopwait w1 -A proj -l infocelery multi不會存儲關于worker的信息,所以重啟的時候需要使用同樣的命令行參數。在停止時,必須使用相同的pidfile和logfile參數
默認情況下,程序將在當期目錄創(chuàng)建pid和log文件,為了防止多個worker運行出錯,推薦將這些文件放在專門的目錄:
mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log使用multi指令可以啟動多個worker,并且有一個強大的命令行語法來為不同的worker指定參數:
celery multi start 10 -A proj -l info -Q:1-3 images, video -Q:4, 5 data -Q default -L:4,5 debug~Detail about?multi?temp
--app參數
--app參數指定使用的Celery應用實例,必須以module.path:attribute的形式出現
但也支持快捷方式,只要包名指定了,就會嘗試在應用實例中搜索
使用--app=proj:
任務調用
- 可以通過使用delay()方法來調用一個任務
這個方法實際上是另一種叫做apply_async()方法的快捷方式
add.applay_async((3, 3))后者(applay_async())能夠指定執(zhí)行選項,比如運行時間(倒計時)、應該發(fā)送的隊列等等:
add.apply_async((2, 2), queue='lopri', countdown=10)上述案例中,任務會被發(fā)送給一個名為lopri的隊列,該任務會在信息發(fā)送后十秒執(zhí)行
直接應用該任務會在當前進程中執(zhí)行任務,不會發(fā)送消息
add(3, 3)result:6
三種方法delay()、apply_async()和應用__call__,代表了Celery調用API,也同樣用于簽名
-
每一個任務調用都有一個唯一的標識符(UUID),這個就是任務的id
-
delay()和apply_async方法會返回一個AsyncResult實例,可以被用來跟蹤任務執(zhí)行狀態(tài),但是需要開啟result backend這樣狀態(tài)才能被存儲在某處
-
Results默認是禁用的,因為實際上沒有一個result backend適用于每個應用程序,所以要考慮到每個獨立backend的缺點來選擇一個使用。對于許多保持返回值的任務來說都不是很有用,所以這個默認的禁用是很明智的。還需要注意的是,result backend并不用來監(jiān)控任務和worker,對于Celery有專門的事件消息
如果配置了result backend就可以接收到任務的返回值
result = add.delay(2, 2) res.get(timeout=1)retult:4
- 可以通過查看id屬性找到任務的id
result:073c568d-ca88-4198-b735-0f98f861218b
-
如果任務拋出異常也可以檢查到異常,默認result.get()可以傳播任何錯誤
-
如果不希望錯誤傳播,可以通過propagete屬性禁用
在這種情況下,它會返回所提出的異常實例,以便檢查任務是否成功或失敗,您將不得不在結果實例上使用相應的方法
res.failed() res.successful()也可以通過state找到任務的狀態(tài)
res.stateresult:FAILUTE
- 一個任務只能有一個狀態(tài),但是可以在幾個狀態(tài)中發(fā)展,典型任務階段可能是這樣
STARTED狀態(tài)是一個特殊的狀態(tài),只有在task_track_started設置啟用或者@task(track_started=True)選項設置的時候才會被記錄下來
PENDING狀態(tài)實際上不是記錄狀態(tài),而是未知任務id的默認狀態(tài)
from proj.celery import app res = app.AsyncResult('this-id-does-not-exist') res.stateresult:PENDING
- 如果重新嘗試這個任務可能會變得更復雜,對于一個嘗試過兩遍的任務來說階段可能是這樣:
Canvas:設計任務流
前面學習了通過delay方法調用任務,通常這樣就夠了,但是有時可能需要將任務調用的簽名傳遞給另一個進程或者另一個函數的參數,對Celery來說叫做signatures
簽名以某種方式包裝了單一任務調用的參數和執(zhí)行選項,以便將其傳遞給函數,甚至序列化后發(fā)送。
可以使用參數(2, 2)和十秒的計時器來為add任務創(chuàng)建一個簽名
add.signature((2, 2), countdown=10)也可以簡寫:
add.s(2, 2)調用API
簽名的實例也支持調用API,意味著也可以有delay和apply_async方法
但是有一個區(qū)別,那就是簽名可能已經指定了一個參數簽名,add任務接受兩個參數,所以一個制定了兩個參數的簽名將會形成一個完整的簽名
s1 = add.s(2, 2) res = s1.delay() res.get()也可以使用不完成的簽名,叫做partials:
s1 = add.s(2)s2現在是部分簽名,需要另一個參數才完整,則可以在調用signature的時候處理
# resolves the partial: add(8, 2) res = s2.delay(8) res.get()在這里,添加了參數8,對已存在的參數2組成了一個完整的簽名add(8, 2)
關鍵字參數也可以延遲添加,會和已存在的關鍵字參數合并,新參數優(yōu)先(新參數覆蓋舊參數)
s3 = add.s(2, 2, debug=True) s3.delay(debug=False)已聲明的簽名支持調用API:
- sig.apply_async(arg=(), kwargs={}, **options
使用可選部分參數和部分關鍵字參數調用簽名,也支持部分可執(zhí)行選項 - sig.delay(*args, **kwargs)
apply_async的星參版本,任何參數都會被預先記錄在簽名的參數你,關鍵字參數會和現有的keys合并
基本體
- group
- chain
- chord
- map
- starmap
- chunks
這些基本體本身就是簽名對象,因此,它們可以以任何多種方式組合起來組成復雜的工作流
Group
一個group同時調用任務列表,返回一個特殊結果實例,這樣可以以組的形式檢查結果,并按順序檢索返回值
from celery import group from proj.tasks import addgroup(add.s(i, i) for in in range(10))().get()result:[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- Partial group
result:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
任務可以被相互連接起來,這樣在一個任務返回后另一個任務被調用
from celery import chain form proj.tasks import add, mul// 用法1 chian(add.s(4, 4) | mul.s(8))().get()// 用法2 g = chain(add.s(4) | mul.s(8)) g(4).get()// 用法3 (add.s(4, 4) | mul.s(8))().get()Chords
chord是一個有返回值的group
from celery import chord from proj.tasks import add, xsum// 用法1 (group(add.s(i, i) for i in range(10)) | xsum.s())().get()// 用法2 upload_document.s(file) | group(apply_filter.s() for filter in filters)路由
Celery支持AMQP提供的所有路由設施,但是它也支持簡單路由,將消息發(fā)送到指定的隊列
task_routes設置可以是用戶按名稱對任務進行路由,并將一切集中在一個位置
app.conf.update{task_routes = {'proj.tasks.add': {'queue': 'hipri'},} }可以在運行時通過queue參數指定隊列到apply_async:
from proj.tasks import add add.apply_async((2,2), queue='hipri')然后可以通過指定celery worker -Q選項使worker從隊列中消費
celery -A proj worker -Q hipri也可以通過使用逗號分隔符(,)來指定多個隊列
celery -A proj worker -Q hipri, celery默認隊列因為歷史原因命名為:celery
隊列的順序無關緊要,因為worker會給隊列相同的權重
遠程控制
如果使用RabbitMQ(AMQP)、Redis或者Qpid作為中間件就可以在運行時監(jiān)視worker
- 查看worker當前執(zhí)行的任務
這是通過使用廣播消息實現的,因此,急群眾的每一個工作人員都能接收到所有遠程控制命令
- 也可以指令一個或多個worker使用--destination選項請求行動,這是一個逗號分隔的worker主機名列表
如果沒有提供目標,那么每個worker都會對請求做出反應并回復
- celery inspect命令包含的命令不會改變worker的任何東西,它只會回復關于worker內部發(fā)生的事情的信息和統(tǒng)計信息,可以執(zhí)行命令檢查列表:
- celery control命令,包含在運行時實際改變worker操作的命令
- 強制worker啟用事件消息(用于監(jiān)視任務和工作人員)
當事件激活,可以啟動event dumper查看worker正在做什么
celery -A proj events --dump或者
celery -A proj events當完成監(jiān)控可以再次禁用events
celery -A proj control disable_eventscelery status命令還能使用遠程控制命令,并顯示集群中的在線worker列表
celery -A proj status時區(qū)
所有的時間和日期、內部和消息多使用UTC時間區(qū)域
當worker收到消息,例如使用倒計時設置,它將UTC時間轉換為本地時間。如果希望使用與系統(tǒng)時區(qū)不同的地區(qū),那么必須要使用時區(qū)設置來配置該時區(qū):
app.conf.timezone = 'Asia/Shanghai'?
最優(yōu)化
默認的配置并沒有針對吞吐量進行優(yōu)化,它試圖在許多短任務和更少的長任務之間走中間路線,這是吞吐量和公平調度之間的折中
?
總結
以上是生活随笔為你收集整理的flask + celery实现定时任务和异步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: celery定时任务简单使用
- 下一篇: Python—进程、线程、协程