MaxCompute的任务状态和多任务执行
本文作者:龍利民
企業介紹:
ofo 小黃車是一個無樁共享單車出行平臺,締造了“無樁單車共享”模式,致力于解決城市出行問題。用戶只需在微信服務號或App輸入車牌號,即可獲得密碼解鎖用車,隨取隨用,隨時隨地,也可以共享自己的單車到 ofo 共享平臺,獲得所有 ofo 小黃車的終身免費使用權,以1換N。
我們在使用MaxCompute的時候,我們其實非常期望知道當前有多少任務在跑,哪些任務耗時長,哪些任務已經完成,并且能通過任務的logview來分析任務耗時長的原因。
任務狀態監控
MaxCompute的任務狀態分Running和Terminated, 其中Running是包含:正在運行和等待運行的兩種狀態,Terminated包含:完成、失敗、cancel的任務三個狀態。阿里云提供了獲取上述2種狀態的SDK函數,odps.list_instances(status=Running|Terminated, start_time=開始時間,結束時間)。為了實現秒級別更新任務狀態我們可以用以下思路來實現。
1、對于已經running的任務,我們需要快速更新它的狀態,有可能已經完成了;
2、不斷獲取新的任務狀態。
我們用Mysql來記錄任務的狀態表設計如下:
CREATE TABLE maxcompute_task (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
instanceid varchar(255) DEFAULT NULL comment '任務實例ID',
logview varchar(1024) DEFAULT NULL comment 'logview鏈接,查看問題非常有用',
start_time varchar(64) DEFAULT NULL comment '任務開始時間',
end_time varchar(64) DEFAULT NULL comment '任務結束時間',
cast_time varchar(32) DEFAULT NULL comment '耗時',
project_name varchar(255) DEFAULT NULL comment '項目名',
status varchar(64) DEFAULT NULL comment '任務狀態',
PRIMARY KEY (id),
UNIQUE KEY instanceid (instanceid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
下面的頁面可以查看當前的任務耗時,開始時間,對超過1小時的任務顏色使用紅色標注,并且能查看logview,還能對任務進行取消,非常方便。
我們來看看代碼的實現:
!/usr/bin/env python
-- coding: utf-8 --
author: lemon
import time
import threading
import traceback
import datetime
from odps import ODPS
from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger
g_table_name = "bi_maxcompute_task"
def save_task(instanceid, odps, mysqlconn):
# 保存任務狀態到Mysql, 分別傳入odps連接器和mysql連接器 instance = odps.get_instance(instanceid) project_name = odps.project status = instance.status.value start_time = instance.start_time end_time = instance.end_timesql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)sqlret = mysqlconn.fetchone(sql) if sqlret and sqlret["status"] == "Terminated":return if sqlret and sqlret["logview"] is not None:logview = sqlret["logview"] else:logview = instance.get_logview_address() start_time = start_time + datetime.timedelta(hours=8) if status == "Running":end_time = datetime.datetime.now() else:end_time = end_time + datetime.timedelta(hours=8) cast_time = end_time - start_time colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview" values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time),str(end_time), cast_time, project_name, status,logview]]) sql = """replace into {0}({1}) values({2}) """.format(g_table_name, colname, values) mysqlconn.execute(sql)class MaxcomputeTask(threading.Thread):
# 獲取所有任務def __init__(self, logger):threading.Thread.__init__(self)self.logger = loggerself.hour = 1self.status_conf = [("demo", "Running"), ("demo", "Terminated"),("demo1", "Running"), ("demo1","Terminated")]def run(self):# 建立mysql連接, 根據你的需要來使用self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)while True:try:self.start_more()time.sleep(10)except:self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)self.logger.error(traceback.format_exc())def start_more(self,):for params in self.status_conf:self.get_task(*params)def get_task(self, project_name, status):odps = ODPS(**config.ODPS_INFO)odps.project = project_namelist = odps.list_instances(status=status, start_time=time.time() - self.hour * 3600)self.logger.info("start {0} {1} ".format(project_name, status))for row in list:save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)self.logger.info( "end {0} {1}".format(project_name, status))class MaxcomputeTaskRunning(threading.Thread):
# 更新running任務的狀態def __init__(self, logger):threading.Thread.__init__(self)self.logger = loggerdef run(self):self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)while True:try:self.update_running()time.sleep(1)except:self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)self.logger.error(traceback.format_exc())def update_running(self):sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)sqlret = self.mysqlconn.fetchall(sql)if not sqlret:returnself.logger.info("{1} running update length:{0}".format(len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S") ))for row in sqlret:odps = ODPS(**config.ODPS_INFO)odps.project = row["project_name"]save_task(row["instanceid"], odps, self.mysqlconn)if name == "__main__":
# logger是自己編寫的日志工具類 logger = _logger.Logger("maxcompute_task.log").getLogger() running = MaxcomputeTaskRunning(logger) running.setDaemon(True) running.start()task = MaxcomputeTask(logger) task.start()多任務執行
MaxCompute可以在命令行下運行,也可以用SDK,阿里云的集成環境跑任務等。很多時候我們面臨的任務是非常多的,如何做一個多任務的代碼執行器,也是經常遇到的問題。任務執行是一個典型的生產者和消費者的關系,生產者獲取任務,消費者執行任務。這么做有2個好處。
1)任務執行的數量是需要可控的,如果同時運行的任務不可控勢必對服務器資源造成沖擊,
2)多機運行服務,避免單點故障,MaxCompute的任務是運行在云端的,可以通過instanceid獲取到結果,此結果是保留7天的。
我大致貼一些我們在實際場景種的一些代碼,生產者和消費者的代碼:
class Consumer(threading.Thread): def __init__(self, queue, lock):threading.Thread.__init__(self)self.queue = queueself.lock = lockself.timeout = 1800 def run(self):self.execute = Execute()logger.info("consumer %s start" % threading.current_thread().name)while G_RUN_FLAG:try:task = self.queue.get()self.execute.start(task)except:logger.error(traceback.format_exc())
class Producter(threading.Thread):
def __init__(self, queue, lock):threading.Thread.__init__(self)self.queue = queueself.lock = lockself.sleep_time = 30self.step_sleep_time = 5def run(self):self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)logger.info("producter %s start" % threading.current_thread().name)while G_RUN_FLAG:if self.queue.qsize() >= QUEUE_SIZE:time.sleep(self.sleep_time)continue# TODOself.queue.put(task)time.sleep(self.step_sleep_time)def main():
queue = Queue.LifoQueue(QUEUE_SIZE) lock = threading.RLock()for _ in xrange(MAX_PROCESS_NUM):consumer = Consumer(queue, lock)consumer.setDaemon(True)consumer.start()producter = Producter(queue, lock) producter.start() producter.join()def signal_runflag(sig, frame):
global G_RUN_FLAG if sig == signal.SIGHUP:logger.info("receive HUP signal ")G_RUN_FLAG = Falseif name == "__main__":
logger.info("execute run") if platform.system() == "Linux":signal.signal(signal.SIGHUP, signal_runflag) main() logger.info("execute exit.")Maxcompute實際執行時的代碼:
def _max_compute_run(self, taskid, sql): # 異步的方式執行hints = {'odps.sql.planner.mode': 'lot','odps.sql.ddl.odps2': 'true','odps.sql.preparse.odps2': 'lot','odps.service.mode': 'off','odps.task.major.version': '2dot0_demo_flighting','odps.sql.hive.compatible': 'true'}new_sql = "{0}".format(sql)instance = self.odps.run_sql(new_sql, hints=hints)#instance = self.odps.run_sql(sql)# 異步的方式執行# instance = self.odps.run_sql(sql)self._save_task_instance_id(taskid, instance.id)# 阻塞直到完成instance.wait_for_success()return instance.id
獲取結果時的代碼:
def instance_result(odps, instance_id): # 通過instance_id 獲取結果 instance = odps.get_instance(instance_id) response = [] with instance.open_reader() as reader:raw_response = [r.values for r in reader]column_names = reader._schema.namesfor line in raw_response:tmp = {}for i in range(len(line)):tmp[column_names[i]] = line[i]response.append(tmp) return response
總結:
阿里云的MaxCompute是非常好用的云計算服務,它的更新和迭代速度都非常快,使用阿里云解放工程師的搭建基礎服務的時間,讓我們更多的專注業務,站在巨人的肩膀上聰明的干活。
總結
以上是生活随笔為你收集整理的MaxCompute的任务状态和多任务执行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在使用chrome调试angular2的
- 下一篇: python登录微信自动发送消息和绘画好