利用 Celery 构建 Web 服务的后台任务调度模块
來源:http://www.tuicool.com/articles/Enaeymm
任務(wù)隊列在 Web 服務(wù)里的應(yīng)用
在 Web2.0 后的時代,社交網(wǎng)站、搜索引擎的的迅猛發(fā)展對 Web 服務(wù)的后臺管理系統(tǒng)提出了更高的需求。考慮幾個常見的使用場景:
考慮對于高并發(fā)大用戶量的 Web 服務(wù)系統(tǒng),對于場景一和場景二中的需求,如果在請求處理周期內(nèi)完成這些任務(wù),然后再返回結(jié)果,這種傳統(tǒng)的做法會導(dǎo)致用戶等待的時間過長。同時 Web 服務(wù)管理后臺對任務(wù)處理能力也缺乏擴(kuò)展性。
在這種場景下,任務(wù)隊列是有效的解決方案。在一個任務(wù)隊列系統(tǒng)中,“將新鮮事推送至用戶 A 的所有好友”或者“查詢當(dāng)前最熱門的十大文獻(xiàn)”這種查詢或者計算工作可以被當(dāng)成一個“任務(wù)”。在任務(wù)隊列系統(tǒng)中,一般有任務(wù)生產(chǎn)者、任務(wù)處理中間方以及任務(wù)消費者三方。其中任務(wù)生產(chǎn)者負(fù)責(zé)生產(chǎn)任務(wù),比如“將新鮮事推送至用戶 A 的所有好友”這一任務(wù)的發(fā)起方就可以稱作任務(wù)生產(chǎn)者。任務(wù)處理中間方負(fù)責(zé)接收任務(wù)生產(chǎn)者的任務(wù)處理請求,對任務(wù)進(jìn)行調(diào)度,最后將任務(wù)分發(fā)給任務(wù)消費者來進(jìn)行處理。任務(wù)消費者就是執(zhí)行任務(wù)的一方,它負(fù)責(zé)接收任務(wù)處理中間方發(fā)來的任務(wù)處理請求,完成這些任務(wù),并且返回任務(wù)處理的結(jié)果。在生產(chǎn)方、消費者和任務(wù)處理中間方之間一般使用消息傳遞的方式來進(jìn)行通信。
在任務(wù)隊列系統(tǒng)框架中,任務(wù)消費者可以跨越不同的服務(wù)節(jié)點,可以動態(tài)地增加節(jié)點來增加系統(tǒng)的任務(wù)處理能力,非常適合高并發(fā)、需要橫向擴(kuò)展的 Web 服務(wù)后臺。
回頁首
Celery: 基于 Python 的開源分布式任務(wù)調(diào)度模塊
Celery 是一個用 Python 編寫的分布式的任務(wù)調(diào)度模塊,它有著簡明的 API,并且有豐富的擴(kuò)展性,適合用于構(gòu)建分布式的 Web 服務(wù)。
圖 1. Celery 的模塊架構(gòu)
Celery 的模塊架構(gòu)較為簡潔,但是提供了較為完整的功能:
任務(wù)生產(chǎn)者 (task producer)
任務(wù)生產(chǎn)者 (task producer) 負(fù)責(zé)產(chǎn)生計算任務(wù),交給任務(wù)隊列去處理。在 Celery 里,一段獨立的 Python 代碼、一段嵌入在 Django Web 服務(wù)里的一段請求處理邏輯,只要是調(diào)用了 Celery 提供的 API,產(chǎn)生任務(wù)并交給任務(wù)隊列處理的,我們都可以稱之為任務(wù)生產(chǎn)者。
任務(wù)調(diào)度器 (celery beat)
Celery beat 是一個任務(wù)調(diào)度器,它以獨立進(jìn)程的形式存在。Celery beat 進(jìn)程會讀取配置文件的內(nèi)容,周期性地將執(zhí)行任務(wù)的請求發(fā)送給任務(wù)隊列。Celery beat 是 Celery 系統(tǒng)自帶的任務(wù)生產(chǎn)者。系統(tǒng)管理員可以選擇關(guān)閉或者開啟 Celery beat。同時在一個 Celery 系統(tǒng)中,只能存在一個 Celery beat 調(diào)度器。
任務(wù)代理 (broker)
任務(wù)代理方負(fù)責(zé)接受任務(wù)生產(chǎn)者發(fā)送過來的任務(wù)處理消息,存進(jìn)隊列之后再進(jìn)行調(diào)度,分發(fā)給任務(wù)消費方 (celery worker)。因為任務(wù)處理是基于 message(消息) 的,所以我們一般選擇 RabbitMQ、Redis 等消息隊列或者數(shù)據(jù)庫作為 Celery 的 message broker。
任務(wù)消費方 (celery worker)
Celery worker 就是執(zhí)行任務(wù)的一方,它負(fù)責(zé)接收任務(wù)處理中間方發(fā)來的任務(wù)處理請求,完成這些任務(wù),并且返回任務(wù)處理的結(jié)果。Celery worker 對應(yīng)的就是操作系統(tǒng)中的一個進(jìn)程。Celery 支持分布式部署和橫向擴(kuò)展,我們可以在多個節(jié)點增加 Celery worker 的數(shù)量來增加系統(tǒng)的高可用性。在分布式系統(tǒng)中,我們也可以在不同節(jié)點上分配執(zhí)行不同任務(wù)的 Celery worker 來達(dá)到模塊化的目的。
結(jié)果保存
Celery 支持任務(wù)處理完后將狀態(tài)信息和結(jié)果的保存,以供查詢。Celery 內(nèi)置支持 rpc, Django ORM,Redis,RabbitMQ 等方式來保存任務(wù)處理后的狀態(tài)信息。
回頁首
構(gòu)建第一個 Celery 程序
在我們的第一個 Celery 程序中,我們嘗試在 Celery 中構(gòu)建一個“將新鮮事通知到朋友”的任務(wù),并且嘗試通過編寫一個 Python 程序來啟動這個任務(wù)。
安裝 Celery
Pip install celery選擇合適的消息代理中間件
Celery 支持 RabbitMQ、Redis 甚至其他數(shù)據(jù)庫系統(tǒng)作為其消息代理中間件,在本文中,我們選擇 RabbitMQ 作為消息代理中間件。
sudo apt-get install rabbitmq-server
創(chuàng)建 Celery 對象
Celery 對象是所有 Celery 功能的入口,所以在開始其它工作之前,我們必須先定義我們自己的 Celery 對象。該對象定義了任務(wù)的具體內(nèi)容、任務(wù)隊列的服務(wù)地址、以及保存任務(wù)執(zhí)行結(jié)果的地址等重要信息。
# notify_friends.py from celery import Celery import time app = Celery('notify_friends', backend='rpc://', broker='amqp://localhost')@app.task def notify_friends(userId, newsId):print 'Start to notify_friends task at {0}, userID:{1} newsID:{2}'.format(time.ctime(), userId, newsId)time.sleep(2)print 'Task notify_friends succeed at {0}'.format(time.ctime())return True在本文中,為了模擬真實的應(yīng)用場景,我們定義了 notify_friends 這個任務(wù),它接受兩個參數(shù),并且在輸出流中打印出一定的信息,
創(chuàng)建 Celery Worker 服務(wù)進(jìn)程
在定義完 Celery 對象后,我們可以創(chuàng)建對應(yīng)的任務(wù)消費者--Celery worker 進(jìn)程,后續(xù)的任務(wù)處理請求都是由這個 Celery worker 進(jìn)程來最終執(zhí)行的。
celery -A celery_test worker --loglevel=info
在 Python 程序中調(diào)用 Celery Task
我們創(chuàng)建一個簡單的 Python 程序,來觸發(fā) notify_friends 這個任務(wù)。
# call_notify_friends.pyfrom notify_friends import notify_friends import timedef notify(userId, messageId):result = notify_friends.delay(userId, messageId)while not result.ready():time.sleep(1)print result.get(timeout=1)if __name__ == '__main__':notify('001', '001')我們在 call_notify_friends.py 這個程序文件中,定義了 Notify 函數(shù),它調(diào)用了我們之前定義的 notify_friends 這個 API,來發(fā)送任務(wù)處理請求到任務(wù)隊列,并且不斷地查詢等待來獲得任務(wù)處理的結(jié)果。
Celery worker 中的 log 信息:
[tasks]. celery_test.notify_friends[2015-11-16 15:02:31,113: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-11-16 15:02:31,122: INFO/MainProcess] mingle: searching for neighbors [2015-11-16 15:02:32,142: INFO/MainProcess] mingle: all alone [2015-11-16 15:02:32,179: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready. [2015-11-16 15:04:45,474: INFO/MainProcess] Received task: celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] [2015-11-16 15:04:45,475: WARNING/Worker-2] Start to notify_friends task at Mon Nov 16 15:04:45 2015, userID:001 newsID:001 [2015-11-16 15:04:47,477: WARNING/Worker-2] Task notify_friends succeed at Mon Nov 16 15:04:47 2015 [2015-11-16 15:04:47,511: INFO/MainProcess] Task celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] succeeded in 2.035536565s: True我們可以看到,Celery worker 收到了 Python 程序的 notify_friends 任務(wù)的處理請求,并且執(zhí)行完畢。
回頁首
利用調(diào)度器創(chuàng)建周期任務(wù)
在我們第二個 Celery 程序中,我們嘗試構(gòu)建一個周期性執(zhí)行“查詢當(dāng)前一小時最熱門文獻(xiàn)”的任務(wù),每隔 100 秒執(zhí)行一次,并將結(jié)果保存起來。后續(xù)的搜索請求到來后可以直接返回已有的結(jié)果,極大優(yōu)化了用戶體驗。
創(chuàng)建配置文件
Celery 的調(diào)度器的配置是在 CELERYBEAT_SCHEDULE 這個全局變量上配置的,我們可以將配置寫在一個獨立的 Python 模塊,在定義 Celery 對象的時候加載這個模塊。我們將 select_populate_book 這個任務(wù)定義為每 100 秒執(zhí)行一次。
# config.py from datetime import timedeltaCELERYBEAT_SCHEDULE = {'select_populate_book': {'task': 'favorite_book.select_populate_book','schedule': timedelta(seconds=100),}, }創(chuàng)建 Celery 對象
在 Celery 對象的定義里,我們加載了之前定義的配置文件,并定義了 select_populate_book 這個任務(wù)。
#favorite_book.py from celery import Celery import timeapp = Celery('select_populate_book', backend='rpc://', broker='amqp://localhost') app.config_from_object('config')@app.task def select_populate_book():print 'Start to select_populate_book task at {0}'.format(time.ctime())time.sleep(2)print 'Task select_populate_book succeed at {0}'.format(time.ctime())return True啟動 Celery worker
celery -A favorite_book worker --loglevel=info啟動 Celery beat
啟動 Celery beat 調(diào)度器,Celery beat 會周期性地執(zhí)行在 CELERYBEAT_SCHEDULE 中定義的任務(wù),即周期性地查詢當(dāng)前一小時最熱門的書籍。
celery -A favorite_book beatyuwenhao@yuwenhao:~$ celery -A favorite_book beat celery beat v3.1.15 (Cipater) is starting. __ - ... __ - _ Configuration ->. broker -> amqp://guest:**@localhost:5672//. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> now (0s) [2015-11-16 16:21:15,443: INFO/MainProcess] beat: Starting... [2015-11-16 16:21:15,447: WARNING/MainProcess] Reset: Timezone changed from 'UTC' to None [2015-11-16 16:21:25,448: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book) [2015-11-16 16:21:35,485: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book) [2015-11-16 16:21:45,490: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book)我們可以看到,Celery beat 進(jìn)程周期性地將任務(wù)執(zhí)行請求 select_populate_book 發(fā)送至任務(wù)隊列。
yuwenhao@yuwenhao:~$ celery -A favorite_book worker --loglevel=info [2015-11-16 16:21:11,560: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning: Starting from version 3.2 Celery will refuse to accept pickle by default.The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']You must only enable the serializers that you will actually use.warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))-------------- celery@yuwenhao-VirtualBox v3.1.15 (Cipater) ---- **** ----- --- * *** * -- Linux-3.5.0-23-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> app: select_populate_book:0x1b219d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: rpc:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. favorite_book.select_populate_book[2015-11-16 16:21:11,579: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-11-16 16:21:11,590: INFO/MainProcess] mingle: searching for neighbors [2015-11-16 16:21:12,607: INFO/MainProcess] mingle: all alone [2015-11-16 16:21:12,631: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready. [2015-11-16 16:21:25,459: INFO/MainProcess] Received task: favorite_book.select_populate_book[515f7c55-7ff0-4fcf-bc40-8838f69805fd] [2015-11-16 16:21:25,460: WARNING/Worker-2] Start to select_populate_book task at Mon Nov 16 16:21:25 2015 [2015-11-16 16:21:27,462: WARNING/Worker-2] Task select_populate_book succeed at Mon Nov 16 16:21:27 2015 [2015-11-16 16:21:27,475: INFO/MainProcess] Task favorite_book.select_populate_book [515f7c55-7ff0-4fcf-bc40-8838f69805fd] succeeded in 2.015802141s: True [2015-11-16 16:21:35,494: INFO/MainProcess] Received task: favorite_book.select_populate_book[277d718a-3435-4bca-a881-a8f958d64aa9] [2015-11-16 16:21:35,498: WARNING/Worker-1] Start to select_populate_book task at Mon Nov 16 16:21:35 2015 [2015-11-16 16:21:37,501: WARNING/Worker-1] Task select_populate_book succeed at Mon Nov 16 16:21:37 2015 [2015-11-16 16:21:37,511: INFO/MainProcess] Task favorite_book.select_populate_book [277d718a-3435-4bca-a881-a8f958d64aa9] succeeded in 2.014368786s: True我們可以看到,任務(wù) select_populate_book 的 Celery worker 周期性地收到 Celery 調(diào)度器的任務(wù)的處理請求,并且運行該任務(wù)。
回頁首
結(jié)束語
任務(wù)隊列技術(shù)可以滿足 Web 服務(wù)系統(tǒng)后臺任務(wù)管理和調(diào)度的需求,適合構(gòu)建分布式的 Web 服務(wù)系統(tǒng)后臺。Celery 是一個基于 Python 的開源任務(wù)隊列系統(tǒng)。它有著簡明的 API 以及良好的擴(kuò)展性。本文首先介紹了隊列技術(shù)的基本原理,然后介紹了 Celery 的模塊架構(gòu)以及工作原理。最后,本文通過實例介紹了如何在 Python 程序中調(diào)用 Celery API 并通過 Celery 任務(wù)隊列來執(zhí)行任務(wù),以及如何通過 Celery beat 在 Celery 任務(wù)隊列中創(chuàng)建周期性執(zhí)行的任務(wù)。希望本文可以對 Web 后臺開發(fā)者、以及 Celery 的初學(xué)者有所幫助。
總結(jié)
以上是生活随笔為你收集整理的利用 Celery 构建 Web 服务的后台任务调度模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: c++中求前n项和(这代码不得不让我佩服
- 下一篇: easyExcel 使用指南详解