python 装饰器实践,实现定时函数和失败异常重复调用
生活随笔
收集整理的這篇文章主要介紹了
python 装饰器实践,实现定时函数和失败异常重复调用
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
python 裝飾器實(shí)踐,實(shí)現(xiàn)定時(shí)函數(shù)和失敗異常重復(fù)調(diào)用
執(zhí)行請(qǐng)求或函數(shù),出現(xiàn)異常情況下指定重復(fù)執(zhí)行次數(shù)
可以作為一個(gè)包調(diào)用
方法 get()和post 做請(qǐng)求,execcunt = 指定請(qǐng)求失敗再次請(qǐng)求次數(shù)
方法loopExecution作為裝飾器,循環(huán)執(zhí)行函數(shù),execcunt = 指定異常再次執(zhí)行次數(shù)
#函數(shù)異常情況下再次執(zhí)行,執(zhí)行次數(shù)
import traceback def loopExecution(func):def wrapper(*args,**kwargs):count = kwargs.pop("execcount", 1)try:res = func(*args,**kwargs)except Exception as err:for i in range(count):try:res = func(*args,**kwargs)if isinstance(res,Exception):return resexcept Exception as err:pass#失敗到達(dá)限定值返回異常信息return traceback.format_exc()# return Nonereturn resreturn wrapper@loopExecution def func():print("我是一個(gè)函數(shù)")raise Exception("執(zhí)行失敗了")if __name__ == '__main__':data = func(execcount=6)print("多次失敗情況下返回的數(shù)據(jù):",data)通用請(qǐng)求,指定請(qǐng)求次數(shù),將其作為一個(gè)包調(diào)用,如包名為my_request
import my_reqeust
response = my_request.get(url)
使用線程對(duì)執(zhí)行函數(shù)限制執(zhí)行時(shí)間,可不加定時(shí)為非阻塞,加定時(shí)間到會(huì)強(qiáng)制停止線程
from threading import Thread import threading import inspect import ctypesdef _async_raise(tid, exctype):"""raises the exception, performs cleanup if needed"""tid = ctypes.c_long(tid)if not inspect.isclass(exctype):exctype = type(exctype)res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))if res == 0:raise ValueError("invalid thread id")elif res != 1:# """if it returns a number greater than one, you're in trouble,# and you should call it again with exc=NULL to revert the effect"""ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)raise SystemError("PyThreadState_SetAsyncExc failed")def stop_thread(thread):_async_raise(thread.ident, SystemExit)def timelimited(exectime=None):def decorator(function):def decorator2(*args, **kwargs):time_out = exectime if not exectime is None else kwargs.pop("exectime",None)if time_out is None:return Exception("exec time param missing ")class TimeLimited(threading.Thread):def __init__(self, _error=None, ):Thread.__init__(self)self._error = _errorself._result = Nonedef run(self):try:result = function(*args, **kwargs)if result is None:self._result = Trueelse:self._result = resultexcept Exception as err:self._error = Exception(err)t = TimeLimited()t.setDaemon(True)t.start()if not time_out is False:t.join(time_out)if isinstance(t._error, Exception):return t._errorelse:if t._result is None:stop_thread(t)return Exception("Time out!")else:return t._resultreturn decorator2return decorator@timelimited(exectime=3)# 設(shè)置運(yùn)行超時(shí)時(shí)間S,優(yōu)先使用,exectime=False是不加定時(shí) def func():time.sleep(5)print("==========")print("==========")print("==========")print("==========")return "aaaa"if __name__ == "__main__":print(func())time.sleep(4)print(threading.enumerate())time.sleep(6)總結(jié)
以上是生活随笔為你收集整理的python 装饰器实践,实现定时函数和失败异常重复调用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多媒体计算机软件组成,多媒体计算机系统的
- 下一篇: 让小家变得温馨的小诀窍