celery 可视化_在Flask中使用Celery进行多任务分布执行
關鍵字:Flask, Redis, RabbitMQ, Celery, Broker, Backend
前言
在后端服務器有時候需要處理耗時較長的任務,例如發送電子郵件,在處理這些任務時,這個線程就處于阻塞狀態而無法處理新的請求,服務器性能就大大降低,可以將這些耗時較長的任務交給任務隊列來處理,處理完成后告訴我們結果就可以了,這樣服務器處理請求的能力得到了極大的提高,本文將介紹Flask如何使用Celery來處理耗時任務(Windows環境下,以后有機會再發Linux下的)。
1、Celery工作原理
Celery 是一個異步任務隊列。通俗的講它就是我們的助理,當我們要出差的時候,它會幫我們安排好車輛、訂好機票、預約酒店等等。把Flask看成老板,Celery就是它的助理,幫助老板處理瑣碎的事情,這樣老板就有更多的時間處理其他重要的事情。
消息隊列(message queue)作為中間件(Broker),一般可選RabbitMQ或者Redis,通俗的講它就是我們和助理溝通的工具,就像一個日程表,我們將需要助理處理的事情記錄在上面,助理就會看到這些事情然后去處理,最后把處理的結果記錄在日程表上,當我們看到處理結果后,日程表上的這個事項就劃掉了,有時候我們需要保存這些處理結果,所以為助理專門準備了一個記錄處理結果(Backend)的小本本,這個小本本通常使用Redis。
image
整個系統構成經典的生產者消費者模型。生產者和消費者是相對于消息隊列的,老板(相當于這里的Flask App)往消息隊列添加任務,所以老板是生產者,助理(Celery的Worker)從消息隊列提取任務,所以是消費者。
使用Celery主要有三個好處:
1. 應用解耦
消息是與平臺無關的,Flask只需要把需求告訴消息隊列即可,由誰來完成并不需要關心,當訪問量增加時對Flask不會造成明顯的沖擊。
2. 異步通信
縮短了請求等待的時間,提高了頁面的吞吐量,尤其是瞬間高流量時,消息隊列能夠有效緩解訪問壓力。
3. 高可靠性
消息隊列冗余機制確保了消息最終都會被處理,不會造成數據或操作丟失的情況。
2、準備工作
要使用Celery,那么首先就要確定它的中間件和結果緩沖區是什么,然后安裝并配置好Celery的worker就算是完成了初步的準備工作。
① 中間件(Broker)
可以使用RabbitMQ,這也是一般的最佳實踐,下載地址如下:
也可以直接使用Redis作為中間件,下載地址如下:
② 結果緩沖區(Backend)
這里為了簡單起見,兩者都直接采用Redis。
③ 安裝并配置Celery
直接使用命令安裝Celery到Python環境:
pip install celery
在Flask配置文件中加入Celery的配置信息,Project/config.py
class Config():
# other config info
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
在Flask項目模塊工廠函數文件中添加Celery的工廠函數,Project/webapp/init.py:
from flaskimport Flask
from celeryimport Celery
# 創建Flask App的工廠函數
def create_app(config_name) ->'app':
app =Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
from .mainimport mainas main_blueprint
app.register_blueprint(main_blueprint)
return app
# 創建celery的工廠函數
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
# celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
"""Will be execute when create the instance object of ContextTask"""
with app.app_context():
return self.run(*args, **kwargs)
# 將app_context 包含在celery.Task中,這樣讓其他的Flask擴展也能正常使用
celery.Task = ContextTask
return celery
Celery 的進程必須在 Flask app 的上下文中運行, 這樣 Celery 才能夠跟其他的 Flask 擴展協同工作。所以必須將 flask_celery 對象注冊到 app 對象中, 并且每創建一個 Celery 進程都需要創建一個新的 Flask app 對象, 這里我們使用工廠模式來創建 celery 對象。
這一點是非常重要的,實際上 Flask application 和Celery application 是兩個不同的進程,在 Celery 沒有加入 Flask 上下文的情況下,Celery 的程序邏輯就不能輕易的訪問 Flask 相關資源,比如不能加載 Flask的環境配置信息,無法通過 Flask 來訪問數據庫,不能使用 Flask 的擴展功能等。如果想做到這些,Celery 都需要自己再實現一套相同的邏輯,這樣做顯然是沒有必要的。所以Flask application 原生支持將自己的 Context 嵌入到別的 application 中,當然有些情況也需要相應擴展的輔助。
注意:這里將 celery.conf.update(app.config) 給注釋起來了,因為會報錯,這個錯誤還沒有弄清楚是什么原因。
3、創建Worker的任務函數
在main文件夾下新建一個tasks.py文件,來存放需要異步執行的耗時任務,首先通過工廠函數make_celery創建Celery實例,后臺任務其實就是一個函數,把要執行的動作放入這個函數中,然后使用celery.task這個裝飾器來裝飾就可以了。
這里只有一個任務,任務的裝飾器包含了bind參數,表示Celery發送一個self參數到我們的任務,這個參數主要用來使用update_state方法更新當前任務執行狀態,update_state方法兩個參數,1個state表示當前狀態,meta是相關的數據,其實就是一個字典。
Project/webapp/main/tasks.py:
import random
import time
from .. import make_celery, create_app
celery = make_celery(create_app('default'))
@celery.task()
def log(msg):
print(msg)
return msg
# 通過celery.task裝飾耗時任務函數,bind為True會傳入self給被裝飾的函數,用于記錄和更新任務狀態
@celery.task(bind=True)
def long_task(self):
# 耗時任務邏輯
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjecetive = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
# 每次獲取一次詞匯,就通過 self.update_state () 更新 Celery 任務的狀態,Celery 包含一些
# 內置狀態,如 SUCCESS、STARTED 等等,這里使用了自定義狀態「PROGRESS」,除了狀態外,還將本
# 次循環的一些信息通過 meta 參數 (元數據) 以字典的形式存儲起來。有了這些數據,前端就可以顯示進度條了。
for i in range(total):
if not message or random.random() < 0.25:
# 隨機取一些信息
message = '{0} {1} {2}...'.format(random.choice(verb), random.choice(adjecetive), random.choice(noun))
print(message)
# 更新Celery任務狀態
self.update_state(state='PROGRESS', meta={'current': i, 'total': total, 'status': message})
time.sleep(0.5) # 每次loop延時0.5秒,為了表示這是耗時任務
# 返回字典
return {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': u'完成'}
4、在視圖中產生消息
通常在視圖函數中,處理業務邏輯時調用后臺任務,有兩種方法,一種是調用任務的delay方法,直接傳入參數即可,一種是調用入伍的apply_async方法,任務的參數通過args傳遞,countdown是定時任務,表示多少秒以后執行。
通常我們將耗時較長的任務編寫成一個函數,然后使用celery.task裝飾器將其轉化成一個任務對象,最后在視圖函數中需要使用的時候調用它的delay或者apply_async方法來執行。
Project/webapp/main/view.py:
from flask import render_template, url_for
import time
import random
from . import main
from .tasks import long_task, log
# Server 路由
@main.route('/', methods=['GET', 'POST'])
@main.route('/index', methods=['GET', 'POST'])
def index():
log.delay("helo, world!")
log.apply_async(args=["this is a message from main.index"], countdown=60)
return render_template('index.html')
5、前端實時顯示耗時過程執行狀態的應用范例
接下來我們就實現一個在前端實時顯示后端程序執行狀態的進度條應用范例,廢話不多說直接上代碼。
① 后端產生消息
Project/webapp/main/view.py:
# 耗時任務視圖接口
@main.route('/longtask', methods=['GET'])
def longtask():
# 異步調用
task = long_task.apply_async()
# 返回202和Location頭
# 返回的狀態碼為 202,202 通常表示一個請求正在進行中,然后還在返回數據包的包頭 (Header) 中添加了 Location 頭信息
# 前端可以通過讀取數據包中 Header 中的 Location 的信息來獲取任務 id 對應的完整 url
# 前端有了任務 id 對應的 url 后,還需要提供一個接口給前端,讓前端可以通過任務 id 去獲取當前時刻任務的具體狀態
return jsonify({}), 202, {'Location': url_for('main.taskstatus', task_id=task.id)}
這個視圖函數用來啟動后臺任務,返回結果中給出了任務狀態的地址,通過task.id將當前任務傳遞到任務狀態的視圖函數,這樣狀態的視圖函數就能找到這個任務,202狀態碼表示正在執行。
② 請求任務狀態
Project/webapp/main/view.py:
@main.route('/status/', methods=['GET'])
def taskstatus(task_id):
# 為了可以獲得任務對象中的信息,使用任務 id 初始化 AsyncResult 類,獲得任務對象,然后就可以從任務對象中獲得當前任務的信息
task = long_task.AsyncResult(task_id)
# 如果任務在 PENDING 狀態,表示該任務還沒有開始,在這種狀態下,任務中是沒有什么信息的,這里人為的返回一些數據
# 如果任務執行失敗,就返回 task.info 中包含的異常信息,此外就是正常執行了,正常執行可以通 task.info 獲得任務中具體的信息
if task.state == 'PENDING': # 等待中
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE': # 沒有失敗
response = {
'state': task.state,
# meta中的數據,通過task.info.get()可以獲得
'current': task.info.get('current', 0), # 當前循環進度
'total': task.info.get('total', 1), # 總循環進度
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# 后端執行任務出現了一些問題
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info) # 錯誤的具體描述
}
# 該方法會返回一個 JSON,其中包含了任務狀態以及 meta 中指定的信息,前端可以利用這些信息構建一個進度條
return jsonify(response)
這個就是用來請求任務狀態的路由,首先使用task_id找到正在執行的任務。通過task.state可以獲取到任務的狀態,PENDING表示任務還未開始,通過task.info.get()方法得到前面self.update_state中meta參數的內容。
后端的代碼就已經完成了,然后就是前端來獲取數據并使用進度條插件實現進度的可視化。
③ 前端實現
Project/webapp/templates/index.html
{% extends "base.html" %}
{% block content %}
{{ super() }}
Long running task with progress updates
Start Long Calculation
{% endblock %}
{% block scripts %}
{{ super() }}
// 按鈕點擊事件
$(function () {
$('#start-bg-job').click(start_long_task);
});
// 請求 longtask接口
function start_long_task() {
// 添加元素到html中
div = $('
0%...');
$('#progress').append(div);
// 創建進度條對象
var nanobar = new Nanobar({
bg: '#44f',
target: div[0].childNodes[0]
});
// ajax 請求longtask
$.ajax({
type: 'GET',
url: '/longtask',
// 獲得數據,從響應頭中獲取Location
success: function (data, status, request) {
let status_url = request.getResponseHeader('Location');
// 調用update_progress() 方法更新進度條
update_progress(status_url, nanobar, div[0]);
},
error: function () {
alert('Unexpected error');
}
});
}
// 更新進度條
function update_progress(status_url, nanobar, status_div) {
// getJson()方法是JQuery內置方法,這里向Location中對應的url發起請求,即請求/status/
$.getJSON(status_url, function (data) {
// 計算進度
let percent = parseInt(data['current'] * 100 / data['total']);
// 更新進度條
nanobar.go(percent);
// 更新文字
$(status_div.childNodes[1]).text(percent + '%');
$(status_div.childNodes[2]).text(data['status']);
if (data['state'] !== 'PENDING' && data['state'] !== 'PROGRESS') {
if ('result' in data) {
// 展示結果
$(status_div.childNodes[3]).text('Result:' + data['result']);
} else {
// 意料之外的事情發生
$(status_div.childNodes[3]).text('Result:' + data['state'])
}
} else {
// 1秒后再次運行
setTimeout(function () {
update_progress(status_url, nanobar, status_div);
}, 1000)
}
});
}
{% endblock %}
前端使用nanobar.js來顯示進度條,定義了兩個函數,start_long_task訪問/longtask來啟動任務,遞歸函數update_progress來更新任務處理進度。
前端沒什么好說的,注釋已經寫的很清楚了。
6、 實際運行
由系統原理可以看到,系統想要運行起來首先就要啟動Broker服務器和Backend服務器,然后啟動Worker,最后才啟動Flask服務器。
① 啟動Broker和Backend服務器
本文中這兩者都是使用的Redis,所以只需要啟動Redis-server就可以了
這里有個坑,就是安裝好redis后,直接啟動是會報錯的
(venv) D:\PythonWS\Project>redis-server
[7960] 08 Oct 07:59:35.174 Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
[7960] 08 Oct 07:59:35.184 Creating Server TCP listening socket *:6379: bind: No such file or directory
解決辦法是要先在redis-cli中把redis給shutdown之后再啟動,如果在redis安裝目錄里的redis.windows-service.conf設置了密碼的話還要輸入auth password才能執行shutdown指令
(venv) D:\PythonWS\Project>redis-cli
127.0.0.1:6379> auth "789789"
OK
127.0.0.1:6379> shutdown
not connected> exit
(venv) D:\PythonWS\Project>redis-server
[2232] 08 Oct 08:00:23.429 Warning: no config file specified, using the default config.
In order to specify a config file use redis-server /path/to/redis.conf
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 3.2.100 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in standalone mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
| `-._ `._ / _.-' | PID: 2232
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
至此,我們的Broker和Backend服務器已經啟動完畢。
② 啟動Worker服務器
使用以下指令啟動Worker服務器
celery -A webapp.main.tasks worker -l info -f celery.log --pool=eventlet
webapp.main.tasks 正是celery實例所在的位置,-l info表示消息記錄等級,也可以這樣寫--loglevel=info, -f celery.log表示將log信息寫到文件celery.log中, --pool=eventlet表示使用eventlet的線程池進行任務分配,這句很重要在windows環境下如果沒有這句會報錯的,可能是這個模塊在windows下的兼容性不是太好吧!
執行效果如下所示:
(venv) D:\PythonWS\Project>celery -A webapp.main.tasks worker -l info -f celery.log --pool=eventlet
-------------- celery@Matsuri v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Windows-7-6.1.7601-SP1 2019-10-09 13:28:21
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: webapp:0x7108710
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. webapp.main.tasks.long_task
注意:Wroker的并發性(concurrency)是4,這個能不能改我還不知道,后續研究了以后再補上。
在celery.log中可以看到以下信息:
[2019-10-09 13:28:21,185: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-10-09 13:28:21,194: INFO/MainProcess] mingle: searching for neighbors
[2019-10-09 13:28:23,214: INFO/MainProcess] mingle: all alone
[2019-10-09 13:28:23,225: INFO/MainProcess] celery@Matsuri ready.
[2019-10-09 13:28:23,240: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/0.
至此,我們的Wroker服務器也啟動起來了。
③ 啟動Flask服務器
(venv) D:\PythonWS\Project>python manage.py server
* Serving Flask app "webapp" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Restarting with stat
* Debugger is active!
* Debugger PIN: 689-169-902
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
至此,所有的準備工作都已經完成,可以在瀏覽器中輸入我們的地址了。
④ 顯示效果
點擊Start Long Calculation按鈕5次,頁面上將生成5個進度條,可以看到前四個已經在執行了,最后一個處于掛起狀態,這是由Wroker的并發性決定的。
前端頁面顯示效果
我們最后再來看看后端的提示信息。
首先看看redis:
[2232] 09 Oct 18:24:30.496 * Background saving started by pid 11924
[2232] 09 Oct 18:24:30.596 # fork operation complete
[2232] 09 Oct 18:24:30.596 * Background saving terminated with success
[2232] 09 Oct 18:29:31.016 * 100 changes in 300 seconds. Saving...
[2232] 09 Oct 18:29:31.019 * Background saving started by pid 11900
[2232] 09 Oct 18:29:31.219 # fork operation complete
[2232] 09 Oct 18:29:31.219 * Background saving terminated with success
然后看看Flask后臺:
127.0.0.1 - - [09/Oct/2019 18:24:30] "GET /longtask HTTP/1.1" 202 -
127.0.0.1 - - [09/Oct/2019 18:24:30] "GET /status/4b3edb32-5626-4904-bf1b-d7dc2057715c HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:30] "GET /longtask HTTP/1.1" 202 -
127.0.0.1 - - [09/Oct/2019 18:24:30] "GET /status/9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8 HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:31] "GET /status/4b3edb32-5626-4904-bf1b-d7dc2057715c HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:31] "GET /longtask HTTP/1.1" 202 -
127.0.0.1 - - [09/Oct/2019 18:24:31] "GET /status/b069a080-d669-4c65-aacf-dcc4ca03239b HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:31] "GET /status/9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8 HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /longtask HTTP/1.1" 202 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /status/6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /status/4b3edb32-5626-4904-bf1b-d7dc2057715c HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /status/b069a080-d669-4c65-aacf-dcc4ca03239b HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /longtask HTTP/1.1" 202 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /status/49f5c67d-9636-4aaf-bbd9-7a40c45c535d HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:32] "GET /status/9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8 HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:33] "GET /status/6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:33] "GET /status/4b3edb32-5626-4904-bf1b-d7dc2057715c HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:33] "GET /status/b069a080-d669-4c65-aacf-dcc4ca03239b HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:33] "GET /status/49f5c67d-9636-4aaf-bbd9-7a40c45c535d HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:33] "GET /status/9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8 HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:34] "GET /status/6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:34] "GET /status/4b3edb32-5626-4904-bf1b-d7dc2057715c HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:34] "GET /status/b069a080-d669-4c65-aacf-dcc4ca03239b HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:34] "GET /status/49f5c67d-9636-4aaf-bbd9-7a40c45c535d HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:34] "GET /status/9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8 HTTP/1.1" 200 -
127.0.0.1 - - [09/Oct/2019 18:24:35] "GET /status/6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e HTTP/1.1" 200 -
最后看看celery.log 里的內容:
[2019-10-09 18:24:30,408: INFO/MainProcess] Received task: webapp.main.tasks.long_task[4b3edb32-5626-4904-bf1b-d7dc2057715c]
[2019-10-09 18:24:30,792: INFO/MainProcess] Received task: webapp.main.tasks.long_task[9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8]
[2019-10-09 18:24:31,496: INFO/MainProcess] Received task: webapp.main.tasks.long_task[b069a080-d669-4c65-aacf-dcc4ca03239b]
[2019-10-09 18:24:32,001: INFO/MainProcess] Received task: webapp.main.tasks.long_task[6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e]
[2019-10-09 18:24:32,563: INFO/MainProcess] Received task: webapp.main.tasks.long_task[49f5c67d-9636-4aaf-bbd9-7a40c45c535d]
[2019-10-09 18:24:45,730: INFO/MainProcess] Task webapp.main.tasks.long_task[6dec5d07-3ab8-4a8d-8e10-f28b8eb1b21e] succeeded in 13.728000000002794s: {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': '完成'}
[2019-10-09 18:24:50,746: INFO/MainProcess] Task webapp.main.tasks.long_task[4b3edb32-5626-4904-bf1b-d7dc2057715c] succeeded in 20.32699999999022s: {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': '完成'}
[2019-10-09 18:24:53,878: INFO/MainProcess] Task webapp.main.tasks.long_task[b069a080-d669-4c65-aacf-dcc4ca03239b] succeeded in 22.386000000013155s: {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': '完成'}
[2019-10-09 18:24:56,217: INFO/MainProcess] Task webapp.main.tasks.long_task[9ea6b9c6-4aa9-428b-b4f5-8dfcd98f03b8] succeeded in 25.427999999999884s: {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': '完成'}
[2019-10-09 18:25:11,115: INFO/MainProcess] Task webapp.main.tasks.long_task[49f5c67d-9636-4aaf-bbd9-7a40c45c535d] succeeded in 25.396999999997206s: {'current': 100, 'total': 100, 'status': 'Task Completed!', 'result': '完成'}
最終的結果和我們想要的一樣,如果在啟動地方需要使用Celery,也可以這樣進行處理。
總結
以上是生活随笔為你收集整理的celery 可视化_在Flask中使用Celery进行多任务分布执行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 可视化_仓库管理可视化
- 下一篇: python字符计数怎样去除空格_去除p