python重复执行_关于计时器:在Python中每x秒重复执行一次函数的最佳方法是什么?...
我想永遠每60秒在Python中重復執行一個函數(就像目標C中的NSTimer一樣)。 這段代碼將作為守護進程運行,實際上就像使用cron每分鐘調用python腳本一樣,但不需要用戶設置。
在這個關于用Python實現的cron的問題中,解決方案似乎實際上只是sleep()x秒。 我不需要這樣的高級功能,所以也許這樣的東西可行
1
2
3while True:
# Code executed here
time.sleep(60)
這段代碼有可預見的問題嗎?
一個迂腐的觀點,但可能很關鍵,代碼上面的代碼不會每60秒執行一次,它會在執行之間產生60秒的差距。如果執行的代碼完全沒有時間,則每60秒才會發生一次。
Dupe:stackoverflow.com/questions/373335/…
time.sleep(60)也可以提前和退回
我仍然想知道:這段代碼有任何可預見的問題嗎?
"可預見的問題"是你不能期望通過使用time.sleep(60)每小時進行60次迭代。因此,如果您每次迭代附加一個項目并保留設置長度列表......該列表的平均值將不代表一致的"時間段";因此,諸如"移動平均線"之類的功能可以引用太舊的數據點,這會扭曲您的指示。
有關簡單的示例,請參閱Python調度。
@Banana是的,你可以期待任何問題,因為你的腳本沒有每60秒執行一次。例如。我開始做這樣的事情來分割視頻流和上傳,我最終得到了5-10秒的延遲,因為媒體隊列正在緩沖,而我在循環內處理數據。這取決于您的數據。如果該功能是某種簡單的監視器,它會警告你,例如,當你的磁盤已滿時,你應該沒有任何問題。如果你正在檢查核電站警告警報,你可能最終得到一個城市完全炸毀了x
使用sched模塊,它實現了一個通用的事件調度程序。
1
2
3
4
5
6
7
8
9import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print"Doing stuff..."
# do your stuff
s.enter(60, 1, do_something, (sc,))
s.enter(60, 1, do_something, (s,))
s.run()
sched模塊用于調度函數在一段時間后運行,如何使用它來每隔x秒重復一次函數調用而不使用time.sleep()?
@Baishampayan:安排新的跑步。
基于sched的Kronos提供了更高級別的界面:razorvine.net/download/kronos.py由TurboGears使用。
那么在packages.python.org/APScheduler上的apscheduler也應該在這一點上提到。
shed模塊的文檔也指向threading.Timer類,它更適合多線程環境。 docs.python.org/2/library/threading.html#threading.Timer可以在sched模塊文檔中找到一個示例。
注意:此版本可能會漂移。您可以使用enterabs()來避免它。這是一個非漂移版本進行比較。
你好,沒有輸出,代碼不起作用。
@ J.F.Sebastian:為什么這個版本可以漂移?
@JavaSa:因為"你的東西"不是瞬間的,time.sleep的錯誤可能會在這里積累。"每隔X秒執行一次"和"反復執行延遲~X秒"是不一樣的。另見此評論
謝謝,接受的答案應該改變,并非所有人都閱讀評論......
這不回答這個問題。
好的,但如何退出此計時器(循環)?
@Apostolos它運行直到沒有更多的預定事件,所以你可以不安排另一次運行,s.enter(60, 1, do_something, (sc,))是調度新運行的行,只是當你想要停止循環時不運行該部分
如果你刪除這一行,那么它不再是一個運行的間隔:它只是執行的延遲;它運行一次,就是這樣。我的問題是,你需要添加一個這個循環將終止的條件。例如。 if ...: return這將終止循環。順便說一下,還有其他更重要的東西:這個循環,正如在示例中使用的那樣,鎖定程序,即在終止之前沒有其他任何東西可以運行。
@Apostolos如果,在do_something()函數內,你執行if ...: return而不調用將終止循環的s.enter(...),因為沒有其他任何東西將被安排運行。代碼流將從s.run()調用解鎖,并在此后有代碼時繼續。
確切地說,這就是我的意思。如果需要終止循環,則需要if ...: return條件。但最重要的是,正如我所說,這種方法"鎖定"程序的流程,即在s.run()之后你不能做任何事情(直到循環終止)。 (請參閱我提供的解決方案,進一步說明。)
@Apostolos您提供的解決方案使用Tkinter.Tk.mainloop()來執行相同操作。用你的術語:在mainloop()之后你不能做任何事情(直到循環結束)。唯一的區別是你正在使用一個UI庫,而我正在使用專門用于安排不嘗試創建UI窗口的調用的庫。
我想我已經解釋過了,但我會根據你的最后評論再試一次。兩者之間的區別在于,當你的時鐘工作時,即在發出s.run()命令之后,沒有其他任何東西可以運行,而在我的方法中,時鐘開始工作,你仍然可以在那之后做你想做的任何動作。只有在完成了想要做的所有事情后,才能給出mainloop()命令。所以差異真的很大。
@Apostolos呵呵,但是,如果時鐘在到達mainloop()之前到期,則不會調用函數... after()調用僅在mainloop運行時才有效,而不是之前。您可以計算通過的時間,并在計劃第一次運行時減去該時間(如果需要)
你喜歡就好。
python如何處理堆棧?由于遞歸,這不會由于尚未完成的函數而執行幾乎無限的堆棧嗎?我需要每天安排幾個數百萬的功能。
@DGoiko預定的功能被添加到列表中,不會被重復調用。當前函數完成后,調度程序將查找要調用的下一個函數。所以調用堆棧永遠不會堆積 - 你不會遇到遞歸問題。
@nosklo因此,如果我做對了,s.enter(60,1,do_something,(sc,))將任務放在一個列表中,在60秒后執行一次,然后,如果調度程序設置為運行,計數器開始向下計時,直到零,函數執行的時刻。如果向列表中添加新元素,則會在執行.enter后立即開始計時。我對嗎?
看來你好嗎?對于@DGoiko來說,如果你計劃一個新的跑步,時鐘就會在enter之后開始
只需將時間循環鎖定到系統時鐘即可。簡單。
1
2
3
4
5import time
starttime=time.time()
while True:
print"tick"
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
+1。你的和twisted答案是每隔x秒運行一個函數的唯一答案。其余的在每次調用后執行該函數延遲x秒。
如果你在哪里添加一些代碼,這需要超過一秒......它會拋出時間并開始落后......在這種情況下接受的答案是正確的...任何人都可以循環一個簡單的打印命令讓它每秒都運行一次......
由于存在的影響,我更喜歡from time import time, sleep;)
@Mayhem:錯了。 1-如果代碼花費的時間超過句點,則沒有解決方案可行(否則最終會耗盡資源)。 2-這個解決方案可以跳過一個勾號,但它總是在整個時期邊界(在這種情況下是一分鐘)運行。這里不能只是"循環一個簡單的打印命令" - 代碼的目的是避免多次迭代后的漂移。
工作非常好。如果你開始將它同步到某個時間,則無需減去starttime:time.sleep(60 - time.time() % 60)對我來說一直很好。我已經將它用作time.sleep(1200 - time.time() % 1200)并且它給了我:00 :20 :40的日志,正如我想要的那樣。
@ J.F.Sebastian:最后% 60的目的是什么?
@AntonSchigur避免多次迭代后的漂移。單個迭代可能會遲早或稍后開始,具體取決于sleep(),timer()精度以及執行循環體所需的時間,但平均迭代總是發生在間隔邊界上(即使跳過一些):while keep_doing_it(): sleep(interval - timer() % interval) 。將其與while keep_doing_it(): sleep(interval)進行比較,其中錯誤可能在多次迭代后累積。
@TrainHeartnet當遺漏模數時,time.time() - starttime的結果將大于設定的時間(在這種情況下為60),因此60 - (time.time() - starttime)的結果將為負,這會導致睡眠功能凍結(不完全是但它只是等待了大量的時間)。在這種情況下,%60可以防止它變得大于60。
這個解決方案在這個線程中具有最少的漂移,但缺點是time.time() - starttime將在一段時間后成為一個非常大的數字。我更喜歡做的是在循環中移動starttime聲明。這不太精確,但在使用較小時間時僅具有明顯的效果。 -Edit:沒關系,它永遠不會變得超過timer.timer(),所以只有你的腳本運行了幾十億年才會出現問題
我一直在尋找比我一直使用的time.time() - start_time方法更好的方法,這個方法看起來精確到0.1秒,這對我來說已經足夠了。
我認為starttime=time.time()也應該作為while循環內的第一行。
@backslashN不,我相信這是重點(記住初始開始并使用模數) - 防止漂移。我在while循環中有類似的代碼(sleep(interval - (end-start))和startTime,但我認為這個解決方案更好。我改編了它,謝謝。
@smoothware,你甚至可以從"time.time() - starttime"刪除"starttime = time.time()"和" - starttime"。間隔仍然相等而不是漂移。只是沒有連接到循環之前的任何時間點。
您可能想要考慮Twisted,它是一個實現Reactor Pattern的Python網絡庫。
1
2
3
4
5
6
7
8
9
10
11
12from twisted.internet import task, reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
雖然"while True:sleep(60)"可能會工作Twisted可能已經實現了你最終需要的許多功能(如bobince所指出的守護進程,日志記錄或異常處理),并且可能是一個更強大的解決方案
我知道Twisted可以做到這一點。感謝分享示例代碼!
很好的答案,非常準確,沒有漂移。我想知道這是否會讓CPU在等待執行任務時休眠(等等。)
這種漂移在毫秒級
如果你想要一種非阻塞方式來定期執行你的函數,而不是阻塞無限循環,我會使用一個線程計時器。這樣,您的代碼可以繼續運行并執行其他任務,并且每隔n秒仍然會調用您的函數。我在很長的CPU /磁盤/網絡密集型任務中使用這種技術來打印進度信息。
這是我在類似問題中發布的代碼,包含start()和stop()控件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
用法:
1
2
3
4
5
6
7
8
9
10
11from time import sleep
def hello(name):
print"Hello %s!" % name
print"starting..."
rt = RepeatedTimer(1, hello,"World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
特征:
僅限標準庫,無外部依賴項
即使定時器已經啟動/停止,start()和stop()也可以安全地多次調用
要調用的函數可以有位置和命名參數
您可以隨時更改interval,它將在下次運行后生效。 args,kwargs甚至function也是如此!
這個解決方案似乎隨著時間而變化;我需要一個版本,旨在每隔n秒調用該函數而不會漂移。我將在一個單獨的問題中發布更新。
在def _run(self)中,我試圖解決為什么在self.function()之前調用self.start()。你能詳細說說嗎?我認為通過調用start()首先self.is_running總是False所以我們總是會啟動一個新線程。
我想我已經到底了。 @ MestreLion的解決方案每x秒運行一個函數(即t = 0,t = 1x,t = 2x,t = 3x,...),其中在原始海報中,樣本代碼運行一個函數,其間有x秒間隔。此外,我相信這個解決方案有一個錯誤,如果interval比function執行的時間短。在這種情況下,self._timer將在start函數中被覆蓋。
是的,@ RichieEpiscopo,.start()之后對.function()的調用是在t = 0運行函數。如果function花費的時間超過interval,我認為這不會是一個問題,但是在代碼中可能存在一些競爭條件。
這是我能得到的唯一無阻塞方式。謝謝。
@eraoul:是的,這個解決方案確實漂移,雖然它需要幾百甚至幾千次運行才能漂移一秒鐘,具體取決于你的系統。如果這種漂移與您相關,我強烈建議您使用適當的系統調度程序,例如cron
我認為更簡單的方法是:
1
2
3
4
5
6
7
8import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
這樣你的代碼就會被執行,然后等待60秒然后它再次執行,等待,執行等......
無需復雜化:D
關鍵字True應為大寫
實際上這是對這個問題最合適的答案!
實際上這不是答案:time sleep()只能在每次執行后等待X秒。例如,如果您的函數需要0.5秒執行并且您使用time.sleep(1),則表示您的函數每1.5秒執行一次,而不是1.您應該使用其他模塊和/或線程來確保某些內容適用于Y次在每X秒。
@kommradHomer:Dave Rove的回答表明你可以每隔X秒使用time.sleep()運行一些東西
在我看來,代碼應該在while True循環中調用time.sleep(),如:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
非常好...使用python 3 ...
以下是MestreLion代碼的更新,可以避免隨著時間的推移而進行漫游:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
如果你想在不阻塞剩余代碼的情況下執行此操作,可以使用它來讓它在自己的線程中運行:
1
2import threading
threading.Thread(target=lambda: every(5, foo)).start()
該解決方案結合了其他解決方案中很少結合的幾種功能:
異常處理:在此級別上盡可能正確處理異常,即:即記錄以進行調試,而不會中止我們的程序。
沒有鏈接:你在許多答案中找到的常見的類鏈實現(用于調度下一個事件)在調度機制(threading.Timer或其他)中出現任何錯誤的方面是脆弱的,這將終止鏈。即使問題的原因已經解決,也不會再發生進一步的執行。與簡單的sleep()相比,一個簡單的循環更加穩健。
沒有漂移:我的解決方案可以準確跟蹤它應該運行的時間。根據執行時間沒有漂移(如許多其他解決方案中那樣)。
跳過:如果一次執行耗費太多時間,我的解決方案將跳過任務(例如,每五秒執行一次X,但X需要6秒)。這是標準的cron行為(并且有充分的理由)。然后,許多其他解決方案只是連續幾次執行任務而沒有任何延遲。對于大多數情況(例如清理任務),這是不希望的。如果需要,只需使用next_time += delay。
不漂流的最佳答案。
upvoted!如何在沒有睡眠的情況下這樣做,我有一個redis訂閱者,有實時數據傳入,因此無法承受睡眠但需要每分鐘運行一些東西
@PirateApp我會在另一個線程中執行此操作。你可以在同一個線程中完成它,但最后你編寫自己的調度系統,這對于評論來說太復雜了。
感謝分享我唯一關心的是我需要訪問一個變量來閱讀它,讀取2個線程中的變量是一個壞主意沒有,因此問題
在Python中,由于GIL,在兩個線程中訪問變量是非常安全的。僅僅讀取兩個線程永遠不應該是一個問題(也不是在其他線程環境中)。僅在沒有GIL的系統中從兩個不同的線程寫入(例如在Java,C ++等中)需要一些顯式同步。
一段時間后我遇到了類似的問題。可能是http://cronus.readthedocs.org可能有幫助嗎?
對于v0.2,以下代碼段有效
1
2
3
4
5
6import cronus.beat as beat
beat.set_rate(2) # 2 Hz
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
它與cron之間的主要區別在于異常將終止該守護進程。您可能希望使用異常捕獲器和記錄器進行換行。
這是MestreLion代碼的改編版本。
除了原始函數,這段代碼:
1)添加first_interval用于在特定時間觸發定時器(調用者需要計算first_interval并傳入)
2)用原始代碼解決競爭條件。在原始代碼中,如果控制線程未能取消正在運行的計時器("停止計時器,并取消執行計時器的操作。這只有在計時器仍處于等待階段時才有效。"引自https:// docs.python.org/2/library/threading.html),計時器將無休止地運行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
self.timer = None
self.first_interval = first_interval
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.running = False
self.is_started = False
def first_start(self):
try:
# no race-condition here because only control thread will call this method
# if already started will not start again
if not self.is_started:
self.is_started = True
self.timer = Timer(self.first_interval, self.run)
self.running = True
self.timer.start()
except Exception as e:
log_print(syslog.LOG_ERR,"timer first_start failed %s %s"%(e.message, traceback.format_exc()))
raise
def run(self):
# if not stopped start again
if self.running:
self.timer = Timer(self.interval, self.run)
self.timer.start()
self.func(*self.args, **self.kwargs)
def stop(self):
# cancel current timer in case failed it's still OK
# if already stopped doesn't matter to stop again
if self.timer:
self.timer.cancel()
self.running = False
一個可能的答案:
1
2
3
4
5
6
7import time
t=time.time()
while True:
if time.time()-t>10:
#run your task here
t=time.time()
這是忙著等待因此非常糟糕。
尋找非阻塞計時器的人的好解決方案。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16''' tracking number of times it prints'''
import threading
global timeInterval
count=0
def printit():
threading.Timer(timeInterval, printit).start()
print("Hello, World!")
global count
count=count+1
print(count)
printit
if __name__ =="__main__":
timeInterval= int(input('Enter Time in Seconds:'))
printit()
在用戶輸入的基礎上,它將在每個時間間隔迭代該方法。
它將迭代直到我們手動停止它
我使用Tkinter after()方法,它不會"竊取游戲"(就像之前介紹的sched模塊一樣),即它允許其他東西并行運行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print"* do_something1() is done *"; return
# Do your stuff here
# ...
print"do_something1()"+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print"* do_something2() is done *"; return
# Do your stuff here
# ...
print"do_something2()"+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
do_something1()和do_something2()可以并行運行,也可以以任何間隔速度運行。在這里,第二個將執行兩倍快。還注意我使用一個簡單的計數器作為終止任一功能的條件。您可以使用您喜歡的任何其他部分,或者如果您在程序終止之前運行的功能(例如時鐘),則可以使用任何其他部分。
小心你的措辭:after不允許并行運行。 Tkinter是單線程的,一次只能做一件事。如果after安排的某些內容正在運行,則它不會與其余代碼并行運行。如果do_something1和do_something2都安排在同一時間運行,它們將按順序運行,而不是并行運行。
@Apostolos你所有的解決方案都是使用tkinter mainloop而不是sched mainloop,所以它的工作方式完全相同,但允許tkinter接口繼續響應。如果你沒有將tkinter用于其他事情,那么就sched解決方案而言,它不會改變任何東西。您可以在sched解決方案中使用兩個或多個具有不同間隔的預定函數,它將與您的工作完全相同。
不,它的工作方式不同。我解釋了這個。一個"鎖定"程序(即停止流程,你不能做任何其他事情 - 甚至沒有按照你的建議開始另一個場景工作)直到它完成而另一個讓你的手/自由自由(即你可以做在它開始之后的其他事情。你不必等待它完成。這是一個巨大的差異。如果你嘗試過我提出的方法,你會親眼看到。我試過你的。為什么不是你試試我的?
例如,顯示當前本地時間
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import datetime
import glib
import logger
def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)
def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True
# call every minute
glib.timeout_add(60*1000, display_local_time)
我使用它來導致每小時60個事件,大多數事件發生在整個分鐘后的相同秒數:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)
info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK
while 1:
set_timing()
print('hello world')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info['tick'] += 1
info['completion_time'] = time.time()
根據實際情況,您可能會得到長度的標記:
160,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
但在60分鐘結束時,你將有60個蜱蟲;并且它們中的大多數將以正確的偏移量發生到您喜歡的那一分鐘。
在我的系統上,我得到典型的漂移<1/20秒,直到需要進行校正。
這種方法的優點是時鐘漂移的分辨率;如果您正在執行諸如每個刻度附加一個項目并且您希望每小時附加60個項目,這可能會導致問題。如果不考慮漂移,可能會導致移動平均等次要指示將數據過深地考慮在過去,從而導致輸出錯誤。
總結
以上是生活随笔為你收集整理的python重复执行_关于计时器:在Python中每x秒重复执行一次函数的最佳方法是什么?...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python数据分析、整理、汇总展示_p
- 下一篇: 相关方登记册模板_项目的主要相关方