数据库和ORMS:使用SQLAlchemy与数据库通信
生活随笔
收集整理的這篇文章主要介紹了
数据库和ORMS:使用SQLAlchemy与数据库通信
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 1. 環境安裝
- 2. 使用SQLAlchemy與SQL數據庫通信
- 2.1 創建表
- 2.2 連接數據庫
- 2.3 insert、select
- 2.4 update、delete
- 2.5 relationships
- 2.6 用Alembic進行數據庫遷移
learn from 《Building Data Science Applications with FastAPI》
1. 環境安裝
docker 安裝 MongoDB 服務
docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4
2. 使用SQLAlchemy與SQL數據庫通信
安裝 pip install databases[sqlite]
2.1 創建表
# models.py
"""SQLAlchemy table definition and Pydantic schemas for blog posts."""
from datetime import datetime
from typing import Optional

import sqlalchemy
from pydantic import BaseModel, Field

# Metadata object on which the tables declared below are registered.
metadata = sqlalchemy.MetaData()

# "posts" table. Each Column is (name, type, extra options).
posts = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),
    sqlalchemy.Column("title", sqlalchemy.String(255), nullable=False),
    sqlalchemy.Column("text", sqlalchemy.Text(), nullable=False),
)


class PostBase(BaseModel):
    """Fields shared by every post schema."""

    title: str
    text: str
    # The keyword was previously misspelled ("dafault_factory"), so
    # datetime.now was never used as the default for this field.
    publication_date: datetime = Field(default_factory=datetime.now)


class PostPartialUpdate(BaseModel):
    """Payload for a partial update; every field is optional.

    The fields mirror the updatable columns of the ``posts`` table
    (``title`` and ``text``). A previous ``content`` field did not match
    any column of that table and has been replaced by ``title``.
    """

    title: Optional[str] = None
    text: Optional[str] = None


class PostCreate(PostBase):
    """Payload accepted when creating a post."""


class PostDB(PostBase):
    """A post as stored in the database, including its primary key."""

    id: int


# ---- Next section: 2.2 Connecting to the database ----
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:28
# @Author : Michael
# @File : database.py
# @desc :
"""Database connection objects shared by the application."""
import sqlalchemy
from databases import Database

# Connection URL used by both objects below.
DB_URL = 'sqlite:///cp6_sqlalchemy.db'

# SQLAlchemy engine built from the same URL.
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)

# Module-level ``Database`` instance handed out by get_database().
database = Database(DB_URL)


def get_database() -> Database:
    """Return the module-level ``Database`` instance."""
    return database


# ---- Next section: 2.3 insert, select ----
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :
"""FastAPI application exposing create/read endpoints for posts."""
from typing import List, Tuple
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate

app = FastAPI()


@app.on_event('startup')
async def startup():
    """Connect to the database on startup, then create the tables."""
    await get_database().connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database on shutdown."""
    await get_database().disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` with ``limit`` capped at 100."""
    return skip, min(100, limit)


async def get_post_or_404(
    id: int,
    database: Database = Depends(get_database),
) -> PostDB:
    """Fetch the post with primary key ``id``.

    Raises:
        HTTPException: 404 when no row has that id.
    """
    row = await database.fetch_one(posts.select().where(posts.c.id == id))
    if row is not None:
        return PostDB(**row)
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)


# Insert a new post.
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate,
    db: Database = Depends(get_database),
) -> PostDB:
    """Insert ``post`` and return the stored row."""
    # The insert statement is built by SQLAlchemy; no hand-written SQL.
    statement = posts.insert().values(post.dict())
    new_id = await db.execute(statement)
    # Read the row back so the response carries the generated id.
    return await get_post_or_404(new_id, db)


@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    """Return the post resolved by the ``get_post_or_404`` dependency."""
    return post


