python 多进程与多线程配合拷贝文件目录
生活随笔
收集整理的這篇文章主要介紹了
python 多进程与多线程配合拷贝文件目录
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
版本一:使用shutil進(jìn)行拷貝
1 # -*- coding: utf-8 -*- 2 # @author: Tele 3 # @Time : 2019/04/02 下午 3:09 4 # 待改進(jìn): 5 # 1.拷貝邏輯使用原生的io 6 # 2.針對大文件在進(jìn)程內(nèi)部實現(xiàn)多線程方式進(jìn)行拷貝 7 8 9 import time 10 import re 11 import os 12 import shutil 13 import multiprocessing 14 15 16 # 遍歷文件夾 17 def walk_file(file): 18 file_list = list() 19 for root, dirs, files in os.walk(file): 20 # 遍歷文件 21 for f in files: 22 file_list.append(f) 23 return file_list 24 25 26 # 計算文件數(shù)量 27 def get_file_count(dir): 28 return len(walk_file(dir)) 29 30 31 def copy(src, target, queue): 32 target_number = 1 33 if os.path.isdir(src): 34 target_number = get_file_count(src) 35 shutil.copytree(src, target) 36 else: 37 shutil.copyfile(src, target) 38 # 將拷貝完成的文件數(shù)量放入隊列中 39 queue.put(target_number) 40 41 42 def copy_dir(src, desc): 43 total_number = get_file_count(src) 44 # 分隔符檢測 45 src = check_separator(src) 46 desc = check_separator(desc) 47 # print("src:",src) 48 # print("desc:",desc) 49 50 file_dir_list = [src + "/" + i for i in os.listdir(src)] 51 if os.path.exists(desc): 52 shutil.rmtree(desc) 53 pool = multiprocessing.Pool(3) 54 55 # 創(chuàng)建隊列 56 queue = multiprocessing.Manager().Queue() 57 58 # 一個文件/目錄開啟一個進(jìn)程去拷貝 59 for f_name in file_dir_list: 60 target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:] 61 # print(target) 62 # 創(chuàng)建target目錄 63 parent_path = os.path.split(target)[0] 64 if not os.path.exists(parent_path): 65 os.makedirs(parent_path) 66 pool.apply_async(copy, args=(f_name, target, queue,)) 67 68 start = time.time() 69 pool.close() 70 # pool.join() 71 count = 0 72 while True: 73 count += queue.get() 74 # 格式化輸出時兩個%輸出一個%,不換行,每次定位到行首,實現(xiàn)覆蓋 75 print("\r拷貝進(jìn)度為 %.2f %%" % (count * 100 / total_number), end="") 76 if count >= total_number: 77 break 78 end = time.time() 79 print() 80 print("耗時-----", (end - start), "s") 81 82 83 # 查找指定字符出現(xiàn)的全部索引位置 84 def index_list(c, s): 85 return [i.start() for i in re.finditer(c, s)] 86 87 88 # 檢測目錄結(jié)尾是否有 "/" 89 def check_separator(path): 90 if path.rindex("/") == len(path) - 1: 91 return path[0:path.rindex("/")] 92 return path 93 94 95 def main(): 96 copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/") 97 98 99 if __name__ == '__main__': 100 main()這樣做仍然有些小問題,對于大文件可以在進(jìn)程內(nèi)部采用多線程的方式,可以看到使用shutil進(jìn)行拷貝時我們沒有辦法實現(xiàn)字節(jié)切割,于是有了下面的版本二
?版本二:
1 # -*- coding: utf-8 -*- 2 # @author: Tele 3 # @Time : 2019/04/02 下午 3:09 4 # 使用多進(jìn)程拷貝文件夾,對于大文件進(jìn)程內(nèi)部又使用了多線程進(jìn)行拷貝 5 # 使用進(jìn)程池實現(xiàn)多進(jìn)程時,使用的消息隊列要使用multiprocessing.Manager().Queue()創(chuàng)建 6 7 import time 8 import re 9 import os 10 import shutil 11 import multiprocessing 12 import math 13 from concurrent.futures import ThreadPoolExecutor, wait 14 15 # 設(shè)置單個文件的最大值:209715200 200M 16 MAX_SINGLE_FILE_SIZE = 209715200 17 mutex = multiprocessing.Lock() 18 executor = ThreadPoolExecutor(max_workers=3) 19 20 21 # 遍歷文件夾 22 def walk_file(file): 23 file_list = list() 24 for root, dirs, files in os.walk(file): 25 # 遍歷文件 26 for f in files: 27 file_list.append(f) 28 29 # 空文件夾處理 30 for d in dirs: 31 if len(os.listdir(os.path.join(root, d))) == 0: 32 file_list.append(d) 33 return file_list 34 35 36 # 計算文件數(shù)量 37 def get_file_count(dir): 38 return len(walk_file(dir)) 39 40 41 def copy(src, target, queue): 42 target_number = 1 43 buffer = 1024 44 # 文件夾 45 if os.path.isdir(src): 46 target_number = get_file_count(src) 47 for root, dirs, files in os.walk(src): 48 # 遍歷文件 49 for f in files: 50 drive = os.path.splitdrive(target)[0] 51 target = drive + os.path.splitdrive(os.path.join(root, f))[1] 52 copy_single_file(buffer, os.path.join(root, f), target) 53 # 空文件夾 54 for d in dirs: 55 drive = os.path.splitdrive(target)[0] 56 target = drive + os.path.splitdrive(os.path.join(root, d))[1] 57 # 檢查文件的層級目錄 58 if not os.path.exists(target): 59 os.makedirs(target) 60 else: 61 copy_single_file(buffer, src, target) 62 # 將拷貝完成的文件數(shù)量放入隊列中 63 queue.put(target_number) 64 65 66 # 拷貝單文件 67 def copy_single_file(buffer, src, target): 68 file_size = os.path.getsize(src) 69 rs = open(src, "rb") 70 71 # 檢查文件的層級目錄 72 parent_path = os.path.split(target)[0] 73 if not os.path.exists(parent_path): 74 os.makedirs(parent_path) 75 76 ws = open(target, "wb") 77 # 小文件直接讀寫 78 if file_size <= MAX_SINGLE_FILE_SIZE: 79 while True: 80 content = rs.read(buffer) 81 ws.write(content) 82 if len(content) == 0: 83 break 84 ws.flush() 85 else: 86 # 設(shè)置每個線程拷貝的字節(jié)數(shù) 50M 87 PER_THREAD_SIZE = 52428800 88 # 構(gòu)造參數(shù)并執(zhí)行 89 task_list = list() 90 for i in range(math.ceil(file_size / PER_THREAD_SIZE)): 91 byte_size = PER_THREAD_SIZE 92 # 最后一個線程拷貝的字節(jié)數(shù)應(yīng)該是取模 93 if i == math.ceil(file_size / PER_THREAD_SIZE) - 1: 94 byte_size = file_size % PER_THREAD_SIZE 95 start = i * PER_THREAD_SIZE + i 96 t = executor.submit(copy_file_thread, start, byte_size, rs, ws) 97 task_list.append(t) 98 wait(task_list) 99 if rs: 100 rs.close() 101 if ws: 102 ws.close() 103 104 105 # 多線程拷貝 106 def copy_file_thread(start, byte_size, rs, ws): 107 mutex.acquire() 108 buffer = 1024 109 count = 0 110 rs.seek(start) 111 ws.seek(start) 112 while True: 113 if count + buffer <= byte_size: 114 content = rs.read(buffer) 115 count += len(content) 116 write(content, ws) 117 else: 118 content = rs.read(byte_size % buffer) 119 count += len(content) 120 write(content, ws) 121 break 122 # global total_count 123 # total_count += byte_size 124 # print("\r拷貝進(jìn)度為%.2f %%" % (total_count * 100 / file_size), end="") 125 mutex.release() 126 127 128 def write(content, ws): 129 ws.write(content) 130 ws.flush() 131 132 133 def copy_dir(src, desc): 134 # 獲得待拷貝的文件總數(shù)(含空文件夾) 135 total_number = get_file_count(src) 136 # 分隔符檢測 137 src = check_separator(src) 138 desc = check_separator(desc) 139 # print("src:",src) 140 # print("desc:",desc) 141 142 file_dir_list = [src + "/" + i for i in os.listdir(src)] 143 if os.path.exists(desc): 144 shutil.rmtree(desc) 145 146 # 進(jìn)程池 147 pool = multiprocessing.Pool(3) 148 149 # 創(chuàng)建隊列 150 queue = multiprocessing.Manager().Queue() 151 152 # 一個文件/目錄開啟一個進(jìn)程去拷貝 153 for f_name in file_dir_list: 154 target = os.path.splitdrive(desc)[0] + "/" + os.path.splitdrive(f_name)[1] 155 # target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:] 156 # print(target) 157 # 創(chuàng)建target目錄 158 parent_path = os.path.split(target)[0] 159 if not os.path.exists(parent_path): 160 os.makedirs(parent_path) 161 pool.apply_async(copy, args=(f_name, target, queue)) 162 163 start = time.time() 164 pool.close() 165 # pool.join() 166 count = 0 167 while True: 168 count += queue.get() 169 # 格式化輸出時兩個%輸出一個%,不換行,每次定位到行首,實現(xiàn)覆蓋 170 print("\r當(dāng)前進(jìn)度為 %.2f %%" % (count * 100 / total_number), end="") 171 if count >= total_number: 172 break 173 174 executor.shutdown() 175 end = time.time() 176 print() 177 print("耗時-----", (end - start), "s") 178 179 180 # 查找指定字符出現(xiàn)的全部索引位置 181 def index_list(c, s): 182 return [i.start() for i in re.finditer(c, s)] 183 184 185 # 檢測目錄結(jié)尾是否有 "/" 186 def check_separator(path): 187 if path.rindex("/") == len(path) - 1: 188 return path[0:path.rindex("/")] 189 return path 190 191 192 def main(): 193 copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/") 194 195 196 if __name__ == '__main__': 197 main()?
轉(zhuǎn)載于:https://www.cnblogs.com/tele-share/p/10656811.html
總結(jié)
以上是生活随笔為你收集整理的python 多进程与多线程配合拷贝文件目录的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python编写代码自动运行程序_利用P
- 下一篇: devcon 用法2