python多线程队列和池_Python3 从零单排28_线程队列进程池线程池
1.線程隊列
線程隊列有三種:先進先出,后進先出,按優先級進出,具體如下:
1 importqueue2
3 #先進先出
4 q = queue.Queue(3)5
6 q.put(1)7 q.put(2)8 q.put(3)9 #q.put(4) # 再放阻塞,等待隊列消費
10 #q.put(4,block = False) # 不阻塞,強制放數據,如果滿的情況下直接報錯 等價與 q.put_nowait(4)
11 #q.put(4,block = True) # 阻塞,等待放數據,如果滿的情況下阻塞,默認是True
12 #q.put(4, block=True, timeout=3) # 阻塞等待3秒,3秒還在阻塞,強制放數據,滿的情況下報錯
13 print(q.full())14 print(q.empty())15
16 print(q.get())17 print(q.get())18 print(q.get())19 #print(q.get()) # 再拿阻塞,等待隊列新增數據 block timeout同put
20 print(q.full())21 print(q.empty())22
23
24 #后進先出 同堆棧原理
25 q = queue.LifoQueue(3)26
27 q.put(1)28 q.put(2)29 q.put(3)30 #q.put(4) # 再放阻塞,等待隊列消費
31 #q.put(4,block = False) # 不阻塞,強制放數據,如果滿的情況下直接報錯 等價與 q.put_nowait(4)
32 #q.put(4,block = True) # 阻塞,等待放數據,如果滿的情況下阻塞,默認是True
33 #q.put(4, block=True, timeout=3) # 阻塞等待3秒,3秒還在阻塞,強制放數據,滿的情況下報錯
34 print(q.full())35 print(q.empty())36
37 print(q.get())38 print(q.get())39 print(q.get())40 #print(q.get()) # 再拿阻塞,等待隊列新增數據 block timeout同put
41 print(q.full())42 print(q.empty())43
44 #優先級進出 優先級越小的先出
45 q = queue.PriorityQueue(3)46
47 q.put([50, 1])48 q.put([20, 2])49 q.put([30, 3])50 #q.put([50, 4]) # 再放阻塞,等待隊列消費
51 print(q.full())52 print(q.empty())53
54 print(q.get())55 print(q.get())56 print(q.get())57 #print(q.get()) # 再拿阻塞,等待隊列新增數據 block timeout同put
58 print(q.full())59 print(q.empty())
View Code
2.進程池&線程池
在剛開始學多進程或多線程時,我們迫不及待地基于多進程或多線程實現并發的套接字通信。
然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨著并發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至于不堪重負而癱瘓。
于是我們必須對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途。
例如進程池,就是用來存放進程的池子,本質還是基于多進程,只不過是對開啟進程的數目加上了限制。
2.1基本用法:
1、submit(fn, *args, **kwargs)
異步提交任務
2、map(func, *iterables, timeout=None, chunksize=1)
取代for循環submit的操作
3、shutdown(wait=True)
相當于進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,并不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前
4、result(timeout=None)
取得結果
5、add_done_callback(fn)
回調函數
1 from concurrent.futures importProcessPoolExecutor, ThreadPoolExecutor2 from threading importcurrent_thread3 importtime, random, os4
5
6 defsayhi(name):7 print("%s say hi... pid:%s; current_thread:%s" %(name, os.getpid(), current_thread().getName()))8 time.sleep(random.randint(1, 3))9 print("%s say bye... pid:%s; current_thread:%s" %(name, os.getpid(), current_thread().getName()))10
11
12 if __name__ == "__main__":13 #pool = ProcessPoolExecutor(3) # 實例化進程池,指定最大進程數為3
14 pool = ThreadPoolExecutor(3) #實例化線程池,指定最大線程數為3
15 for i in range(10):16 pool.submit(sayhi, "xg%s" %i,)17 #關閉pool的submit功能,不可以再丟進程或線程進線程池。
18 pool.shutdown(wait=True) #此刻統計當前pool里的所有進程或線程數,每運行完一個-1,直到等于0時,往下運行代碼。等同于進程線程的join
19 print("all over!")
View Code
2.2同步回調? 開啟的多線程變成了串行,拿到第一個線程的執行結果才繼續往下繼續運行
1 #釣魚大賽,參賽者釣魚,然后稱重。
2 from concurrent.futures importThreadPoolExecutor3 importtime, random, os4
5
6 deffishing(name):7 print("%s is fishing..." %name)8 time.sleep(random.randint(2, 5))9 fish = random.randint(5, 15) * "m"
10 res = {"name": name, "fish": fish}11 returnres12
13
14 defweigh(res):15 name = res["name"]16 size = len(res["fish"])17 print("%s 釣到的魚大小為 %s kg" %(name, size))18
19
20 if __name__ == "__main__":21 pool = ThreadPoolExecutor(3)22 res1 = pool.submit(fishing, "xt").result() #同步拿結果,拿到結果才繼續往下走
23 weigh(res1)24 res2 = pool.submit(fishing, "dj").result()25 weigh(res2)26 res3 = pool.submit(fishing, "hh").result()27 weigh(res3)
View Code
2.3異步回調
1 from concurrent.futures importThreadPoolExecutor2 importtime, random, os3
4
5 deffishing(name):6 print("%s is fishing..." %name)7 time.sleep(random.randint(2, 5))8 fish = random.randint(5, 15) * "m"
9 res = {"name": name, "fish": fish}10 returnres11
12
13 defweigh(pool_obj):14 res = pool_obj.result() #拿到線程對象的運行結果,因為是線程運行完才會調用weigh,所以馬上能拿到結果
15 name = res["name"]16 size = len(res["fish"])17 print("%s 釣到的魚大小為 %s kg" %(name, size))18
19
20 if __name__ == "__main__":21 pool = ThreadPoolExecutor(3)22 pool.submit(fishing, "xt").add_done_callback(weigh) #當線程執行完后,將線程對象當參數傳給weigh
23 pool.submit(fishing, "dj").add_done_callback(weigh)24 pool.submit(fishing, "hh").add_done_callback(weigh)
View Code
2.4map用法
1 from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutor2
3 importos,time,random4 deftask(n):5 print('%s is runing' %os.getpid())6 time.sleep(random.randint(1,3))7 return n**2
8
9 if __name__ == '__main__':10
11 executor=ThreadPoolExecutor(max_workers=3)12
13 #for i in range(11):
14 #future=executor.submit(task,i)
15
16 executor.map(task,range(1,12)) #map取代了for+submit
View Code
3.queue實現的線程池
套接字服務端代碼:
1 importsocket, queue2 from threading importThread, currentThread3
4
5 server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)6 server.bind(("127.0.0.1", 8089))7 server.listen(1000)8 #pool = ThreadPoolExecutor(2)
9 q = queue.Queue(1000)10
11
12 defrec_data():13 whileTrue:14 conn =q.get()15 whileTrue:16 try:17 res = conn.recv(1024)18 if not res:break
19 res =res.upper()20 conn.send(res)21 print("server cunrrent thread: %s" %currentThread().getName())22 exceptException as e:23 print(e)24 conn.close()25 q.task_done()26 break
27
28
29 defstart():30 print("starting...")31 for i in range(2):32 t = Thread(target=rec_data)33 t.daemon =True34 t.start()35 whileTrue:36 conn, addr =server.accept()37 q.put(conn)38 #pool.submit(rec_data, conn, addr)
39
40
41 if __name__ == "__main__":42 start()
View Code
上面的配套客戶端代碼:
1 importsocket2 from threading importThread, currentThread3
4
5 defsend_msg():6 client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)7 client.connect(("127.0.0.1", 8089))8 count = 1
9 whileTrue:10 msg = input(">>>>:")11 #msg = "%s say hello %s " % (currentThread().getName(), count)
12 print("%s trying to send datas..." %currentThread().getName())13 client.send(msg.encode())14 print("%s trying to resv datas..." %currentThread().getName())15 res = client.recv(1024)16 count += 1
17 print(res)18
19
20 if __name__ == "__main__":21 #for i in range(100):
22 #t = Thread(target=send_msg)
23 #t.start()
24 send_msg()
View Code
4.基于線程的FTP服務器
1.在之前開發的FTP基礎上,開發支持多并發的功能
2.不能使用SocketServer模塊,必須自己實現多線程
3.必須用到隊列Queue模塊,實現線程池
4.允許配置最大并發數,比如允許只有10個并發用戶
實現功能如下:
用戶加密認證
允許同時多用戶登錄(用到并發編程的知識,選做)
每個用戶有自己的家目錄,且只能訪問自己的家目錄
對用戶進行磁盤配額,每個用戶的可用空間不同(選做)
允許用戶在ftp server上隨意切換目錄
允許用戶查看當前目錄下的文件
允許上傳和下載文件,并保證文件的一致性
文件傳輸過程中顯示進度條
服務端代碼:
1 #-*- coding: utf-8 -*-
2 #@Time : 2018/12/14 9:11
3 #@Author : Xiao
4
5
6 importsocket7 importos8 importsys9 importstruct10 importjson11 importhashlib12 importconfigparser13 importsubprocess14 importqueue15 from threading importThread16 from conf.settings import *
17
18
19 classMyserver(object):20 server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)21 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)22 server_socket.bind((SERVER_IP, SERVER_PORT))23 server_socket.listen(MAX_CONNECT)24 q = queue.Queue(MAX_QUEUE) #初始化隊列大小,最對只能有多少個等待鏈接
25
26 def __init__(self, conn):27 """實例化時自動啟動文件服務"""
28 self.conn = conn #接收到的客戶連接信息
29 self.client_addr = None #接收到的客戶連接地址信息
30 self.header_len_bytes = None #發送數據的報文頭信息
31 self.users = self.get_users() #拿到用戶信息,登錄的時候用來判斷
32 self.online = 0 #用戶的登錄狀態,每次接收到客戶端的命令都要判斷用戶是否已經登錄,未登錄需要先登錄
33 self.home = None #用戶的家目錄,用戶登錄的時候會更新家目錄
34 self.cur = None #用戶的當前目錄,用戶登錄的時候當前目錄就是家目錄,隨著用戶cd命令執行更新
35 self.quota = 0 #用戶的家目錄的大小限制,用戶上傳時會先判斷目錄大小是否夠接收文件
36 print("starting....")37
38 @staticmethod39 defget_md5(var):40 """"加密,鹽值為123456"""
41 salt = "123456"
42 new_var = salt +var43 m =hashlib.md5()44 m.update(new_var.encode())45 returnm.hexdigest()46
47 @staticmethod48 defget_users():49 '''拿到用戶基礎信息'''
50 """"初始化用戶信息"""
51 users =configparser.ConfigParser()52 users.read(DATA_PATH)53 returnusers54
55 @property56 defget_code(self):57 """拿到客戶端上傳文件的操作結果"""
58 code_len_bytes = self.conn.recv(4)59 code_len = struct.unpack("i", code_len_bytes)[0] #struct.unpack解壓數據,得到數據頭信息長度
60 code_str = self.conn.recv(code_len).decode("utf-8") #根據上面的長度接收數據頭信息
61 code_dic = json.loads(code_str, encoding="utf-8")62 code = code_dic["code"]63 msg = code_dic["msg"]64 returncode, msg65
66 @property67 defget_msg(self):68 """拿到客戶端發來的請求"""
69 header_len = struct.unpack("i", self.header_len_bytes)[0] #struct.unpack解壓數據,得到數據頭信息長度
70 header_str = self.conn.recv(header_len).decode("utf-8") #根據上面的長度接收數據頭信息
71 header = json.loads(header_str, encoding="utf-8")72 msg_size = header["msg_size"] #根據數據頭信息得到本次要接收的數據大小
73 msg = self.conn.recv(msg_size).decode("utf-8")74 returnmsg75
76 defsend_code(self,code_dic):77 """發送本次請求的結果狀態,客戶端根據這些狀態做下一步操作"""
78 res_code_bytes = bytes(json.dumps(code_dic), encoding='utf-8')79 res_code_len_bytes = struct.pack("i", len(res_code_bytes))80 self.conn.send(res_code_len_bytes)81 self.conn.send(res_code_bytes)82
83 def_login(self, user_info):84 """登陸邏輯,登錄成功后初始化用戶的家目錄、當前目錄、登錄狀態等信息"""
85 info_lis = user_info.split(",")86 if len(info_lis) == 2:87 user, pwd =info_lis88 pwd =self.get_md5(pwd)89 if user in self.users and pwd == self.users[user].get("password"):90 code_dic = {"code": "0", "msg": "login success!"}91 self.send_code(code_dic)92 self.home = os.path.join(HOME_PATH, self.users[user].get("home_dir"))93 self.cur =self.home94 self.quota = float(self.users[user].get("quota"))*(1024**3)95 self.online = 1
96 returnTrue97 code_dic = {"code": "1", "msg": "error username or password!"}98 self.send_code(code_dic)99
100 def_sz(self, file_name):101 """發送文件給客戶端102 1.首先拿到客戶端的確認信息,是否上傳103 2.確定后,判斷文件是否存在,并告訴客戶端接下來是傳文件還是通知文件不存在104 3.文件存在則開始發送文件105 """
106 res_code, res_msg =self.get_code107 if res_code == "0":108 file_path =os.path.join(self.cur, file_name)109 if os.path.exists(file_path) andos.path.isfile(file_path):110 code_dic = {"code": "0", "msg": "start to download %s" %file_name}111 self.send_code(code_dic)112 file_size =os.path.getsize(file_path)113 header = {"file_size": file_size, "file_name": file_name, "md5": "123456"}114 header_bytes = bytes(json.dumps(header), encoding='utf-8')115 header_len_bytes = struct.pack("i", len(header_bytes))116 self.conn.send(header_len_bytes)117 self.conn.send(header_bytes)118 with open(file_path, "rb") as f:119 for line inf:120 self.conn.send(line)121 else:122 code_dic = {"code": "1", "msg": "%s is a directory or file doesn't exist!" %file_name}123 self.send_code(code_dic)124
125 def_rz(self, file_name):126 """保存來自文件127 1.首先確認客戶端是否要傳文件128 2.確定后,判斷文件是否存在,并告訴客戶端接下來是接收文件還是通知文件已存在129 3.用戶當前目錄文件不存在則開始接收文件130 """
131 res_code, res_msg =self.get_code132 if res_code == "0":133 file_name =os.path.basename(file_name)134 file_abspath =os.path.join(self.cur, file_name)135 if notos.path.exists(file_abspath):136 res_code = {"code": "0", "msg": "start to upload file %s..." %file_name}137 self.send_code(res_code)138 header_len_bytes = self.conn.recv(4) #接收4個字節的數據頭信息
139 header_len = struct.unpack("i", header_len_bytes)[0] #struct.unpack解壓數據,得到數據頭信息長度
140 header_str = self.conn.recv(header_len).decode("utf-8") #根據上面的長度接收數據頭信息
141 header = json.loads(header_str, encoding="utf-8")142 file_size = header["file_size"] #根據數據頭信息得到本次要接收的數據大小
143 empty_size =float(self.quota) -os.path.getsize(self.home)144 if empty_size <145 res_code='{"code":' space left to accept file self.send_code else:148 recv_size="0151" with open as f:152 while file_size:>
153 line = self.conn.recv(1024)154 f.write(line) #將每次接收到的數據拼接
155 recv_size += len(line) #實時記錄當前接收到的數據長度
156 else:157 res_code = {"code": "1", "msg": "%s is already exists..." %file_name}158 self.send_code(res_code)159
160 def_ls(self, dirname):161 """
162 1.接收客戶端需要查看的是哪個目錄163 2.判斷目錄是否存在,存在繼續往下走,不存在則直接告訴客戶端失敗,目錄不存在164 3.執行命令,如果服務器端是linux系統,則用ls,如果是windows則用dir165 4.如果命令執行結果為空,返回客戶端當前目錄下沒有文件166 5.如果不為空,則開始發送目錄下的文件夾或文件信息167 """
168 new_dirname = os.path.join(self.cur, dirname) if dirname != "." elseself.cur169 print(new_dirname)170 cmd = "dir" if sys.platform.lower().startswith("win") else "ls"
171 if os.path.exists(new_dirname) and notos.path.isfile(new_dirname):172 res = subprocess.Popen("%s %s" % (cmd, new_dirname), shell=True, stdout=subprocess.PIPE)173 out =res.stdout.read()174 ifout:175 print(out.decode("GBK"))176 res_code = {"code": "0", "msg": "%s dir has follow files or dirs..." %dirname}177 self.send_code(res_code)178 header = {"file_size": len(out)}179 header_bytes = bytes(json.dumps(header), encoding='utf-8')180 header_len_bytes = struct.pack("i", len(header_bytes))181 self.conn.send(header_len_bytes)182 self.conn.send(header_bytes)183 self.conn.send(out)184 else:185 res_code = {"code": "3", "msg": "%s current dir is empty..." %dirname}186 self.send_code(res_code)187 else:188 res_code = {"code": "1", "msg": "%s no such directory" %dirname}189 self.send_code(res_code)190
191 def_cd(self, dirname):192 """
193 1.接收到客戶端的需要進入的目錄信息,跟當前目錄進行拼接194 2.如果目錄存在,則修改當前目錄的變量值,如果不存在則告訴客戶端,目錄不存在195 """
196 new_dirname =os.path.join(self.cur, dirname)197 if os.path.exists(new_dirname) and notos.path.isfile(new_dirname):198 res_code = {"code": "0", "msg": "切換成功,當前目錄為 %s" %dirname}199 self.send_code(res_code)200 self.cur =new_dirname201 else:202 res_code = {"code": "1", "msg": "切換失敗, %s 目錄不存在" %dirname}203 self.send_code(res_code)204
205 defcomunication(self):206 """
207 通信主程序,每個實例都是通過這個方法和客戶端通信的。208 1.先判斷用戶的是否登陸,如果沒有登陸而且請求不是login,則返回客戶端讓其登陸,如果已登陸則往下走209 2.判斷用戶請求的方法是否正確,不正確則返回客戶端,請求方法有誤,如果方法存在則往下走210 3.調用具體的方法211 """
212 whileTrue:213 try:214 self.header_len_bytes = self.conn.recv(4) #接收4個字節的數據頭信息
215 if notself.header_len_bytes:216 break
217 msg =self.get_msg218 print(msg)219 method, args = msg.split(" ", 1)220 if not self.online and method != "login":221 res_code = {"code": "2", "msg": "please login first!"}222 self.send_code(res_code)223 elif hasattr(self, "_%s" % method.lower()) andargs:224 res_code = {"code": "0", "msg": "wait moment,it's working now"}225 self.send_code(res_code)226 func = getattr(self, "_%s" %method.lower())227 func(args)228 else:229 res_code = {"code": "1", "msg": "error request %s !" %msg}230 self.send_code(res_code)231 exceptException as e:232 print(e)233 self.q.task_done()234 self.conn.close()235 break
236
237 @classmethod238 defstart(cls):239 """
240 循環拿隊列q里的鏈接,拿到一個實例化一個,然后啟動comunication方法,開始和客戶端交互241 """
242 whileTrue:243 conn =cls.q.get()244 client =Myserver(conn)245 client.comunication()246
247 @classmethod248 defcreate_thread(cls):249 '''
250 開啟多線程,線程數為settings設置的最大并發數251 '''
252 for i inrange(MAX_RUN):253 t = Thread(target=cls.start)254 t.daemon =True255 t.start()256
257 @classmethod258 defrun(self):259 """
260 啟動,循環等鏈接,每來一個鏈接就賽到隊列里261 """
262 self.create_thread()263 whileTrue:264 print("waiting for connection...")265 conn, client_addr =self.server_socket.accept()266 self.q.put(conn)267
268
269 if __name__ == "__main__":270 Myserver.run()
View Code
客戶端代碼:
1 #-*- coding: utf-8 -*-
2 #@Time : 2018/12/14 9:11
3 #@Author : Xiao
4
5
6 importsocket7 importstruct8 importjson9 importos10 from settings import *
11
12
13 classMyclient(object):14 def __init__(self):15 self.server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)16 self.server_socket.connect((SERVER_IP, SERVER_PORT)) #主動初始化TCP服務器連接
17
18 defsend_msg(self, msg_bytes):19 """發送本次請求的指令,服務端根據指令返回數據"""
20 header = {"msg_size": len(msg_bytes)}21 header_bytes = json.dumps(header).encode("utf-8")22 header_size = struct.pack("i", len(header_bytes))23 self.server_socket.send(header_size)24 self.server_socket.send(header_bytes)25 self.server_socket.send(msg_bytes)26
27 defsend_code(self, code_dic):28 """發送上傳文件的結果狀態,服務端根據這些狀態做下一步操作"""
29 res_code_bytes = bytes(json.dumps(code_dic), encoding='utf-8')30 res_code_len_bytes = struct.pack("i", len(res_code_bytes))31 self.server_socket.send(res_code_len_bytes)32 self.server_socket.send(res_code_bytes)33
34 @property35 defget_code(self):36 """拿到服務端關于本次指令的操作結果"""
37 code_len_bytes = self.server_socket.recv(4)38 code_len = struct.unpack("i", code_len_bytes)[0] #struct.unpack解壓數據,得到數據頭信息長度
39 code_str = self.server_socket.recv(code_len).decode("utf-8") #根據上面的長度接收數據頭信息
40 code_dic = json.loads(code_str, encoding="utf-8")41 code = code_dic["code"]42 msg = code_dic["msg"]43 returncode, msg44
45 deflogin(self):46 """登陸函數"""
47 count =048 while count < 3:49 user = input("your name:").strip()50 password = input("your password:").strip()51 if user andpassword:52 msg = "login %s,%s" %(user, password)53 msg_bytes = msg.encode("utf-8")54 self.send_msg(msg_bytes)55 res_code, res_msg =self.get_code56 print(res_msg)57 if res_code == "0":58 login_code, login_msg =self.get_code59 print(login_msg)60 if login_code == "0":61 Myclient.online = 1
62 break
63 count += 1
64 else:65 print("賬號或密碼不能為空!")66 else:67 exit("too many login!")68
69 def_sz(self, file_name):70 """下載文件71 1.首先判斷客戶端本地是否存在文件,存在則告訴服務端不用傳了,不存在則往下走72 2.接收服務器端返回的文件操作結果,是否可以下載,不可以現在則打印服務端的msg,可以下載則開始接收文件73 """
74 file_abspath =os.path.join(DOWNLOAD_PATH, file_name)75 if notos.path.exists(file_abspath):76 data_code = {"code": "0", "msg": file_name}77 self.send_code(data_code)78 res_code, res_msg =self.get_code79 print(res_msg)80 if res_code == "0":81 header_len_bytes = self.server_socket.recv(4) #接收4個字節的數據頭信息
82 header_len = struct.unpack("i", header_len_bytes)[0] #struct.unpack解壓數據,得到數據頭信息長度
83 header_str = self.server_socket.recv(header_len).decode("utf-8") #根據上面的長度接收數據頭信息
84 header = json.loads(header_str, encoding="utf-8")85 file_size = header["file_size"] #根據數據頭信息得到本次要接收的數據大小
86 recv_size =087 tmp_size =088 per_size = file_size/50
89 num = 1
90 with open(file_abspath, "wb") as f:91 while recv_size < file_size: #當接收到的數據小于本次數據長度時就一直接收
92 if tmp_size >per_size:93 rate = str((recv_size / file_size)*100)[:5] + "%"
94 print("%s %s" % ("#"*num, rate))95 tmp_size =096 num += 1
97 line = self.server_socket.recv(1024)98 f.write(line) #將每次接收到的數據拼接
99 recv_size += len(line) #實時記錄當前接收到的數據長度
100 tmp_size +=len(line)101 print("%s %s" % ("#" * (num+1), "100.00%"))102 print("download file %s success!" %file_name)103 else:104 res_code = {"code": "1", "msg": file_name}105 self.send_code(res_code)106 print("%s is already exists" %file_name)107
108 def_rz(self, file_path):109 """上傳文件110 1.首先判斷客戶端本地是否存在文件,不存在則告訴服務端不傳了,存在則往下走111 2.接收服務器端返回的文件操作結果,是否可以上傳,不可以現在則打印服務端的msg,可以上傳則開始發送文件112 """
113 if os.path.exists(file_path) andos.path.isfile(file_path):114 file_name =os.path.basename(file_path)115 res_code = {"code": "0", "msg": file_name}116 self.send_code(res_code)117 res_code, res_msg =self.get_code118 print(res_msg)119 if res_code == "0":120 file_size =os.path.getsize(file_path)121 header = {"file_size": file_size, "file_name": file_name, "md5": "123456"}122 header_bytes = bytes(json.dumps(header), encoding='utf-8')123 header_len_bytes = struct.pack("i", len(header_bytes))124 self.server_socket.send(header_len_bytes)125 self.server_socket.send(header_bytes)126 res_code, res_msg =self.get_code127 print(res_msg)128 upload_size =0129 tmp_size =0130 per_size = file_size / 50
131 num = 1
132 if res_code == "0":133 with open(file_path, "rb") as f:134 for line inf:135 if tmp_size >per_size:136 rate = str((upload_size / file_size) * 100)[:5] + "%"
137 print("%s %s" % ("#" *num, rate))138 tmp_size =0139 num += 1
140 self.server_socket.send(line)141 upload_size +=len(line)142 tmp_size +=len(line)143 print("%s %s" % ("#" * (num + 1), "100.00%"))144 print("upload file %s success!" %file_name)145 else:146 res_code = {"code": "1", "msg": "no such file: %s" %file_path}147 self.send_code(res_code)148 print("上傳文件不存在!")149
150 def_ls(self, dirname):151 """查看當前文件夾下的文件或目錄!"""
152 res_code, res_msg =self.get_code153 print(res_msg)154 if res_code == "0":155 header_len_bytes = self.server_socket.recv(4)156 header_len = struct.unpack("i", header_len_bytes)[0]157 header_str = self.server_socket.recv(header_len).decode("utf-8")158 header = json.loads(header_str, encoding="utf-8")159 file_size = header["file_size"]160 recv_size =0161 res = b''
162 while recv_size <163 res self.server_socket.recv recv_size="len(res)165" print>
168 def_cd(self, dirname):169 """切換目錄"""
170 res_code, res_msg =self.get_code171 print(res_msg)172
173 defrun(self):174 """反復向服務器發送請求,當請求的返回操作碼為0成功時,調用相應屬性,失敗時不做任何處理直接循環"""
175 self.login()176 whileTrue:177 msg = input(">>>>:").strip()178 if msg.lower() == "ls":179 msg = msg + "."
180 msg_lis = msg.split(" ", 1)181 if len(msg_lis) == 2 and hasattr(self, "_%s" %msg_lis[0].lower()):182 method =msg_lis[0].lower()183 args = msg_lis[1]184 msg_bytes = msg.encode("utf-8")185 self.send_msg(msg_bytes)186 res_code, res_msg =self.get_code187 print(res_msg)188 if res_code == "2":189 self.login()190 if res_code == "0":191 func = getattr(self, "_%s" %method.lower())192 func(args)193 else:194 print("輸入格式不正確,請重新輸入!")195
196 def __del__(self):197 '''析構函數,當程序結束時自動關閉socket'''
198 self.server_socket.close()199
200
201 if __name__ == "__main__":202 try:203 client =Myclient()204 client.run()205 exceptException as e:206 print(e)
View Code
163>145>總結
以上是生活随笔為你收集整理的python多线程队列和池_Python3 从零单排28_线程队列进程池线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pb9调用http发短信post_远程服
- 下一篇: python3数据库表关联_Django