@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts."""
    offset, page_size = pagination
    page_query = posts.select().offset(offset).limit(page_size)
    found = []
    for record in await database.fetch_all(page_query):
        found.append(PostDB(**record))
    return found


if __name__ == '__main__':
    uvicorn.run(
        app='app:app',
        host="127.0.0.1",
        port=8001,
        reload=True,
        debug=True,
    )
2.4 update、delete
# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostDB:
    """Apply the fields present in the request body to an existing post.

    Args:
        post_update: Partial payload; only fields the client actually sent
            are written (``exclude_unset=True``).
        post: The existing post, resolved by ``get_post_or_404``.
        database: Database connection dependency.

    Returns:
        The post re-read from the database after the update, or the
        unchanged post when the body contained no fields.
    """
    changes = post_update.dict(exclude_unset=True)
    if not changes:
        # An empty body leaves nothing to SET, so skip the UPDATE entirely.
        return post

    update_query = posts.update().where(posts.c.id == post.id).values(changes)
    await database.execute(update_query)
    return await get_post_or_404(post.id, database)


# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostDB = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete the post resolved by ``get_post_or_404``."""
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)


# ---- Next section: 2.5 relationships ----
models.py 編寫新的表
# "comments" table: each comment belongs to one post.
comments = sqlalchemy.Table(
    "comments",
    metadata,
    sqlalchemy.Column(
        "id",
        sqlalchemy.Integer,
        primary_key=True,
        autoincrement=True,
    ),
    # Foreign key linking the comment to posts.id, declared with
    # ON DELETE CASCADE.
    sqlalchemy.Column(
        "post_id",
        sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"),
        nullable=False,
    ),
    sqlalchemy.Column(
        "publication_date",
        sqlalchemy.DateTime(),
        nullable=False,
    ),
    sqlalchemy.Column(
        "content",
        sqlalchemy.Text(),
        nullable=False,
    ),
)


class CommentBase(BaseModel):
    """Fields shared by every comment schema."""

    post_id: int
    publication_date: datetime = Field(default_factory=datetime.now)
    content: str


class CommentCreate(CommentBase):
    """Payload accepted when creating a comment."""


class CommentDB(CommentBase):
    """A comment as stored in the database, including its primary key."""

    id: int


