python 进程
進程 Process
- 正在執行中的程序稱為進程。進程的執行會占用內存等資源。多個進程同時執行時,每個進程的執行都需要由操作系統按一定的算法(RR調度、優先數調度算法等)分配內存空間
創建一個進程第一種方式
# process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。 from multiprocessing import Processdef func(n):# 子進程函數# 獲取當前進程idprint("當前進程id:", os.getpid())# 獲取父進程idprint("父進程id:", os.getppid())for i in range(10):time.sleep(2)print("子進程", n)# 子進程中的程序相當于import的主進程中的程序,那么import的時候會不會執行你import的那個文件的程序啊,前面學的,是會執行的,所以出現了兩次打印 print("-----------------")# Windows下寫代碼開啟子進程時,必須寫上if __name__ == ‘__main__’: if __name__ == "__main__":# 首先我運行當前這個test.py文件,運行這個文件的程序,那么就產生了進程,這個進程我們稱為主進程# 將函數注冊到一個進程中,此時還沒有啟動進程,只是創建了一個進程對象。并且func是不加括號的。p = Process(target=func, args=("傳參",)) # args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號# 告訴操作系統,給我開啟一個進程,func這個函數就被我們新開的這個進程執行了,# 而這個進程是我主進程運行過程中創建出來的,所以稱這個新創建的進程為主進程的子進程,而主進程又可以稱為這個新進程的父進程。p.start() # start并不是直接就去執行了,我們知道進程有三個狀態,進程會進入進程的三個狀態,就緒,(被調度,也就是時間片切換到它的時候)執行,阻塞,并且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。p.json() # 等待子進程執行結束 主進程 才能繼續往下執行# 獲取當前進程idprint("當前進程id:", os.getpid())# 獲取父進程id Pycharm 進程的IDprint("父進程id Pycharm 進程的ID:", os.getppid())# 這是主進程的程序,上面開啟的子進程的程序是和主進程的程序同時運行的,我們稱為異步for i in range(10):time.sleep(1)print("父進程")process 類中的 參數
group # 未使用 值始終為 None target # 表示調用對象,即子進程要執行的任務 args # 表示調用對象的 參數元祖 args=(..., ) kwargs # 表示調用對象的字典 kwargs={"name":'張',} name # 為子進程的名字 定義子進程的名字進程對象的 方法
from multiprocessing import Process p = Process(target=func,)p.start() # 啟動進程 p.json() # 等待子進程執行結束 主進程 才能繼續往下執行 p.Terminate() # 關閉進程 不是自己關閉 而是 給操作系統 發送了一個關閉進程的信號 p.is_alive() # 查看進程是否還活著 p.daemon = True # 設置進程為守護進程 寫在 start()之前 子進程會跟父進程一起結束 p.name # 進程的名字 p.pid # 進程的 id創建進行的第二種方式:
- 自己定義一個類,繼承Process類,必須寫一個run方法,想傳參數,自行寫init方法,然后執行super父類的init方法
驗證進程間的空間隔離
import time from multiprocessing import Process# 進程之間是空間隔離的,不共享資源 global_num = 100def func1():global global_numglobal_num = 0print('子進程全局變量>>>', global_num)if __name__ == '__main__':p1 = Process(target=func1, )p1.start()time.sleep(1)print('主進程的全局變量>>>', global_num)僵和孤兒進程
- 進程結束后資源回收 主進程不會管子進程 自己結束
- 使用 p.json() 主進程會等待子進程結束后 才結束
同步鎖 Lock
同步效率低,但是保證了數據安全 重點
搶票案例
import random import json import time from multiprocessing import Process, Lockdef quang(i, lock):print("等待搶票")time.sleep(1)lock.acquire() # 鎖頭 只有一把 with open("aaa", "r") as f:dic = json.load(f)if dic["piao"] > 0:time.sleep(random.random())dic["piao"] -= 1with open("aaa", "w") as f1:json.dump(dic, f1)print("%s強盜了" % i)else:print("%s沒票了" % i)lock.release() # 還鎖 if __name__ == "__main__":lo = Lock()for i in range(10):p = Process(target=quang, args=(i, lo))p.start()
信號量 Semaphore
阿斯蒂芬
import random import time from multiprocessing import Process, Semaphoredef dbj(i, s):# 信息 入口s.acquire() print("%s號顧客來洗腳了" % i)time.sleep(random.randrange(2, 7))# 信息 出口s.release()if __name__ == "__main__":# 信息# 創建一個計數器,每次acquire就減1,直到減到0,那么上面的任務只有4個在同時異步的執行,后面的進程需要等待.s = Semaphore(5)for i in range(20):p = Process(target=dbj, args=(i, s))p.start()
事件 Event
通過事件來完成紅路燈
import random import time from multiprocessing import Process, Eventdef hld(e):while 1:print("紅燈了!!")time.sleep(3)# 將事件狀態改為 Truee.set()print("綠燈了")time.sleep(5)# 將事件狀態改為 Falsee.clear()def car(i, e):# e.is_set() 查看事件狀態 True Falseif e.is_set():print("%s號車直接通過" % i)else:print("%s號車等紅燈" % i)# 等待 如果 狀態為 True 時 向后執行e.wait()print("%s號車綠燈通過" % i)if __name__ == "__main__":e = Event()p1 = Process(target=hld, args=(e,))p1.start()for i in range(1000):time.sleep(random.random())p = Process(target=car, args=(i, e))p.start() # 方法 e.is_set() # 查看事件狀態(通過改變事件狀態來控制事件其他進程的運行) e.set() # 將事件 改為 True e.clear() # 將事件改為 False e.wait() # 等待 如果 狀態為 True 時 向后執行
進程間通信IPC
水電費
import time from multiprocessing import Process,Queuedef girl(q):print('來自boy的信息',q.get())print('來自校領導的凝視',q.get()) def boy(q):q.put('約嗎')if __name__ == '__main__':q = Queue(5)boy_p = Process(target=boy,args=(q,))girl_p = Process(target=girl,args=(q,))boy_p.start()girl_p.start()time.sleep(1)q.put('好好工作,別亂搞')
隊列 Queue #重點
先進先出
import random import time from multiprocessing import Process, Event, Queueq = Queue(3)q.put(1) # 將對象放入隊列中 會有細微的 延遲 q.put(2) print("隊列是否已經滿了", q.full()) # 隊列是否已經滿了 q.put(3) print("隊列是否已經滿了", q.full()) # q.put(3)print(q.get()) # 取數據 print("隊列是否空了", q.empty()) # 隊列是否空了 print(q.get()) print(q.get()) print("隊列是否空了", q.empty()) print(q.get()) print(q.get(False)) # 判斷隊列是否已經空了 空了就報錯 queue.Empty q.qsize() # 獲取隊列當前大小 就是已存在的數據個數 不可靠while 1:try:print(q.get(False))except:print("11111")breakdef boy(q):q.put("約嗎")def girl(q):while 1:try:print(q.get(False)) # == q.get_nowait() 如果隊列空則報錯except:passif __name__ == "__main__":q = Queue(5)boy = Process(target=boy, args=(q,))girl = Process(target=girl, args=(q,))boy.start()girl.start()time.sleep(1)q.put("好好學習")
生產者消費者模型
解耦 緩沖 降低生產者與消費者之間的 耦合性
import time from multiprocessing import Process, Queuedef producer(p):for i in range(1, 11):time.sleep(1)print("生產了包子%s" % i)# 將生產的包子添加到隊列中p.put("包子%s" % i)# 生產結束后在隊列末尾 添加一個空信號p.put(None)def consumer(p,i):while 1:time.sleep(1.5)# 循環 取出所有元素pp = p.get()if pp:print("%s吃了" % i, pp)else:print("%s吃完了" % i)# 將空信息還回去p.put(None)breakif __name__ == "__main__":q = Queue(10)# 創建pro_p = Process(target=producer, args=(q,))pro_p.start()for i in range(2):con_p = Process(target=consumer, args=(q,i))con_p.start()# p2.join()# p.put(None)
JoinableQueue
JoinableQueue的生產者消費者模型
- import time from multiprocessing import Process, JoinableQueue# 生產者 def producer(q):for i in range(10):time.sleep(0.5)# 創建10 個包子裝進隊列中q.put("包子%s號" % i)print("生產了 包子%s" % i)# 等待隊列中所有內容被取走后 關閉本進程q.join()print("包子賣完了")# 消費者 def consumer(q):while 1:time.sleep(1)# 循環吃包子print("吃了 %s" % q.get())# 每取 一個元素 就給 q.join 傳遞一個信號 記錄取出個數q.task_done() if __name__ == "__main__":# 實現 JoinableQueue 隊列 對象q = JoinableQueue(20)# 將 生產者加入進程pro_p = Process(target=producer, args=(q,))pro_p.start()# 將 消費者 加入進程con_p = Process(target=consumer, args=(q,))# 將消費者設置成守護進程 隨 住程序一起關閉con_p.daemon = Truecon_p.start()# 等待進程 執行完閉 主程序才能關閉pro_p.join()print("關閉程序!")
管道 pipe
管道是不安全的
- from multiprocessing import Process, Pipe, Manager, Lock, Pooldef func(conn2):try:print(conn2.recv())print(conn2.recv())except EOFError:print("管道已經關閉了")conn2.close()if __name__ == '__main__':try:conn1, conn2 = Pipe() # 在創建 Process 對象之前創建管道p = Process(target=func, args=(conn2,))p.start()conn1.send("asdad")conn1.close()conn1.recv()p.join()except OSError:print("管道關閉>>>>>>>>>>")# 方法 recv() 接收 send() 發送 #- 管道默認是雙工的 # 設置為 單工 參數: duplex=False 改為單工 conn1 發送 conn2 接收 如果另一端已經關閉 則 recv() 接收會報錯
數據共享 Manager
多進程同時操作一個文件的數據 不加鎖就會出現錯誤數據
共享: 可以將一個數據傳遞到進程中 在不同的 作用于中 進程可對其進行修改
import time, os from multiprocessing import Process, Manager, Lock'''資源共享'''def func(mm):mm["name"] = "張飛"if __name__ == '__main__':m = Manager()mm = m.dict({"name": "aaaa"})print(mm)p = Process(target=func, args=(mm,))p.start()p.join()print(mm)def func(m_d, ml):with ml:m_d["count"] -= 1if __name__ == '__main__':m = Manager()ml = Lock()m_d = m.dict({"count": 100})lis = []for i in range(20):p = Process(target=func, args=(m_d, ml))p.start()lis.append(p)[i.join() for i in lis]print(m_d)
進程池 Pool
- import time from multiprocessing import Process,Pooldef func(n):for i in range(5):n = n + iprint(n)if __name__ == '__main__':#驗證一下傳參pool = Pool(4)pool.map(func,range(100)) #map自帶join功能,異步執行任務,參數是可迭代的p_list = []for i in range(200):p1 = Process(target=func,args=(i,))p1.start()p_list.append(p1)[p.join() for p in p_list]
-
def func(n):print(n)time.sleep(1)return n * nif __name__ == '__main__':pool = Pool(4) # 進程池 的個數lis = []for i in range(10):# res = p.apply(fun,args=(i,)) #同步執行的方法,他會等待你的任務的返回結果,# 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行,并且可以執行不同的任務,傳送任意的參數了。# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。ret = pool.apply_async(func, args=(i,)) # 異步執行的方法,他會等待你的任務的返回結果,# print(ret.get())lis.append(ret)# print(lis)pool.close() # 不是關閉進程池,而是不允許再有其他任務來使用進程池pool.join() # 這是感知進程池中任務的方法,等待進程池的任務全部執行完pool.ready() # 如果調用完成 返回 Truepool.terminate() # 立即終止所有進程for i in lis:print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需getprint("主程序結束")
map 傳參
import time from multiprocessing import Process,Pooldef func(n):print(n)if __name__ == '__main__':pool = Pool(4)# pool.map(func,range(100)) #參數是可迭代的pool.map(func,['sb',(1,2)]) #參數是可迭代的# pool.map(func,range(100)) #map自帶join功能,異步執行任務,參數是可迭代的回調函數 callback
回調函數的形參只能有一個 如果執行函數有多個返回值 那么 回調函數 接收的是一個元祖 包含所有執行函數的返回值
import time, os from multiprocessing import Process, Pooldef func1(n):print(os.getpid())n += 10return ndef func2(nn):# 回調函數 接收 func1 的返回值print(os.getpid())print(nn)return 10if __name__ == '__main__':pool = Pool()print(os.getpid())lis = []c = pool.apply_async(func1, args=(1,), callback=func2) # 將func1 反回的結果交給func2 執行pool.close() # 不是關閉進程池,而是不允許再有其他任務來使用進程池pool.join() # 這是感知進程池中任務的方法,等待進程池的任務全部執行完
多進程爬蟲
from multiprocessing import Process, Pool from urllib.request import urlopen import ssl, re # ?掉數字簽名證書 ssl._create_default_https_context = ssl._create_unverified_contexturls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/' ] def func1(path):# 打開網址condent = urlopen(path)# 返回網頁源代碼return condent.read().decode("utf-8")def func2(content):com = re.compile(r"<a(?P<aaa>.*?)</a>")cont = re.findall(com, content)print(cont)if __name__ == '__main__':pool = Pool()for path in urls:tar = pool.apply_async(func1, args=(path,), callback=func2)pool.close()pool.join()轉載于:https://www.cnblogs.com/zhang-zi-yi/p/10755810.html
總結
- 上一篇: package 与 package-lo
- 下一篇: tomcat发布网站的三种方式