python多线程怎么写日志_Python日志记录在多进程下的使用
1、 問(wèn)題描述
項(xiàng)目中,使用RotatingFileHandler根據(jù)日志文件大小來(lái)切分日志。設(shè)置文件的MaxBytes為1GB, backupCount大小為5。
經(jīng)查看,發(fā)現(xiàn)日志文件的大小均小于10MB,且每個(gè)回滾日志文件的寫(xiě)入時(shí)間也都比較接近。
2、 分析
日志文件過(guò)小,猜測(cè)是代碼有問(wèn)題,或者是文件內(nèi)容有丟失;日志寫(xiě)入時(shí)間接近猜測(cè)是同時(shí)寫(xiě)入的問(wèn)題。
經(jīng)檢查,代碼沒(méi)有問(wèn)題,排除此原因。考慮當(dāng)前使用gunicorn的多進(jìn)程啟動(dòng)程序,多半是多個(gè)進(jìn)程同時(shí)寫(xiě)入當(dāng)個(gè)文件造成日志文件丟失。
logging模塊是線程安全的,但并不是進(jìn)程安全的。
如何解決此問(wèn)題呢?首先先過(guò)一遍Python的logging模塊在處理日志回滾的具體實(shí)現(xiàn)方法。
2.1 logging模塊實(shí)現(xiàn)日志回滾
logging中RotatingFileHandler類和TimedRotatingFileHandler類分別實(shí)現(xiàn)按照日志文件大小和日志文件時(shí)間來(lái)切分文件,均繼承自BaseRotatingHandler類。
BaseRotatingHandler類中實(shí)現(xiàn)了文件切分的觸發(fā)和執(zhí)行,具體過(guò)程如下:
def emit(self, record):
"""
Emit a record.
Output the record to the file, catering for rollover as described
in doRollover().
"""
try:
if self.shouldRollover(record):
self.doRollover()
logging.FileHandler.emit(self, record)
except Exception:
self.handleError(record)
具體的執(zhí)行過(guò)程shouldRollover(record)和doRollover()函數(shù)則在RotatingFileHandler類和TimedRotatingFileHandler類中實(shí)現(xiàn)。
以RotatingFileHandler類為例,doRollover()函數(shù)流程如下:
def doRollover(self):
if self.stream:
self.stream.close()
self.stream = None
if self.backupCount > 0:
for i in range(self.backupCount - 1, 0, -1): # 從backupCount,依次到1
sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
dfn = self.rotation_filename("%s.%d" % (self.baseFilename, i + 1))
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
os.rename(sfn, dfn) # 實(shí)現(xiàn)將xx.log.i->xx.log.i+1
dfn = self.rotation_filename(self.baseFilename + ".1")
# ---------start-----------
if os.path.exists(dfn): # 判斷如果xx.log.1存在,則刪除xx.log.1
os.remove(dfn)
self.rotate(self.baseFilename, dfn) # 將xx.log->xx.log.1
# ----------end------------
if not self.delay:
self.stream = self._open() # 執(zhí)行新的xx.log
分析如上過(guò)程,整個(gè)步驟是:
當(dāng)前正在處理的日志文件名為self.baseFilename,該值self.baseFilename = os.path.abspath(filename)是設(shè)置的日志文件的絕對(duì)路徑,假設(shè)baseFilename為error.log。
當(dāng)進(jìn)行文件回滾的時(shí)候,會(huì)依次將error.log.i重命名為error.log.i+1。
判斷error.log.1是否存在,若存在,則刪除,將當(dāng)前日志文件error.log重命名為error.log.1。
self.stream重新指向新建error.log文件。
當(dāng)程序啟動(dòng)多進(jìn)程時(shí),每個(gè)進(jìn)程都會(huì)執(zhí)行doRollover過(guò)程,若有多個(gè)進(jìn)程進(jìn)入臨界區(qū),則會(huì)導(dǎo)致dfn被刪除多次等多種混亂操作。
2.2 多進(jìn)程日志安全輸出到同一文件方案
相應(yīng)的解決方法:
將日志發(fā)送到同一個(gè)進(jìn)程中,由該進(jìn)程負(fù)責(zé)輸出到文件中(使用Queue和QueueHandler將所有日志事件發(fā)送至一個(gè)進(jìn)程中)
對(duì)日志輸出加鎖,每個(gè)進(jìn)程在執(zhí)行日志輸出時(shí)先獲得鎖(用多處理模塊中的Lock類來(lái)序列化對(duì)進(jìn)程的文件訪問(wèn))
讓所有進(jìn)程都將日志記錄至一個(gè)SocketHandler,然后用一個(gè)實(shí)現(xiàn)了套接字服務(wù)器的單獨(dú)進(jìn)程一邊從套接字中讀取一邊將日志記錄至文件(Python手冊(cè)中提供)
3、解決方案
3.1 使用ConcurrentRotatingFileHandler包
該方法就屬于加鎖方案。
ConcurrentLogHandler 可以在多進(jìn)程環(huán)境下安全的將日志寫(xiě)入到同一個(gè)文件,并且可以在日志文件達(dá)到特定大小時(shí),分割日志文件(支持按文件大小分割)。但ConcurrentLogHandler 不支持按時(shí)間分割日志文件的方式。
ConcurrentLogHandler 模塊使用文件鎖定,以便多個(gè)進(jìn)程同時(shí)記錄到單個(gè)文件,而不會(huì)破壞日志事件。該模塊提供與RotatingFileHandler類似的文件循環(huán)方案。
該模塊的首要任務(wù)是保留您的日志記錄,這意味著日志文件將大于指定的最大大小(RotatingFileHandler是嚴(yán)格遵守最大文件大小),如果有多個(gè)腳本的實(shí)例同時(shí)運(yùn)行并寫(xiě)入同一個(gè)日志文件,那么所有腳本都應(yīng)該使用ConcurrentLogHandler,不應(yīng)該混合和匹配這這個(gè)類。
并發(fā)訪問(wèn)通過(guò)使用文件鎖來(lái)處理,該文件鎖應(yīng)確保日志消息不會(huì)被丟棄或破壞。這意味著將為寫(xiě)入磁盤(pán)的每個(gè)日志消息獲取并釋放文件鎖。(在Windows上,您可能還會(huì)遇到臨時(shí)情況,必須為每個(gè)日志消息打開(kāi)和關(guān)閉日志文件。)這可能會(huì)影響性能。在我的測(cè)試中,性能綽綽有余,但是如果您需要大容量或低延遲的解決方案,建議您將其放在其他地方。
這個(gè)包捆綁了portalocker來(lái)處理文件鎖定。由于使用了portalocker模塊,該模塊當(dāng)前僅支持“nt”和“posix”平臺(tái)。
安裝:
pip install ConcurrentLogHandler
該模塊支持Python2.6及以后版本。
ConcurrentLogHandler的使用方法與其他handler類一致,如與RotatingFileHandler的使用方法一樣。
初始化函數(shù)及參數(shù):
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
"""
Handler for logging to a set of files, which switches from one file to the
next when the current file reaches a certain size. Multiple processes can
write to the log file concurrently, but this may mean that the file will
exceed the given size.
"""
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0,
encoding=None, debug=True, delay=0):
參數(shù)含義同Python內(nèi)置RotatingFileHandler類相同,具體可參考上一篇博文Python logging日志管理 - 簡(jiǎn)書(shū)。同樣繼承自BaseRotatingHandler類。
簡(jiǎn)單的示例:
import logging
from cloghandler import ConcurrentRotatingFileHandler
?
logger = logging.getLogger()
rotateHandler = ConcurrentRotatingFileHandler('./logs/my_logfile.log', "a", 1024*1024, 5)
logger.addHandler(rotateHandler)
logger.setLevel(logging.DEBUG)
?
logger.info('This is a info message.')
為了適應(yīng)沒(méi)有ConcurrentRotatingFileHandler包的情況,增加回退使用RotatingFileHandler的代碼:
try:
from cloghandler import ConcurrentRotatingFileHandler as RFHandler
except ImportError:
from warning import warn
warn('ConcurrentRotatingFileHandler package not installed, Using builtin log handler')
from logging.handlers import RotatingFileHandler as RFHandler
運(yùn)行后可以發(fā)現(xiàn),會(huì)自動(dòng)創(chuàng)建一個(gè).lock文件,通過(guò)鎖的方式來(lái)安全的寫(xiě)日志文件。
3.2 對(duì)日志輸出加鎖
TimedRotatingFileHandler類doRollover函數(shù)的主要部分如下:
def doRollover(self):
....
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
# -------begin-------
if os.path.exists(dfn): # 判斷如果存在dfn,則刪除
os.remove(dfn)
self.rotate(self.baseFilename, dfn) # 將當(dāng)前日志文件重命名為dfn
# --------end--------
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
if not self.delay:
self.stream = self._open()
....
修改思路:
判斷dfn文件是否已經(jīng)存在,如果存在,表示已經(jīng)被rename過(guò)了;如果不存在,則只允許一個(gè)進(jìn)程去rename,其他進(jìn)程需等待。
新建一個(gè)類繼承自TimeRotatingFileHandler,修改doRollover函數(shù),只需處理上面代碼的注釋部分即可。如下:
class MPTimeRotatingFileHandler(TimeRotatingFileHandler):
def doRollover(self):
....
dfn = self.rotation_filename(self.baseFilename + "." +
time.strftime(self.suffix, timeTuple))
# ----modify start----
if not os.path.exists(dfn):
f = open(self.baseFilename, 'a')
fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
if os.path.exists(self.baseFilename): # 判斷baseFilename是否存在
self.rotate(self.baseFilename, dfn)
# ----modify end-----
if self.backupCount > 0:
for s in self.getFilesToDelete():
os.remove(s)
....
3.3 重寫(xiě)FileHandler類
logging.handlers.py中各類的繼承關(guān)系如下圖所示:
繼承關(guān)系
TimeRotatingFileHandler類就是繼承自該類,在FileHandler類中增加一些處理。
具體可參考以下博文:
在Python官方手冊(cè)中,提供了多進(jìn)程中日志記錄至單個(gè)文件的方法。
logging是線程安全的,將單個(gè)進(jìn)程中的多個(gè)線程日志記錄至單個(gè)文件也是支持的。但將多個(gè)進(jìn)程中的日志記錄至單個(gè)文件中則不支持,因?yàn)樵赑ython中并沒(méi)有在多個(gè)進(jìn)程中實(shí)現(xiàn)對(duì)單個(gè)文件訪問(wèn)的序列化的標(biāo)準(zhǔn)方案。
將多個(gè)進(jìn)程中日志記錄至單個(gè)文件中,有以下幾個(gè)方案:
讓所有進(jìn)程都將日志記錄至一個(gè) SocketHandler,然后用一個(gè)實(shí)現(xiàn)了套接字服務(wù)器的單獨(dú)進(jìn)程一邊從套接字中讀取一邊將日志記錄至文件。
使用 Queue 和 QueueHandler 將所有的日志事件發(fā)送至你的多進(jìn)程應(yīng)用的一個(gè)進(jìn)程中。
3.4 單獨(dú)進(jìn)程負(fù)責(zé)日志事件
一個(gè)單獨(dú)監(jiān)聽(tīng)進(jìn)程負(fù)責(zé)監(jiān)聽(tīng)其他進(jìn)程的日志事件,并根據(jù)自己的配置記錄。
示例:
import logging
import logging.handlers
import multiprocessing
?
from random import choice, random
import time
?
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('test.log', 'a', 300,10) # rotate file設(shè)置的很小,以便于查看結(jié)果
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
def listenser_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
trackback.print_exc(file=sys.stderr)
?
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
?
LOGGERS = ['a.b.c', 'd.e.f']
?
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
?
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.DEBUG)
# 該循環(huán)僅記錄10個(gè)事件,這些事件具有隨機(jī)的介入延遲,然后終止
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started:%s'%name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
# 創(chuàng)建隊(duì)列,創(chuàng)建并啟動(dòng)監(jiān)聽(tīng)器,創(chuàng)建十個(gè)工作進(jìn)程并啟動(dòng)它們,等待它們完成,然后將None發(fā)送到隊(duì)列以通知監(jiān)聽(tīng)器完成
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, listener_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
使用主進(jìn)程中一個(gè)單獨(dú)的線程記錄日志
下面這段代碼展示了如何使用特定的日志記錄配置,例如foo記錄器使用了特殊的處理程序,將foo子系統(tǒng)中所有的事件記錄至一個(gè)文件mplog-foo.log。在主進(jìn)程(即使是在工作進(jìn)程中產(chǎn)生的日志事件)的日志記錄機(jī)制中將直接使用恰當(dāng)?shù)呐渲谩?/p>
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
?
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz', 'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lv1=l = random.choice(levles)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
?
for __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d'%(i+1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
for wp in workers:
wp.join()
q.put(None)
lp.join()
3.5 logging.SocketHandler的方案
具體實(shí)現(xiàn)參考如下博客:
4、參考文獻(xiàn)
與50位技術(shù)專家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的python多线程怎么写日志_Python日志记录在多进程下的使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 无线网能连但是没网络连接不了怎么办 无线
- 下一篇: 塞斯纳208机长待遇?