# ---- app.py: additions ----
from typing import List, Mapping, Tuple, cast
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate,
    database: Database = Depends(get_database),
) -> CommentDB:
    """Create a comment attached to an existing post.

    Args:
        comment: Comment payload; ``post_id`` must reference an existing post.
        database: Database connection dependency.

    Returns:
        The stored comment, re-read from the database.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    # Check that the referenced post exists before inserting.
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        # Report the requested post id. The message previously interpolated
        # the builtin ``id`` function instead of comment.post_id.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    # Insert the comment.
    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the row back so the response carries the generated id.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)
獲取一個post的全部comments
models.py
class PostPublic(PostDB):
    """A stored post together with its comments (``None`` when not set)."""

    comments: Optional[List[CommentDB]] = None


# ---- app.py ----
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :
"""FastAPI application exposing CRUD endpoints for posts and comments."""
from typing import List, Mapping, Tuple, cast
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, status

from database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB, \
    PostPublic

app = FastAPI()


@app.on_event('startup')
async def startup():
    """Connect to the database on startup, then create the tables."""
    await get_database().connect()
    metadata.create_all(sqlalchemy_engine)


@app.on_event("shutdown")
async def shutdown():
    """Disconnect from the database on shutdown."""
    await get_database().disconnect()


async def pagination(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=0),
) -> Tuple[int, int]:
    """Return ``(skip, limit)`` with ``limit`` capped at 100."""
    capped_limit = min(100, limit)
    return (skip, capped_limit)


async def get_post_or_404(
    id: int,
    database: Database = Depends(get_database),
) -> PostPublic:
    """Fetch the post with primary key ``id`` together with its comments.

    Raises:
        HTTPException: 404 when no post has that id.
    """
    select_query = posts.select().where(posts.c.id == id)
    raw_post = await database.fetch_one(select_query)
    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # All comments whose post_id matches this post.
    select_post_comment_query = comments.select().where(comments.c.post_id == id)
    raw_comments = await database.fetch_all(select_post_comment_query)
    comments_list = [CommentDB(**row) for row in raw_comments]
    return PostPublic(**raw_post, comments=comments_list)


# Insert a new post.
# NOTE(review): response_model is PostDB while the handler returns a
# PostPublic -- confirm whether comments are meant to be omitted here.
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate,
    db: Database = Depends(get_database),
) -> PostPublic:
    """Insert ``post`` and return the stored row."""
    # The insert statement is built by SQLAlchemy; no hand-written SQL.
    insert_query = posts.insert().values(post.dict())
    post_id = await db.execute(insert_query)
    post_db = await get_post_or_404(post_id, db)
    return post_db


# This handler and its GET /posts route used to be declared twice in this
# file; only one declaration is kept.
@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: Database = Depends(get_database),
) -> List[PostDB]:
    """Return one page of posts (without comments)."""
    skip, limit = pagination
    select_query = posts.select().offset(skip).limit(limit)
    rows = await database.fetch_all(select_query)
    results = [PostDB(**row) for row in rows]
    return results


@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:
    """Return the post resolved by the ``get_post_or_404`` dependency."""
    return post


# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> PostPublic:
    """Apply the fields present in the request body to an existing post.

    Returns the post re-read after the update, or the unchanged post when
    the body contained no fields.
    """
    changes = post_update.dict(exclude_unset=True)
    if not changes:
        # An empty body leaves nothing to SET, so skip the UPDATE entirely.
        return post

    update_query = posts.update().where(posts.c.id == post.id).values(changes)
    await database.execute(update_query)
    post_db = await get_post_or_404(post.id, database)
    return post_db


# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostPublic = Depends(get_post_or_404),
    database: Database = Depends(get_database),
) -> None:
    """Delete the post resolved by ``get_post_or_404``."""
    delete_query = posts.delete().where(posts.c.id == post.id)
    await database.execute(delete_query)


@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(
    comment: CommentCreate,
    database: Database = Depends(get_database),
) -> CommentDB:
    """Create a comment attached to an existing post.

    Raises:
        HTTPException: 400 when ``comment.post_id`` matches no post.
    """
    select_post_query = posts.select().where(posts.c.id == comment.post_id)
    post = await database.fetch_one(select_post_query)
    if post is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Post {comment.post_id} does not exist",
        )

    insert_query = comments.insert().values(comment.dict())
    comment_id = await database.execute(insert_query)

    # Read the row back so the response carries the generated id.
    select_query = comments.select().where(comments.c.id == comment_id)
    raw_comment = cast(Mapping, await database.fetch_one(select_query))
    return CommentDB(**raw_comment)


if __name__ == '__main__':
    uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)
2.6 用Alembic進行數據庫遷移
pip install alembic
終端輸入:
alembic init alembic
初始化遷移環境,其中包括一組文件和目錄,Alembic將在其中存儲其配置和遷移文件,需要一起提交 git
在 env.py 中導入元數據
# env.py: bind the application's metadata to the name ``target_metadata``.
from web_python_dev.sqlalchemy1.models import metadata

target_metadata = metadata

# ---- Next step: edit the ini configuration ----
開始遷移
alembic revision --autogenerate -m "Initial migration"
之后會生成一個py文件
該代碼內有兩個函數:upgrade,downgrade用于數據遷移和回滾
數據的遷移和升級之前請做好備份和測試,防止丟失損壞
https://alembic.sqlalchemy.org/en/latest/index.html
總結
以上是生活随笔為你收集整理的数据库和ORMS:使用SQLAlchemy与数据库通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 信息的表示和处理
- 下一篇: Java 文件 IO 操作