loglevel python 不输出_Python 通过 Celery 框架实现分布式任务队列!
Celery 是一個簡單、靈活且可靠的分布式消息處理系統,主要用來作為任務隊列對海量消息數據進行實時的處理,在多個程序線程或者主機之間傳遞和分發工作任務。同時也支持計劃任務等需求。
一、環境配置
Celery 框架自身并不對傳入的消息進行存儲,因此在使用前需要先安裝第三方的 Message Broker。如 RabbitMQ 和 Redis 等。
安裝 RabbitMQ
對于 Linux 系統,執行以下命令:
$ sudo apt-get install rabbitmq-server # 安裝 RabbitMQ $ sudo rabbitmqctl add_user myuser mypassword # 添加用戶 myuser/mypassword $ sudo rabbitmqctl add_vhost myvhost # 添加 vhost $ sudo rabbitmqctl set_user_tags myuser mytag $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" # 為用戶 myuser 設置訪問 myvhost 的權限通過 Docker 安裝的步驟如下:
$ docker pull rabbitmq:3.8-management # 拉取 docker 鏡像(包含 web 管理) # 啟動 rabbitmq 容器 $ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=myvhost -e RABBITMQ_DEFAULT_USER=myuser -e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management安裝 Redis
$ sudo apt-get install redis-server
安裝 Celery
$ pip install celery
二、創建 Celery 應用
Celery 應用是該框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,須確保它可以被其他模塊導入。
以下是一段簡單的 Celery app 代碼 tasks.py :
# tasks.py from celery import Celeryapp = Celery('tasks',broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',backend='redis://localhost:6379/0')@app.task def add(x, y):return x + y使用 RabbitMQ 作為 broker 接收和發送任務消息,使用 Redis 作為 backend 存儲計算結果。
運行 Celery worker 服務
$ celery -A tasks worker --loglevel=info
$ celery -A tasks worker --loglevel=info-------------- celery@skitarniu-ubuntu18 v4.3.0 (rhubarb) ---- **** ----- --- * *** * -- Linux-4.15.0-60-generic-x86_64-with-debian-buster-sid 2019-11-01 07:21:34 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f4f30b84a90 - ** ---------- .> transport: amqp://myuser:**@localhost:5672/myvhost - ** ---------- .> results: redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. tasks.add[2019-11-01 07:21:35,316: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost [2019-11-01 07:21:35,367: INFO/MainProcess] mingle: searching for neighbors [2019-11-01 07:21:36,535: INFO/MainProcess] mingle: all alone [2019-11-01 07:21:36,782: INFO/MainProcess] celery@skitarniu-ubuntu18 ready.任務測試
進入 Python Shell,執行以下命令發布任務并獲取結果:
>>> from tasks import add >>> result = add.delay(4, 4) >>> result <AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace> >>> result.ready() True >>> result.get() 8 >>> result.traceback >>>delay() 方法用于發布任務消息,它是 apply_async() 方法的簡寫,即以異步的方式將任務需求提交給前面啟動好的 worker 去處理。 delay() 方法返回一個 AsyncResult 對象。
result.ready() 方法可以用來檢查提交的任務是否已經完成,返回布爾值。
result.get() 方法則用于獲取執行完成后的結果。如任務未完成,則程序會一直等待直到有結果返回。因此該方法是 阻塞 的,并不常用。可以傳入 timeout 參數指定等待的時間上限。
如 result.get(timeout=1) ,嘗試獲取任務執行后的結果,等待 1 秒。若 1 秒之后結果仍未返回,拋出 celery.exceptions.TimeoutError: The operation timed out. 異常。
如果任務執行過程中有拋出異常,則使用 get() 方法獲取結果時會重新拋出該異常導致程序中斷。可以通過修改 propagate 參數避免此情況:
result.get(propagate=False)
result.traceback 則用于獲取任務的 traceback 信息。
三、Calling Tasks
Celery 定義了一些可供 task 實例調用的通用的 Calling API ,包括三個方法和一些標準的執行選項:
apply_async(args[, kwargs[, ...]]) delay(*args, **kwargs) calling (__call__)以下是一些常見的調用示例:
- T.delay(arg, kwarg=value)
- T.apply_async((arg,), {'kwarg': value})
- T.apply_async(countdown=10)
10 秒之后開始執行某個任務 - T.apply_async(eta=now + timedelta(seconds=10))
10 秒之后開始執行某個任務 - T.apply_async(countdown=60, expires=120)
預計 1 分鐘后開始執行,但 2 分鐘后還未執行則失效 - T.apply_async(expires=now + timedelta(days=2))
2 天后失效
通過 countdown 設置任務的延遲執行:
>>> from tasks import add >>> result = add.apply_async((2, 3)) >>> result.get() 5 >>> delay_result = add.apply_async((2, 3), countdown=15) >>> delay_result.ready() False >>> delay_result.ready() False >>> delay_result.ready() False >>> delay_result.ready() True >>> delay_result.get() 5還可以通過 eta (estimated time of arrival) 設置延遲執行的時間:
>>> from datetime import datetime, timedelta >>> tomorrow = datetime.utcnow() + timedelta(days=1) >>> add.apply_async((2, 3), eta=tomorrow) <AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>此時 worker 在命令行的日志輸出如下:
[2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709] ETA:[2019-11-07 05:16:06.652736+00:00]四、計劃任務
Celery 允許像使用 crontab 那樣按計劃地定時執行某個任務。參考代碼如下:
# tasks.py from celery import Celeryapp = Celery('tasks',broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',backend='redis://localhost:6379/1')app.conf.beat_schedule = {'add-every-60-seconds': {'task': 'tasks.add','schedule': 60.0,'args': (16, 16)}, } app.conf.timezone = 'UTC'@app.task def add(x, y):print(x + y)運行 celery -A tasks worker -B 啟動 worker 服務。
-B 選項表示 beat ,即 celery beat 服務,負責執行計劃任務。
輸出如下(每隔一分鐘執行一次):
$ celery -A tasks worker -B ... [2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32 [2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32 ...同時 Celery 也支持更復雜的 crontab 類型的時間規劃:
from celery.schedules import crontabapp.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.'add-every-monday-morning': {'task': 'tasks.add','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (16, 16),}, }Crontab 表達式支持的語法如下:
ExampleMeaningcrontab()每分鐘執行一次crontab(minute=0, hour=0)每天半夜 0 點執行crontab(minute=0, hour='*/3')每隔 3 小時執行一次(從 0 時開始)crontab(minute=0, hour='0,3,6,9,12,15,18,21')同上一條crontab(day_of_week='sunday')只在周日執行,每隔一分鐘執行一次crontab(minute='*', hour='*', day_of_week='sun')同上一條crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')只在周四、周五的 3、17、22 時執行,每隔 10 分鐘執行一次crontab(minute=0, hour='*/2,*/3')只在能被 2 或者 3 整除的整點執行crontab(minute=0, hour='*/3,8-17')在能被 3 整除的整點,和 8-17 點之間的整點執行crontab(0, 0, day_of_month='2')在每個月的第二天的 0 時執行crontab(0, 0, day_of_month='11', month_of_year='5')在每年的 5 月 11 號 0 點執行
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的loglevel python 不输出_Python 通过 Celery 框架实现分布式任务队列!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gb50268-2008给水排水管道施工
- 下一篇: python datetime time