【Python SQLAlchemy】数据库连接池SQLAlchemy中多线程安全的问题
問題描述
寫了一個apscheduler定時任務
里面用到SQLAlchemy在同一時間點開啟了18個任務,用于更新18個表
但是最終看數據庫表的時候,發現有2個表未更新。查看打印日志均正常,查找原因。
參考以下
提問
最近把原來的單線程改為多線程,從而引起了sqlalchemy的錯誤。
我自己簡單封裝了 一個sqlalchemy類,用來進行各種數據庫操作。
自從改為多線程后,線程一多,在更新數據的時候就會出錯。
我自己封裝的sqlalchemy,是共用一個 Session的
回答
我也遇到了同樣的問題,產生問題的原因是session并不是線程安全,當你在一個線程commit時,所有線程都會commit(好像是這樣,不是100%確定),文檔推薦使用scoped_session
數據庫連接池SQLAlchemy中多線程安全的問題
正確示范:
1、數據庫模塊model.py
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker session_factory = sessionmaker(bind=some_engine) Session = scoped_session(session_factory)2、業務模塊thread.py
import threading from model import Sessionclass User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(20))fullname = Column(String(20))password = Column(String(20))age = Column(Integer)class MyThread(threading.Thread):def __init__(self, threadName):super(MyThread, self).__init__()self.name = threading.current_thread().namedef run(self):session = Session() #每個線程都可以直接使用數據庫模塊定義的Sessionsession.query(User).all()user = User(name="hawk-%s"%self.name, fullname="xxxx",password="xxxx",age=10)session.add(user)time.sleep(1)if self.name == "thread-9":session.commit()Session.remove()if __name__ == "__main__":arr = []for i in xrange(10):arr.append(MyThread('thread-%s' % i))for i in arr:i.start()for i in arr:i.join()錯誤示范:
class MyThread(threading.Thread):def __init__(self, threadName):super(MyThread, self).__init__()self.session = Session() #錯誤!self.name = threading.current_thread().namedef run(self):self.session.query(User).all()user = User(name="hawk-%s"%self.name, fullname="xxxx",password="xxxx",age=10)self.session.add(user)time.sleep(1)if self.name == "thread-9":self.session.commit()Session.remove()錯誤解析:
看了SQLAlchemy源碼之后發現,Session() 返回的是一個threading.local()對象的成員變量,threading.local()對象只有在線程內部才能實現線程隔離,因此只能放在run()函數里,而不能作為類成員變量。
如果按照錯誤示例來運行,所有線程其實公用了一個session,沒有做到線程隔離,session.commit()操作會互相影響,我們原本只想將thread-9中的數據插入,結果會發現,所有線程中的數據全部被插入。
使用Nullpool避免使用SQLAlchemy使用連接池
在使用 create_engine創建引擎時,如果默認不指定連接池設置的話,一般情況下,SQLAlchemy會使用一個 QueuePool綁定在新創建的引擎上。并附上合適的連接池參數。
在以默認的方法create_engine時(如下),就會創建一個帶連接池的引擎。
engine = create_engine('postgresql://postgres@127.0.0.1/dbname')
在這種情況下,當你使用了session后就算顯式地調用session.close(),也不能把連接關閉。連接會由QueuePool連接池進行管理并復用。
這種特性在一般情況下并不會有問題,不過當數據庫服務器因為一些原因進行了重啟的話。最初保持的數據庫連接就失效了。隨后進行的session.query()等方法就會拋出異常導致程序出錯。
如果想禁用SQLAlchemy提供的數據庫連接池,只需要在調用create_engine是指定連接池為NullPool,SQLAlchemy就會在執行session.close()后立刻斷開數據庫連接。當然,如果session對象被析構但是沒有被調用session.close(),則數據庫連接不會被斷開,直到程序終止。
結論
結合以上,因為添加過nullpool,所以整個過程只保持一個session(相當于connection)連接。
而在apscheduler開啟的多線程環境下,session.commit()、session.close()操作會互相影響,導致多次(誤)commit,甚至在執行session.close()后斷開數據庫連接,導致寫數據失敗
SQLAlchemy的session不是線程安全的
在apscheduler開啟的多線程環境下,所有線程公用了一個session
每個線程中session.commit()會互相影響,一次調用,全部提交
如果調用了session.rollback(),所有線程全部rollback,導致不可預期影響
解決方式
方式1:避免任務重疊
方式2:似乎可以使用 scoped_session,重構代碼吧,那就是另外一個坑了
拓展閱讀
使用flask-sqlalchemy寫代碼碼到一半,突然想到,Session是否是線程安全的?于是上官方文檔,答案是否
結論:flask-sqlalchemy 對 sqlachemy 做了非常好的封裝,是線程安全的
總結
以上是生活随笔為你收集整理的【Python SQLAlchemy】数据库连接池SQLAlchemy中多线程安全的问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Python小工具函数】各种日期格式的
- 下一篇: Python获取当前目录和上级目录