flask框架基础
一 web的一些框架介紹
Flask:短小精悍,內部沒有包含多少組件,但是第三方的組件是非常豐富的。
Django:django是一個重武器,內部包含了非常多的組件:orm,form,modelForm,緩存,session等等
Tornado:牛逼之處就是異步非阻塞框架和node.js
二 Flask的快速入門
創建python虛擬環境:virtualenv 虛擬名
安裝:pip install flask
什么是werkzeug:Werkzeug是一個WSGI工具包,他可以作為一個Web框架的底層庫。官方的介紹說是一個 WSGI 工具包,它可以作為一個 Web 框架的底層庫,因為它封裝好了很多 Web 框架的東西,例如 Request,Response 等等
from werkzeug.wrappers import Request, Response@Request.application def hello(request):return Response('Hello World!')if __name__ == '__main__':from werkzeug.serving import run_simplerun_simple('localhost', 4000, hello) View Code基本使用:
from flask import Flask app = Flask(__name__)@app.route('/') def hello_world():return 'Hello World!'if __name__ == '__main__':app.run() View Codewsgi:使用werkzeug模塊實現的,還可以使用wsgiref實現。本質是導入socket實現的。
一旦出現這個,監聽事件就開始了
實例化Flask對象
app.run():監聽用戶的請求,一旦有用戶的請求過來,就會直接執行用戶的__call__方法。
flask的所有相關的組件全都存在flask文件下的,需要什么導入什么
from flask import Flask,render_template,request,redirect,session,url_forfor_url:高級版的重定向
from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = {1:{'name':'張桂坤','age':18,'gender':'男','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有我的難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊言,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},2:{'name':'主城','age':28,'gender':'男','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"},3:{'name':'服城','age':18,'gender':'女','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"}, } @app.route('/') def hello_world():return 'Hello World!'# @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # print(user) # if user: # return render_template('index.html',data=user,user_dict=USERS) # return redirect('/login') @app.route('/index',methods=['GET']) def index():user=session.get('user_info')if user:return render_template('index.html',data=user,user_dict=USERS)url=url_for('111')return redirect(url)@app.route('/detail/<int:nid>',methods=['GET']) def detail(nid):user=session.get('user_info')if not user:return redirect('/login')return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST'],endpoint='111') def login():if request.method=='POST':user=request.form.get('user')pwd=request.form.get('pwd')if user=='fang' and pwd=='123':session['user_info']=userreturn redirect('/index')else:return render_template('login.html',msg='用戶名或密碼錯誤')return render_template('login.html')if __name__ == '__main__':app.run() View Code三 配置文件
開放封閉原則:對代碼的修改是封閉的,對配置文件的修改時開放的。
app.debug=True:修改過后自動重啟項目
app.secret_key='隨機設置字符串':全局設置session,Session, Cookies以及一些第三方擴展都會用到,
app.config['debug']=True:修改過后自動重啟項目
app.config:獲取當前app的所有配置
app.config.from_object:導入文件的一個類,
內部實現原理:導入時,將路徑由點分割
配置文件的格式有:
flask中的配置文件是一個flask.config.Config對象(繼承字典),默認配置為:{'DEBUG': get_debug_flag(default=False), 是否開啟Debug模式'TESTING': False, 是否開啟測試模式'PROPAGATE_EXCEPTIONS': None, 'PRESERVE_CONTEXT_ON_EXCEPTION': None,'SECRET_KEY': None,'PERMANENT_SESSION_LIFETIME': timedelta(days=31),'USE_X_SENDFILE': False,'LOGGER_NAME': None,'LOGGER_HANDLER_POLICY': 'always','SERVER_NAME': None,'APPLICATION_ROOT': None,'SESSION_COOKIE_NAME': 'session','SESSION_COOKIE_DOMAIN': None,'SESSION_COOKIE_PATH': None,'SESSION_COOKIE_HTTPONLY': True,'SESSION_COOKIE_SECURE': False,'SESSION_REFRESH_EACH_REQUEST': True,'MAX_CONTENT_LENGTH': None,'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),'TRAP_BAD_REQUEST_ERRORS': False,'TRAP_HTTP_EXCEPTIONS': False,'EXPLAIN_TEMPLATE_LOADING': False,'PREFERRED_URL_SCHEME': 'http','JSON_AS_ASCII': True,'JSON_SORT_KEYS': True,'JSONIFY_PRETTYPRINT_REGULAR': True,'JSONIFY_MIMETYPE': 'application/json','TEMPLATES_AUTO_RELOAD': None,} View Code格式一:
app.config['DEBUG'] = TruePS: 由于Config對象本質上是字典,所以還可以使用app.config.update(...) View Code格式二:
app.config.from_pyfile("python文件名稱")如:settings.pyDEBUG = Trueapp.config.from_pyfile("settings.py")app.config.from_envvar("環境變量名稱")環境變量的值為python文件名稱名稱,內部調用from_pyfile方法app.config.from_json("json文件名稱")JSON文件名稱,必須是json格式,因為內部會執行json.loadsapp.config.from_mapping({'DEBUG':True}) View Code格式三:
app.config.from_object("python類或類的路徑")app.config.from_object('pro_flask.settings.TestingConfig')settings.pyclass Config(object):DEBUG = FalseTESTING = FalseDATABASE_URI = 'sqlite://:memory:'class ProductionConfig(Config):DATABASE_URI = 'mysql://user@localhost/foo'class DevelopmentConfig(Config):DEBUG = Trueclass TestingConfig(Config):TESTING = TruePS: 從sys.path中已經存在路徑開始寫 View Codesettings.py默認路徑要放在當前項目的第一級目錄下面
? PS: settings.py文件默認路徑要放在程序root_path目錄,如果instance_relative_config為True,則就是instance_path目錄
四 路由系統
- @app.route('/user/<username>')
- @app.route('/post/<int:post_id>')
- @app.route('/post/<float:post_id>')
- @app.route('/post/<path:path>')
- @app.route('/login', methods=['GET', 'POST'])
路由比較特殊,:是基于裝飾器實現的,但是究其本質還是有add_url_rule實現的。
裝飾器可以有多個,放在上面和下面是不同的,
from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = {1:{'name':'張桂坤','age':18,'gender':'男','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有我的難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊言,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},2:{'name':'主城','age':28,'gender':'男','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"},3:{'name':'服城','age':18,'gender':'女','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"}, }def get_session(func):def inner(*args,**kwargs):user = session.get('user_info')if not user:return redirect('/login')return func(*args,**kwargs)return inner@app.route('/',endpoint='1111') @get_session def hello_world():return 'Hello World!'@app.route('/index',methods=['GET'],endpoint='n1') @get_session def index():# user=session.get('user_info')# print(user)# if user:return render_template('index.html',user_dict=USERS)# return redirect('/login')# @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # if user: # return render_template('index.html',user_dict=USERS) # url=url_for('111') # return redirect(url) @app.route('/detail/<int:nid>',methods=['GET'],endpoint='n2') def detail(nid):user=session.get('user_info')if not user:return redirect('/login')return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST'],endpoint='111') def login():if request.method=='POST':user=request.form.get('user')pwd=request.form.get('pwd')if user=='fang' and pwd=='123':session['user_info']=userreturn redirect('/index')else:return render_template('login.html',msg='用戶名或密碼錯誤')return render_template('login.html')if __name__ == '__main__':app.run() View Code注意:這里加上裝飾器,會重名,給他設置一個別名endpoint=‘別名’
functools.wraps(函數):# 幫助我們保存一些設置函數的元信息
from flask import Flask,render_template,request,redirect,session,url_forapp = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = {1:{'name':'張桂坤','age':18,'gender':'男','text':"當眼淚掉下來的時候,是真的累了, 其實人生就是這樣: 你有你的煩,我有我的難,人人都有無聲的淚,人人都有難言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,過不完的坎坷,越不過的無奈,聽不完的謊言,看不透的人心放不下的牽掛,經歷不完的酸甜苦辣,這就是人生,這就是生活。"},2:{'name':'主城','age':28,'gender':'男','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"},3:{'name':'服城','age':18,'gender':'女','text':"高中的時候有一個同學家里窮,每頓飯都是膜膜加點水,有時候吃點咸菜,我們六科老師每天下課都叫他去辦公室回答問題背誦課文,然后說太晚啦一起吃個飯,后來他考上了人大,拿到通知書的時候給每個老師磕了一個頭"}, }import functools def get_session(func):@functools.wraps(func)def inner(*args,**kwargs):user = session.get('user_info')if not user:return redirect('/login')return func(*args,**kwargs)return inner@app.route('/',endpoint='1111') @get_session def hello_world():return 'Hello World!'@app.route('/index',methods=['GET']) @get_session def index():# user=session.get('user_info')# print(user)# if user:return render_template('index.html',user_dict=USERS)# return redirect('/login')# @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # if user: # return render_template('index.html',user_dict=USERS) # url=url_for('111') # return redirect(url) @app.route('/detail/<int:nid>',methods=['GET']) def detail(nid):user=session.get('user_info')if not user:return redirect('/login')return render_template('detail.html',data=USERS[nid])@app.route('/login',methods=['GET','POST']) def login():if request.method=='POST':user=request.form.get('user')pwd=request.form.get('pwd')if user=='fang' and pwd=='123':session['user_info']=userreturn redirect('/index')else:return render_template('login.html',msg='用戶名或密碼錯誤')return render_template('login.html')if __name__ == '__main__':app.run() View CodeFBV的寫法:
使用裝飾器,將括號里面的內容添加到路由中@app.route()
DEFAULT_CONVERTERS = {'default': UnicodeConverter,'string': UnicodeConverter,'any': AnyConverter,'path': PathConverter,'int': IntegerConverter,'float': FloatConverter,'uuid': UUIDConverter, } View Codemethods=[]:里面寫的是請求的方法,支持什么方法都要寫上什么方法。
??? endpoint=別名:為當前的url反向生成一個url,也就是起一個別名
??? app.add_url_rule('/...'):添加路由的另一種方法
def auth(func):def inner(*args, **kwargs):print('before')result = func(*args, **kwargs)print('after')return resultreturn inner@app.route('/index.html',methods=['GET','POST'],endpoint='index')@authdef index():return 'Index'或def index():return "Index"self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])orapp.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])app.view_functions['index'] = index View CodeCBV的寫法:
MethodView:API的簡單的一種實現方式,class創建的視圖類就需要繼承它。
class IndexView(views.MethodView):methods=['GET']decorators=[auth,]def get(self):return 'Index.GET'def post(self):return 'Index.POST' View Codeapp.add_url_rule('/路徑',view_func='類名'.as_view(name=返回的那個函數)
app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))對于CBV來說:雖然傳進去的是類名,但是最后返回的還是一個函數。
def auth(func):def inner(*args, **kwargs):print('before')result = func(*args, **kwargs)print('after')return resultreturn innerclass IndexView(views.View):methods = ['GET']decorators = [auth, ]def dispatch_request(self):print('Index')return 'Index!'app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint 或class IndexView(views.MethodView):methods = ['GET']decorators = [auth, ]def get(self):return 'Index.GET'def post(self):return 'Index.POST'app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint @app.route和app.add_url_rule參數:rule, URL規則view_func, 視圖函數名稱defaults=None, 默認值,當URL中無參數,函數需要參數時,使用defaults={'k':'v'}為函數提供參數endpoint=None, 名稱,用于反向生成URL,即: url_for('名稱')methods=None, 允許的請求方式,如:["GET","POST"]strict_slashes=None, 對URL最后的 / 符號是否嚴格要求,如:@app.route('/index',strict_slashes=False),訪問 http://www.xx.com/index/ 或 http://www.xx.com/index均可@app.route('/index',strict_slashes=True)僅訪問 http://www.xx.com/index redirect_to=None, 重定向到指定地址如:@app.route('/index/<int:nid>', redirect_to='/home/<nid>')或def func(adapter, nid):return "/home/888"@app.route('/index/<int:nid>', redirect_to=func)subdomain=None, 子域名訪問from flask import Flask, views, url_forapp = Flask(import_name=__name__)app.config['SERVER_NAME'] = 'wupeiqi.com:5000'@app.route("/", subdomain="admin")def static_index():"""Flask supports static subdomainsThis is available at static.your-domain.tld"""return "static.your-domain.tld"@app.route("/dynamic", subdomain="<username>")def username_index(username):"""Dynamic subdomains are also supportedTry going to user1.your-domain.tld/dynamic"""return username + ".your-domain.tld"if __name__ == '__main__':app.run() View Codedefault:傳入函數的參數。就是url后面的那個參數
subdomain={}:創建子域名
window設置域名:hosts而文件下面直接就可以設置了C:\Windows\System32\driver\etc\hosts
mcs系統設置域名:/ect/hosts文件下面就可以設置域名了
from flask import Flask, views, url_forfrom werkzeug.routing import BaseConverterapp = Flask(import_name=__name__)class RegexConverter(BaseConverter):"""自定義URL匹配正則表達式"""def __init__(self, map, regex):super(RegexConverter, self).__init__(map)self.regex = regexdef to_python(self, value):"""路由匹配時,匹配成功后傳遞給視圖函數中參數的值:param value: :return: """return int(value)def to_url(self, value):"""使用url_for反向生成URL時,傳遞的參數經過該方法處理,返回的值用于生成URL中的參數:param value: :return: """val = super(RegexConverter, self).to_url(value)return val# 添加到flask中app.url_map.converters['regex'] = RegexConverter@app.route('/index/<regex("\d+"):nid>')def index(nid):print(url_for('index', nid='888'))return 'Index'if __name__ == '__main__':app.run() View Code五 模板
1、模板的使用:Flask使用的是Jinja2模板,所以其語法和Django無差別
2、自定義模板方法:Flask中自定義模板方法的方式和Bottle相似,創建一個函數并通過參數的形式傳入
for循環,并取到索引
{% for k,v in 對象.items()%}
字典的取值方法:v.字段名? v['字段名']? v.get('字段名')
{% endfor %}
函數渲染:不僅要加上括號,還可以加上參數
{{函數名(參數)}}? 加上|safe:防止xss攻擊
<!DOCTYPE html> <html> <head lang="en"><meta charset="UTF-8"><title></title> </head> <body><h1>自定義函數</h1>{{ww()|safe}}</body> </html> View Code在后臺如果使用
from flask import Flask,render_template app = Flask(__name__)def wupeiqi():return '<h1>Wupeiqi</h1>'@app.route('/login', methods=['GET', 'POST']) def login():return render_template('login.html', ww=wupeiqi)app.run() View CodeMarkup:后臺設置xss攻擊
宏定義:就是定義一塊html,定義的這一塊就是就是一個函數
{% macre 函數名(參數)%}? {% endmacre %}
<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title> </head> <body>{% macro input(name, type='text', value='') %}<input type="{{ type }}" name="{{ name }}" value="{{ value }}">{% endmacro %}{{ input('n1') }}{% include 'tp.html' %}<h1>asdf{{ v.k1}}</h1> </body> </html> View Code六 請求和響應
請求和響應都是從flask為念中導入的
request:請求
response:響應
jsonify:響應的數據類型不是字符串類型,就是用這個將響應的內容轉成字符串。
from flask import Flaskfrom flask import requestfrom flask import render_templatefrom flask import redirectfrom flask import make_responseapp = Flask(__name__)@app.route('/login.html', methods=['GET', "POST"])def login():# 請求相關信息# request.method# request.args# request.form# request.values# request.cookies# request.headers# request.path# request.full_path# request.script_root# request.url# request.base_url# request.url_root# request.host_url# request.host# request.files# obj = request.files['the_file_name']# obj.save('/var/www/uploads/' + secure_filename(f.filename))# 響應相關信息# return "字符串"# return render_template('html模板路徑',**{})# return redirect('/index.html')# response = make_response(render_template('index.html'))# response是flask.wrappers.Response類型# response.delete_cookie('key')# response.set_cookie('key', 'value')# response.headers['X-Something'] = 'A value'# return responsereturn "內容"if __name__ == '__main__':app.run() View Code七 Session和Cookie
session在使用前必須要有app.sceret_key加密,就相當于加鹽
session['名']=字段:設置session
應用demo:實例,使用裝飾器寫一個用戶認證
思路:裝飾器,一個函數可以加上多個裝飾器,反向查找的名稱不允許重名:endpoint
session.pop:刪除一個session
我們這里使用的session是flask內置的使用加密的cookie來保存數據的。
基本使用:
from flask import Flask, session, redirect, url_for, escape, requestapp = Flask(__name__)@app.route('/') def index():if 'username' in session:return 'Logged in as %s' % escape(session['username'])return 'You are not logged in'@app.route('/login', methods=['GET', 'POST']) def login():if request.method == 'POST':session['username'] = request.form['username']return redirect(url_for('index'))return '''<form action="" method="post"><p><input type=text name=username><p><input type=submit value=Login></form>'''@app.route('/logout') def logout():# remove the username from the session if it's theresession.pop('username', None)return redirect(url_for('index'))# set the secret key. keep this really secret: app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'基本使用 View Code自定義session
pip3 install Flask-Sessionrun.pyfrom flask import Flaskfrom flask import sessionfrom pro_flask.utils.session import MySessionInterfaceapp = Flask(__name__)app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'app.session_interface = MySessionInterface()@app.route('/login.html', methods=['GET', "POST"])def login():print(session)session['user1'] = 'alex'session['user2'] = 'alex'del session['user2']return "內容"if __name__ == '__main__':app.run()session.py#!/usr/bin/env python# -*- coding:utf-8 -*-import uuidimport jsonfrom flask.sessions import SessionInterfacefrom flask.sessions import SessionMixinfrom itsdangerous import Signer, BadSignature, want_bytesclass MySession(dict, SessionMixin):def __init__(self, initial=None, sid=None):self.sid = sidself.initial = initialsuper(MySession, self).__init__(initial or ())def __setitem__(self, key, value):super(MySession, self).__setitem__(key, value)def __getitem__(self, item):return super(MySession, self).__getitem__(item)def __delitem__(self, key):super(MySession, self).__delitem__(key)class MySessionInterface(SessionInterface):session_class = MySessioncontainer = {}def __init__(self):import redisself.redis = redis.Redis()def _generate_sid(self):return str(uuid.uuid4())def _get_signer(self, app):if not app.secret_key:return Nonereturn Signer(app.secret_key, salt='flask-session',key_derivation='hmac')def open_session(self, app, request):"""程序剛啟動時執行,需要返回一個session對象"""sid = request.cookies.get(app.session_cookie_name)if not sid:sid = self._generate_sid()return self.session_class(sid=sid)signer = self._get_signer(app)try:sid_as_bytes = signer.unsign(sid)sid = sid_as_bytes.decode()except BadSignature:sid = self._generate_sid()return self.session_class(sid=sid)# session保存在redis中# val = self.redis.get(sid)# session保存在內存中val = self.container.get(sid)if val is not None:try:data = json.loads(val)return self.session_class(data, sid=sid)except:return self.session_class(sid=sid)return self.session_class(sid=sid)def save_session(self, app, session, response):"""程序結束前執行,可以保存session中所有的值如:保存到resit寫入到用戶cookie"""domain = self.get_cookie_domain(app)path = self.get_cookie_path(app)httponly = self.get_cookie_httponly(app)secure = self.get_cookie_secure(app)expires = self.get_expiration_time(app, session)val = json.dumps(dict(session))# session保存在redis中# self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)# session保存在內存中 self.container.setdefault(session.sid, val)session_id = self._get_signer(app).sign(want_bytes(session.sid))response.set_cookie(app.session_cookie_name, session_id,expires=expires, httponly=httponly,domain=domain, path=path, secure=secure)自定義Session View Code第三方session
from flask import Flask, session, redirect from flask.ext.session import Sessionapp = Flask(__name__) app.debug = True app.secret_key = 'asdfasdfasd'app.config['SESSION_TYPE'] = 'redis' from redis import Redis app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app)@app.route('/login') def login():session['username'] = 'alex'return redirect('/index')@app.route('/index') def index():name = session['username']return nameif __name__ == '__main__':app.run()第三方session View Code八 閃現
flash:向某一個地方設置一個值
category:設置值的分類
get_flashed_messages:從某一個地方獲取多個值,并且清除
他們也是基于app.secret_key實現的
from flask import Flask,flash,get_flashed_messagesapp = Flask(__name__) app.secret_key = 'asdfasdf' @app.route('/get') def get():# 從某個地方獲取設置過的所有值,并清除。data = get_flashed_messages()print(data)return 'Hello World!'@app.route('/set') def set():# 向某個地方設置一個值flash('阿斯蒂芬')return 'Hello World!'if __name__ == '__main__':app.run() View Codecategory_filter:只取一類的值
from flask import Flask,flash,get_flashed_messages,request,redirectapp = Flask(__name__) app.secret_key = 'asdfasdf'@app.route('/index') def index():# 從某個地方獲取設置過的所有值,并清除。val = request.args.get('v')if val == 'oldboy':return 'Hello World!'flash('超時錯誤',category="x1")return "ssdsdsdfsd"# return redirect('/error') @app.route('/error') def error():"""展示錯誤信息:return:"""data = get_flashed_messages(category_filter=['x1'])if data:msg = data[0]else:msg = "..."return "錯誤信息:%s" %(msg,)if __name__ == '__main__':app.run() View Code??? request.query_string.get('字段'):獲取到所有的url
request.args.get('字段'):獲取get請求后面的值
什么是閃現:設置不管多少次的值,是基于session實現的,只要一次取到全部的值,并且清除
閃現的用法:應用于臨時數據的操作,比如:顯示錯誤信息等等
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key ='sdfsdfsdf'@app.route('/users') def users():# 方式一# msg = request.args.get('msg','')# 方式二# msg = session.get('msg')# if msg:# del session['msg']# 方式三v = get_flashed_messages()print(v)msg = ''return render_template('users.html',msg=msg)@app.route('/useradd') def user_add():# 在數據庫中添加一條數據# 假設添加成功,在跳轉到列表頁面時,顯示添加成功# 方式一# return redirect('/users?msg=添加成功')# 方式二# session['msg'] = '添加成功'# 方式三flash('添加成功')return redirect('/users')if __name__ == '__main__':app.run(debug=True) View Code九 藍圖(blueprint)
什么是藍圖:功能分類,還可以定義自己的模板和路由分發。就是為了構造目錄的。
藍圖的特性:不僅能將不同功能的app進行分來,并且還可以定義自己的模板路徑可以實現路由的分發。可以實現每一個程序構造自己的app。
Blueprint:創建藍圖? ('藍圖名','__name__')
url_prefix:為下面所有函數的使用機上一個前綴,可以為統一的某一類加上前綴
template_folder:定義自己的模板路徑(html文件勒頸)
app.register_blueprint:將藍圖注冊到app
小型應用程序:示例
大型應用程序:示例
小中型:
manage.py
import fcrm if __name__ == '__main__': fcrm.app.run()__init__.py(只要一導入fcrm就會執行__init__.py文件)
from flask import Flask #導入accout 和order from fcrm.views import accout from fcrm.views import order app = Flask(__name__) print(app.root_path) #根目錄 app.register_blueprint(accout.accout) #吧藍圖注冊到app里面,accout.accout是創建的藍圖對象 app.register_blueprint(order.order)accout.py
from flask import Blueprint,render_template accout = Blueprint("accout",__name__) @accout.route('/accout') def xx(): return "accout" @accout.route("/login") def login(): return render_template("login.html")order.py
from flask import Blueprint order = Blueprint("order",__name__) @order.route('/order') def register(): #注意視圖函數的名字不能和藍圖對象的名字一樣 return "order使用藍圖時需要注意的
大型:
?
十 請求的擴展
類似于django的中間件
@app.defore_request:定制請求函數,每次請求都會執行有這個裝飾器的函數,每次都是從上到下執行的
request.url:拿到正要運行的url
@app.after_request:定制響應的函數,每次響應執行這個裝飾器的函數,每次都是從下到上執行的
請求哈響應可以有多個,如果請求給攔截了,但是所有的響應都會執行
@app.errorhandler:定制錯誤的信息。
@ app.template_global:為模板定制函數,也就是自定義模板
@app.before_first_request:只有第一次請求來了才執行這個函數
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug = True app.secret_key = 'siuljskdjfs'@app.before_request def process_request1(*args,**kwargs):print('process_request1 進來了')@app.before_request def process_request2(*args,**kwargs):print('process_request2 進來了')@app.after_request def process_response1(response):print('process_response1 走了')return response@app.after_request def process_response2(response):print('process_response2 走了')return response@app.errorhandler(404) def error_404(arg):return "404錯誤了"@app.before_first_request def first(*args,**kwargs):pass@app.route('/index',methods=['GET']) def index():print('index函數')return "Index"if __name__ == '__main__':app.run() View Code基于中間件實現用戶的認證登陸
@app.before_requestdef process_request(*args,**kwargs):if request.path == '/login':return Noneuser = session.get('user_info')if user:return Nonereturn redirect('/login') View Code模板中定制方法:
@app.template_global()def sb(a1, a2):return a1 + a2{{sb(1,2)}} @app.template_filter()def db(a1, a2, a3):return a1 + a2 + a3{{ 1|db(2,3)}} View Code十一 中間件
中間件:在這里就是一個請求的入口
每次請求進來都會執行app的call方法。
from flask import Flaskapp = Flask(__name__)@app.route('/') def index():return 'Hello World!'class Md(object):def __init__(self,old_wsgi_app):self.old_wsgi_app = old_wsgi_appdef __call__(self, environ, start_response):print('開始之前')ret = self.old_wsgi_app(environ, start_response)print('結束之后')return retif __name__ == '__main__':app.wsgi_app = Md(app.wsgi_app)app.run() View Code十二 上下文處理
threading.local:為每一個線程開辟一個單獨的內存空間來保存他自己的值
import threading# class Foo(): # def __init__(self): # self.name=0 # # local_value=Foo() local_value=threading.local() def func(num):local_value.name=numimport timetime.sleep(1)print(local_value.name,threading.current_thread().name)for i in range(20):th=threading.Thread(target=func,args=(i,),name='線程%s'%i)th.start() View Coderequest:
情況一:單線程和單進程的情況下不會有問題,應為自己使用為完畢過后,就會自動的清空request。
情況二:單進程和多線程,threading.local對象
情況三:但進程和單線程多個協程,theading.local對象就會出問題。
解決方法:
以后不支持協程:就可以使用內置的threading.local對象
支持協程:自定義類似threading.local對象的功能支持協程
_thread.get_ident:獲取線程的一個唯一標識
greenlet.getcurrent:獲取協程額唯一標識
自定義支持協程:
""" {1368:{} } """import threading try:from greenlet import getcurrent as get_ident # 協程 except ImportError:try:from thread import get_identexcept ImportError:from _thread import get_ident # 線程class Local():def __init__(self):self.storage={}self.get_ident=get_identdef set(self,k,v):ident=self.get_ident()origin=self.storage.get(ident)if not origin:origin={k:v}else:origin[k]=vself.storage[ident]=origindef get(self,k):ident=self.get_ident()origin=self.storage.get(ident)if not origin:return Nonereturn origin.get(k,None)local_values=Local()def task(num):local_values.set('name',num)import timetime.sleep(1)print(local_values.get('name'),threading.current_thread().name)for i in range(20):th=threading.Thread(target=task,args=(i,),name='線程%s'%i)th.start() View Code反射實例:
class Foo(object):def __init__(self):object.__setattr__(self,'storage',{})def __setattr__(self, key, value):self.storage={'k1:v1'}print(key,value)def __getattr__(self, item):print(item)return 'df'obj=Foo() # obj.x = 123print(obj) View Code自定義支持協程的flask:使用反射實現
""" {1368:{} } """ import flask.globals import threading try:from greenlet import getcurrent as get_ident # 協程 except ImportError:try:from thread import get_identexcept ImportError:from _thread import get_ident # 線程class Local():def __init__(self):object.__setattr__(self,'__storage__',{})object.__setattr__(self,'__ident_func__',get_ident)def __setattr__(self, name, value):ident=self.__ident_func__()storage=self.__storage__try:storage[ident][name]=valueexcept KeyError:storage[ident]={name:value}def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __delattr__(self, name):try:del self.__storage__[self.__ident_finc__()][name]except KeyError:raise AttributeError(name)local_values=Local()def task(num):local_values.name=numimport timetime.sleep(1)print(local_values.name,threading.current_thread().name) for i in range(20):th=threading.Thread(target=task,args=(i,),name='線程%s'%i)th.start() View Code? 補充:
偏函數:functools模塊
functools.partial:創建一個新的函數,主要是為了給原函數傳入參數
# import functools # # def func(a1,a2): # print(a1+a2) # # # new_func=functools.partial(func,12) # # new_func(21) View Code面向對象的__add__方法補充:當把面向對象中的所有__函數__實現時,對象做任何操作時,都會執行其中對應的方法。
__add__:誰在前面就會調用誰的__add__方法
# class Foo(): # def __init__(self,num): # self.num=num # def __add__(self,other): # return Foo(self.num+other.num) # obj1=Foo(11) # obj2=Foo(22) # v=obj1+obj2 # print(v) View Code拼接列表中的值:
itertools.chain:傳入函數,產生一個新的函數對象的列表,主要幫助我們做列表元素的拼接
實例1:
from itertools import chain# def a1(x): # return x+1 # # func_list=[a1,lambda x:x+1] # # def a2(y): # return y+10 # # func_list_1=chain([a2],func_list) # for func in func_list_1: # print(func) View Code實例2:
# v1=[1,2,3] # v2=[4,5,6] # # for l in chain(v1,v2): # chain 主要是做列表的拼接 # print(l) View Code上下文源碼流程:https://www.processon.com/diagraming/5ab8c9f0e4b0d165d5b83fbb
談談flask的上下文管理:
- 與django相比是兩種不同的實現方式。- django/tornado是通過傳參數形式- flask是通過上下文管理兩種都可以實現,只不過試下方式不一樣。- 上下文管理:- threading.local/Local類,其中創建了一個字典{greelet做唯一標識:存數據} 保證數據隔離- 請求進來:- 請求相關所有數據封裝到了RequestContext中。- 再講RequestContext對象添加到Local中(通過LocalStack將對象添加到Local對象中)- 使用,調用request- 調用此類方法 request.method、print(request)、request+xxx 會執行LocalProxy中對應的方法- 函數- 通過LocalStack去Local中獲取值。- 請求終止- 通過LocalStack的pop方法 Local中將值異常。 View Code補充:
再將對象封裝到Local中Flask可以傳入任何的字符串參數 View Code請求上下文:請求上下文封裝的就是RequestContext對象
request:是LocalProxy對象
這個對象實例化事傳入了一個函數,還有一個request參數
以后執行偏函數partail(_Lookup_req_object,'request')時,自動傳遞request參數
目標:去Local中獲取ctx,然后再在ctx中獲取request
ctx.push:里面做的事,將ctx通過LocalStack添加到Local中
session:經過push之后,session里面已經有值了,seif.session幫助我獲取session信息
應用上下文:應用上下文封裝的是AppContext
AppContext里面封裝了app對象
app.context:創建了app對象
app.app_ctx_global_class:相當于一個全局變量
app和g
g:每個請求周期都會創建一個用于在請求周期中傳遞值的一個容器。只有一次生命周期可用,因為一次請求周期后第二次就會重新創建
print(g):執行g的實例對象
請求到來 ,有人來訪問:
# 將請求相關的數據environ封裝到了RequestContext對象中# 再講對象封裝到local中(每個線程/每個協程獨立空間存儲)# ctx.app # 當前APP的名稱# ctx.request # Request對象(封裝請求相關東西)# ctx.session # 空_request_ctx_stack.local = {唯一標識:{"stack":[ctx, ]},唯一標識:{"stack":[ctx, ]},}# app_ctx = AppContext對象# app_ctx.app# app_ctx.g _app_ctx_stack.local = {唯一標識:{"stack":[app_ctx, ]},唯一標識:{"stack":[app_ctx, ]},} View Code 使用:print打印request,session.g,current_app的時候,都會執行他們相對應的對象的__str__
他們之間不同的是偏函數不一樣
終止,全部pop
問題:
如果他是多線程的時候是如何實現的。
請求到來的時候就是兩個Local,但是沒有值,一共只有兩個Local,進來一個線程創建自己的唯一標識。不過進來多少的線程,都只用這兩個Local View Codeflask的local中保存數據時,使用列表創建出來的棧。為什么用棧?
- 如果寫web程序,web運行環境;棧中永遠保存1條數據(可以不用棧)。- 寫腳本獲取app信息時,可能存在app上下文嵌套關系。from flask import Flask,current_app,globals,_app_ctx_stackapp1 = Flask('app01')app1.debug = False # 用戶/密碼/郵箱# app_ctx = AppContext(self):# app_ctx.app# app_ctx.g app2 = Flask('app02')app2.debug = True # 用戶/密碼/郵箱# app_ctx = AppContext(self):# app_ctx.app# app_ctx.g with app1.app_context():# __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local# {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438>]}}print(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG'])with app2.app_context():# {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438> ]}}print(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG'])print(current_app.config['DEBUG']) View Code 多app應用:不僅可以使用藍圖進行分發,還可以使用app進行分發
DispatchMiddleware:可以進行路由分發。
在這里面沒有app.run,直接可以run_simple啟動
問題:web訪問多app,上線問管理是如何實現的
請求進來,為棧添加的還是一個值
問題:為什么使用棧,離線腳本
如果寫web程序,或者web運行環境:棧中永遠保存一條數據(這個可以不使用棧)
寫腳本獲取app信息的時候,可能會存在上下文嵌套關系。這時有可能要用到棧
問題:問題:Web訪問多app應用時,上下文管理是如何實現?
補充:
遇到with就會執行__enter__方法,這個方法返回值,as后面那個值就是返回值當指執行完畢之后 ,自動調用類的__exit__方法 from flask import Flask,current_app,globals,_app_ctx_stackapp1 = Flask('app01') app1.debug = False # 用戶/密碼/郵箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g app2 = Flask('app02') app2.debug = True # 用戶/密碼/郵箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g with app1.app_context(): # # __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.localprint(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG'])with app1.app_context():print(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG'])print(current_app.config['DEBUG'])with app1.app_context():print(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG']) with app1.app_context():print(_app_ctx_stack._local.__storage__)print(current_app.config['DEBUG']) View Code
補充:永遠兩個Local對象
實現細節:
ResquestContext對象通過LocalStark添加到Local中
導入的request是一個LocalProxy對象,然后在通過偏函數調用了LocakStack,在調用Local
RequestContext的auto_pop,在執行LocalStack,再到Local中移除
十三 數據庫的連接池
前夕:
django的常用數據庫:
ORM:django默認的數據庫
pymysql模塊:導入mysql數據庫,python2和python3版本都有這個模塊
MySQLdb:一樣,也是導入mysql數據庫,這個模塊只有python2版本才有
flask/其他:
pymysql:導mysql數據庫
MySQLdb:導入mysql數據庫
SQLAchemy:
是ORM的一款框架,可以導入mysql數據庫(pymysql / MySQLdb)
原生SQL:
from flask import Flaskapp = Flask(__name__)@app.route("/") def hello():import pymysqlCONN = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8')cursor = CONN.cursor()cursor.execute('select * from tb1')result = cursor.fetchall()cursor.close()print(result)return "Hello World"if __name__ == '__main__':app.run() View Code from flask import Flaskapp = Flask(__name__) import pymysql CONN = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8')@app.route("/") def hello():cursor = CONN.cursor()cursor.execute('select * from tb1')result = cursor.fetchall()cursor.close()print(result)return "Hello World"if __name__ == '__main__':app.run()######加鎖######################### from flask import Flask import threading app = Flask(__name__) import pymysql CONN = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8')@app.route("/") def hello():with threading.Lock():cursor = CONN.cursor()cursor.execute('select * from tb1')result = cursor.fetchall()cursor.close()print(result)return "Hello World"if __name__ == '__main__':app.run() View Code 問題:
解決:
不能為每一個用戶創建一個鏈接
創建一定數量的連接池,只要連接池有空的,才會進來,不然就會等著。
使用UBDtils模塊:
下載網站:https://pypi.python.org/pypi/DBUtils
pip install UBDtils
如果安裝到虛擬環境下面,需要先切換到虛擬環境下面
使用:
模式一:
為每一個線程創建一個鏈接,即使縣城調用close方法,也不會關閉掉
占用連接池不放,浪費了連接池
import pymysql from DBUtils.PersistentDB import PersistentDBPOOL = PersistentDB(creator=pymysql, # 使用鏈接數據庫的模塊maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayscloseable=False,# 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接)threadlocal=None, # 本線程獨享值得對象,用于保存鏈接對象,如果鏈接對象被重置host='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8' )def func():conn = POOL.connection(shareable=False)cursor = conn.cursor()cursor.execute('select * from tb1')result = cursor.fetchall()cursor.close()conn.close()func() View Code 模式二:
設置最大限制創建連接池的數量,進來一個線程,就創建一個連接池
如果超過連接池的限制數量,就會在那里等著有空的連接池釋放出來才能夠下一個線程鏈接進去
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB(creator=pymysql, # 使用鏈接數據庫的模塊maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8' )def func():# 檢測當前正在運行連接數的是否小于最大鏈接數,如果不小于則:等待或報raise TooManyConnections異常# 否則# 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。# 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中并返回。# 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中并返回。# 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。conn = POOL.connection()# print(th, '鏈接被拿走了', conn1._con)# print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor()cursor.execute('select * from tb1')result = cursor.fetchall()conn.close()func() View Code工作實用:
import pymysqlfrom DBUtils.PooledDB import PooledDBPOOL = PooledDB(creator=pymysql, # 使用鏈接數據庫的模塊maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]ping=0,# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = alwayshost='127.0.0.1',port=3306,user='root',password='123',database='pooldb',charset='utf8')"""class SQLHelper(object):@staticmethoddef fetch_one(sql,args):conn = POOL.connection()cursor = conn.cursor()cursor.execute(sql, args)result = cursor.fetchone()conn.close()return result@staticmethoddef fetch_all(self,sql,args):conn = POOL.connection()cursor = conn.cursor()cursor.execute(sql, args)result = cursor.fetchone()conn.close()return result# 以后使用:result = SQLHelper.fetch_one('select * from xxx',[])print(result)""" View Code?十四 信號
什么是信號:signal 一種處理異步時間的方法。信號是POSIX系統的信號,由硬件或軟件觸發,再有操作系統內核發給應用程序的中斷形式。POSIX由一系列的信號集。
什么是信號量:semaphore 一種進程同步的機制。信號量是POSIX進程間通信的工具,在它上面定義了一系列操作原語,簡單地講它可以在進程間進行通信。
flask自己沒有信號,要依賴blinker這個模塊 安裝 pip install blinker
內置信號:
信號放哪里了:放在了signals文件里面
signals.request_started.conncet(函數名) :注冊函數
等待請求的到來
在視圖函數執行之前觸法信號的
是怎么觸發信號的?
signals.request_started.send:方法觸發信號
send是在full_dispatch_request方法中執行的
信號的源碼流程:
a. before_first_requestb. 觸發 request_started 信號c. before_requestd. 模板渲染渲染前的信號 before_render_template.send(app, template=template, context=context)rv = template.render(context) # 模板渲染渲染后的信號 template_rendered.send(app, template=template, context=context)e. after_requestf. session.save_session()g. 觸發 request_finished信號如果上述過程出錯:觸發錯誤處理信號 got_request_exception.send(self, exception=e)h. 觸發信號 request_tearing_down由信號引發的源碼流程:找擴展點 View Code?十五 flask-session插件
Flask中的session處理機制(內置:將session保存在加密cookie中實現)
內置的session:
請求剛到來:獲取隨機字符串,存在則去“數據庫”中獲取原來的個人數據,否則創建一個空容器。 --> 內存:對象(隨機字符串,{放置數據的容器})
# 1. obj = 創建SecureCookieSessionInterface() # 2. obj = open_session(self.request) = SecureCookieSession() # self.session = SecureCookieSession()對象。 self.session = self.app.open_session(self.request) View Code 視圖:操作內存中 對象(隨機字符串,{放置數據的容器})
?響應:內存對象(隨機字符串,{放置數據的容器})
將數據保存到“數據庫”
把隨機字符串寫在用戶cookie中。
自定義:
請求剛到來:
# 創建特殊字典,并添加到Local中。 # 調用關系: # self.session_interface.open_session(self, request) # 由于默認app中的session_interface=SecureCookieSessionInterface() # SecureCookieSessionInterface().open_session(self, request) # 由于默認app中的session_interface=MySessionInterFace() # MySessionInterFace().open_session(self, request) self.session = self.app.open_session(self.request) View Code調用:
from flask import Flask,sessionapp = Flask(__name__) app.secret_key = 'suijksdfsd'import json class MySessionInterFace(object):def open_session(self,app,request):return {}def save_session(self, app, session, response):response.set_cookie('session_idfsdfsdfsdf',json.dumps(session))def is_null_session(self, obj):"""Checks if a given object is a null session. Null sessions arenot asked to be saved.This checks if the object is an instance of :attr:`null_session_class`by default."""return Falseapp.session_interface = MySessionInterFace()@app.route('/') def index():# 特殊空字典# 在local的ctx中找到session# 在空字典中寫值# 在空字典中獲取值session['xxx'] = 123return 'Index'# # 一旦請求到來 # app.__call__ # app.wsgi_app # app.session_interface # app.open_sessionif __name__ == '__main__':app.run() View Code session -> LocalProxy -> 偏函數 -> LocalStack -> Local
請求終止:
flask-session組件
使用:
方式一
from redis import Redis conn = Redis() app.session_interface = RedisSessionInterface(conn,key_prefix='__',use_signer=False) View Code方式二
from redis import Redis from flask.ext.session import Session app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app) View Code @app.route('/') def index(): session['xxx'] = 123 return 'Index'if __name__ == '__main__':app.run() View Code 源碼:
? ? 流程
問題:設置cookie時,如何設定關閉瀏覽器則cookie失效。
response.set_cookie('k','v',exipre=None)
十六 wtforms組建
安裝:pip3 install wtforms
使用:
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgetsapp = Flask(__name__, template_folder='templates') app.debug = Trueclass RegisterForm(Form):name = simple.StringField(label='用戶名',validators=[validators.DataRequired()],widget=widgets.TextInput(),render_kw={'class': 'form-control'},default='alex')pwd = simple.PasswordField(label='密碼',validators=[validators.DataRequired(message='密碼不能為空.')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})pwd_confirm = simple.PasswordField(label='重復密碼',validators=[validators.DataRequired(message='重復密碼不能為空.'),validators.EqualTo('pwd', message="兩次密碼輸入不一致")],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})email = html5.EmailField(label='郵箱',validators=[validators.DataRequired(message='郵箱不能為空.'),validators.Email(message='郵箱格式錯誤')],widget=widgets.TextInput(input_type='email'),render_kw={'class': 'form-control'})gender = core.RadioField(label='性別',choices=((1, '男'),(2, '女'),),coerce=int # “1” “2” )city = core.SelectField(label='城市',choices=(('bj', '北京'),('sh', '上海'),))hobby = core.SelectMultipleField(label='愛好',choices=((1, '籃球'),(2, '足球'),),coerce=int)favor = core.SelectMultipleField(label='喜好',choices=((1, '籃球'),(2, '足球'),),widget=widgets.ListWidget(prefix_label=False),option_widget=widgets.CheckboxInput(),coerce=int,default=[1, 2])def __init__(self, *args, **kwargs):super(RegisterForm, self).__init__(*args, **kwargs)self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))def validate_pwd_confirm(self, field):"""自定義pwd_confirm字段規則,例:與pwd字段是否一致:param field::return:"""# 最開始初始化時,self.data中已經有所有的值if field.data != self.data['pwd']:# raise validators.ValidationError("密碼不一致") # 繼續后續驗證raise validators.StopValidation("密碼不一致") # 不再繼續后續驗證 @app.route('/register', methods=['GET', 'POST']) def register():if request.method == 'GET':form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initialreturn render_template('register.html', form=form)else:form = RegisterForm(formdata=request.form)if form.validate():print('用戶提交數據通過格式驗證,提交的值為:', form.data)else:print(form.errors)return render_template('register.html', form=form)if __name__ == '__main__':app.run() View Code源碼流程:
實現方式:
1. 自動生成HTMLclass LoginForm(Form):# 字段(內部包含正則表達式)name = simple.StringField(label='用戶名',validators=[validators.DataRequired(message='用戶名不能為空.'),validators.Length(min=6, max=18, message='用戶名長度必須大于%(min)d且小于%(max)d')],widget=widgets.TextInput(), # 頁面上顯示的插件render_kw={'class': 'form-control'})# 字段(內部包含正則表達式)pwd = simple.PasswordField(label='密碼',validators=[validators.DataRequired(message='密碼不能為空.'),validators.Length(min=8, message='用戶名長度必須大于%(min)d'),validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})def __iter__(self):return iter([self.name,self.pwd])# 方式一:obj = LoginForm()print(obj.name) # 調用字段的__str__# 方式二:obj = LoginForm()for item in obj:print(item) # 調用字段的__str__ View Code校驗:
a. 后臺定義好正則 b. 用戶發來數據 c. 對數據進行校驗 View Code 源碼實現:自動生成HTML文件
? ? 解釋:metaclass
類繼承:只要這個指定了Metaclass,那么這個類以及他的子類都指定同一個metaclass - MetaClass作用:用來指定當前類由誰來創建(默認type創建)。- 使用metaclassclass Foo(metaclass=type):pass class Foo(object):__metaclass__ = type- 類繼承class MyType(type):def __init__(self,*args,**kwargs):print('init')super(MyType,self).__init__(*args,**kwargs)def __call__(self, *args, **kwargs):print('call本質:調用類的__new__,再調用類的__init__')return super(MyType,self).__call__( *args, **kwargs)class Foo(metaclass=MyType):passclass Bar(Foo):passobj = Bar()- 問題:1. 什么意思?# 類由type來創建class Foo(metaclass=type)# 繼承Typeclass Foo(type)2. Flask多線程:服務端開多線程 View Code
其他方式使用:
class MyType(type):def __init__(self,*args,**kwargs):print('init')super(MyType,self).__init__(*args,**kwargs)def __call__(self, *args, **kwargs):print('call')return super(MyType,self).__call__(*args,**kwargs)# Base=MyType('Base',(object,),{}) 是有MyType創建; metaclass=MyType # # class Foo(Base): # pass # # Foo() # 1. type可以創建類metaclass=type;MyType也可以創建類metaclass=MyType # 2. Base = MyType('Base', (object,), {}) --> # class Base(metaclass=MyType): # pass # class Foo(Base): # passclass Foo(MyType("Base",(object,),{})):# 第一個是類名,第二個就是他的父類,第三個就是屬性passFoo() View Code class MyType(type):def __init__(self,*args,**kwargs):print('init')super(MyType,self).__init__(*args,**kwargs)def __call__(self, *args, **kwargs):print('call')return super(MyType,self).__call__(*args,**kwargs)def with_metaclass(obj):return MyType('XX',(obj,),{})class Foo(with_metaclass(object)):passFoo() View Code """ 1. 什么意思?# 類由type來創建 class Foo(metaclass=type) # 繼承Type class Foo(type)"""class Foo(object):pass obj = Foo() # 對象是由類創建# 一切皆對象,類由type創建 class Foo(object):passFoo = type('Foo',(object,),{})# 一切皆對象,類由MyType創建 class MyType(type):pass Foo = MyType('Foo',(object,),{})class Foo(object,metaclass=MyType):pass# 一切皆對象,類由MyType創建 class MyType(type):def __init__(self, *args, **kwargs):print('init')super(MyType, self).__init__(*args, **kwargs)def __call__(cls, *args, **kwargs):print('call')return super(MyType, cls).__call__(*args, **kwargs)Foo = MyType('Foo',(object,),{})class Foo(object,metaclass=MyType):passFoo() View Code實例:form = LoginForm()
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgetsapp = Flask(__name__, template_folder='templates')app.debug = Trueclass MyForm(Form):'''創建字段,內部包含正則表達式'''name=simple.StringField(label='用戶名',validators=[validators.DataRequired(message='用戶名不能為空'),validators.Length(min=6,max=18,message='用戶名的長度不能小于%(min)d不能大于%(max)d')],widget=widgets.TextInput(),render_kw={'class': 'form-control'})password=simple.PasswordField(label='密碼',validators=[validators.DataRequired(message='密碼不能為空'),validators.Length(max=18,message='用戶名不能大于%(max)'),validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})@app.route('/login', methods=['GET', 'POST']) def login():if request.method=='GET':form=MyForm()return render_template('login.html',form=form)else:form = MyForm(formdata=request.form)if form.validate():print('用戶提交數據通過格式驗證,提交的值為:', form.data)else:return render_template('login.html', form=form) if __name__ == '__main__':app.run() View Code驗證:form.validate()
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine View Code補充:
為什么要在類里面定義方法:主要是為對象的數據進行二次加工
類的對象不能直接被for循環,若想被循環,在類里面加上__iter__方法
十七 SQLAlchmey ORM框架
目標:類/對象操作 -> SQL -> pymysql、MySQLdb -> 再在數據庫中執行。
基本使用:這個不常見,
class MyForm(Form):'''創建字段,內部包含正則表達式'''name=simple.StringField(label='用戶名',validators=[validators.DataRequired(message='用戶名不能為空'),validators.Length(min=6,max=18,message='用戶名的長度不能小于%(min)d不能大于%(max)d')],widget=widgets.TextInput(),render_kw={'class': 'form-control'})password=simple.PasswordField(label='密碼',validators=[validators.DataRequired(message='密碼不能為空'),validators.Length(max=18,message='用戶名不能大于%(max)'),validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'}) View Code方式一:
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",max_overflow=2, # 超過連接池大小外最多創建的連接pool_size=5, # 連接池大小pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) )conn = engine.raw_connection()cursor = conn.cursor()cursor.execute("select * from t1")result = cursor.fetchall()cursor.close()conn.close() View Code方式二:
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",max_overflow=0, # 超過連接池大小外最多創建的連接pool_size=5, # 連接池大小pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) )def task(arg):conn = engine.raw_connection()cursor = conn.cursor()cursor.execute(#"select * from t1""select sleep(2)")result = cursor.fetchall()cursor.close()conn.close()for i in range(20):t = threading.Thread(target=task, args=(i,))t.start() View CodeORM:
models.py #!/usr/bin/env python# -*- coding:utf-8 -*-import datetimefrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase = declarative_base()class Users(Base):__tablename__ = 'users' # 數據庫表名稱id = Column(Integer, primary_key=True) # id 主鍵name = Column(String(32), index=True, nullable=False) # name列,def init_db():"""根據類創建數據庫表:return: """engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",max_overflow=0, # 超過連接池大小外最多創建的連接pool_size=5, # 連接池大小pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) )Base.metadata.create_all(engine)def drop_db():"""根據類刪除數據庫表:return: """engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",max_overflow=0, # 超過連接池大小外最多創建的連接pool_size=5, # 連接池大小pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) )Base.metadata.drop_all(engine)if __name__ == '__main__':#drop_db()#init_db() app.py #!/usr/bin/env python# -*- coding:utf-8 -*-from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom models import Usersengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)Connection = sessionmaker(bind=engine)# 每次執行數據庫操作時,都需要創建一個Connectioncon = Connection()# ############# 執行ORM操作 #############obj1 = Users(name="alex1")con.add(obj1)# 提交事務 con.commit()# 關閉session con.close() View Code? 詳細信息:
創建表:
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now) # 3期師兄extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'email'), )class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)caption = Column(String(50), default='籃球')class Person(Base):__tablename__ = 'person'nid = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=True)hobby_id = Column(Integer, ForeignKey("hobby.id"))class b2g(Base):__tablename__ = 'b2g'id = Column(Integer, primary_key=True, autoincrement=True)girl_id = Column(Integer, ForeignKey('girl.id'))boy_id = Column(Integer, ForeignKey('boy.id'))class Girl(Base):__tablename__ = 'girl'id = Column(Integer, primary_key=True)name = Column(String(64), unique=True, nullable=False)class Boy(Base):__tablename__ = 'boy'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64), unique=True, nullable=False)engine = create_engine("mysql+pymysql://root:0410@127.0.0.1:3306/sqlachemy?charset=utf8",max_overflow=0, # 超過連接池大小外最多創建的連接pool_size=5, # 連接池大小pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) )Base.metadata.create_all(engine) # Base.metadata.drop_all(engine) View Code對于表的增刪改查:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import textfrom db import Users, Hostsengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine)session = Session()# ################ 添加 ################ """ obj1 = Users(name="wupeiqi") session.add(obj1)session.add_all([Users(name="wupeiqi"),Users(name="alex"),Hosts(name="c1.com"), ]) session.commit() """# ################ 刪除 ################ """ session.query(Users).filter(Users.id > 2).delete() session.commit() """ # ################ 修改 ################ """ session.query(Users).filter(Users.id > 0).update({"name" : "099"}) session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") session.commit() """ # ################ 查詢 ################ """ r1 = session.query(Users).all() r2 = session.query(Users.name.label('xx'), Users.age).all() r3 = session.query(Users).filter(Users.name == "alex").all() r4 = session.query(Users).filter_by(name='alex').all() r5 = session.query(Users).filter_by(name='alex').first() r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all() """session.close() View Code查看的其他操作:
# 條件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()# 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all()# 限制 ret = session.query(Users)[1:2]# 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分組 from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all() ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() View Code原生的sql語句:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hostsengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine)session = Session()# 查詢 # cursor = session.execute('select * from users') # result = cursor.fetchall()# 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid)session.close() View Code多表操作:
一對多: #!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Personengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([Hobby(caption='乒乓球'),Hobby(caption='羽毛球'),Person(name='張三', hobby_id=3),Person(name='李四', hobby_id=4), ])person = Person(name='張九', hobby=Hobby(caption='姑娘')) session.add(person)hb = Hobby(caption='人妖') hb.pers = [Person(name='文飛'), Person(name='博雅')] session.add(hb)session.commit() """# 使用relationship正向查詢 """ v = session.query(Person).first() print(v.name) print(v.hobby.caption) """# 使用relationship反向查詢 """ v = session.query(Hobby).first() print(v.caption) print(v.pers) """session.close()多對多: #!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Groupengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([Server(hostname='c1.com'),Server(hostname='c2.com'),Group(name='A組'),Group(name='B組'), ]) session.commit()s2g = Server2Group(server_id=1, group_id=1) session.add(s2g) session.commit()gp = Group(name='C組') gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')] session.add(gp) session.commit()ser = Server(hostname='c6.com') ser.groups = [Group(name='F組'),Group(name='G組')] session.add(ser) session.commit() """# 使用relationship正向查詢 """ v = session.query(Group).first() print(v.name) print(v.servers) """# 使用relationship反向查詢 """ v = session.query(Server).first() print(v.hostname) print(v.groups) """session.close() View Code其他操作:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text, func from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Groupengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session()# 關聯子查詢 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """# 原生SQL """ # 查詢 cursor = session.execute('select * from users') result = cursor.fetchall()# 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) """session.close()其他 View Code基于scoped_session實現線程安全:
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Usersengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine)""" # 線程安全,基于本地線程實現每個線程用同一個session # 特殊的:scoped_session中有原來方法的Session中的一下方法:public_methods = ('__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested','close', 'commit', 'connection', 'delete', 'execute', 'expire','expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind','is_modified', 'bulk_save_objects', 'bulk_insert_mappings','bulk_update_mappings','merge', 'query', 'refresh', 'rollback','scalar' ) """ session = scoped_session(Session)# ############# 執行ORM操作 ############# obj1 = Users(name="alex1") session.add(obj1)# 提交事務 session.commit() # 關閉session session.close() View Code多線程執行實例:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threadingfrom sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from db import Usersengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine)def task(arg):session = Session()obj1 = Users(name="alex1")session.add(obj1)session.commit()for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()多線程執行示例 View Codeflask_sqlalchemy:Flask-SQLAlchemy(文件和目錄的管理)
Flask和SQLAlchemy的管理 - db = SQLAlchemy()- 包含配置- 包含ORM基類- 包含create_all- engine- 創建連接# 目錄結構保存好 View Codeflask_sqlalchemy實例:https://pan.baidu.com/s/1IL68-68tBDluDtqsB1NS1g
補充:
3. pipreqspip3 install pipreqspipreqs ./ View Code?十八 flask_script和flask_migrate
flask_script:功能相當于django中的manage文件,用于啟動flask項目使用的,python manage.py runserver
flask_migrate:相當于django中的數據庫遷移,makemigrations/migrate -> migrate/upgrade
批量導入:xlrd/xlwt
#!/usr/bin/env python # -*- coding:utf-8 -*- """ 生成依賴文件:pipreqs ./""" from sansa import create_app,db from flask_script import Manager from flask_migrate import Migrate,MigrateCommandapp = create_app() manager = Manager(app) migrate = Migrate(app, db)""" # 數據庫遷移命名python manage.py db initpython manage.py db migratepython manage.py db upgrade """ manager.add_command('db', MigrateCommand)@manager.command def custom(arg):"""自定義命令python manage.py custom 123:param arg::return:"""print(arg)@manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url):"""自定義命令執行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com:param name::param url::return:"""print(name, url)if __name__ == '__main__':manager.run()""" 1. 運行程序時:python manage.py runserver """ View Code?補充:
flask和django的其他導入靜態文件
?
轉載于:https://www.cnblogs.com/fangjie0410/p/8608536.html
總結