用 Python 高效处理大文件
前言
為了進行并行處理,我們將任務劃分為子單元。它增加了程序處理的作業數量,減少了整體處理時間。
例如,如果你正在處理一個大的CSV文件,你想修改一個單列。我們將把數據以數組的形式輸入函數,它將根據可用的進程數量,一次并行處理多個值。這些進程是基于你的處理器內核的數量。
(文末送讀者福利)
在這篇文章中,我們將學習如何使用multiprocessing、joblib和tqdm Python包減少大文件的處理時間。這是一個簡單的教程,可以適用于任何文件、數據庫、圖像、視頻和音頻。
(文末送讀者福利)
開始
我們將使用來自 Kaggle 的 US Accidents (2016 - 2021) 數據集,它包括280萬條記錄和47個列。
https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents我們將導入multiprocessing、joblib和tqdm用于并行處理,pandas用于數據導入,re、nltk和string用于文本處理。
# Parallel Computingimport multiprocessing as mpfrom joblib import Parallel, delayedfrom tqdm.notebook import tqdm# Data Ingestion import pandas as pd# Text Processing import re from nltk.corpus import stopwordsimport string在我們開始之前,讓我們通過加倍cpu_count()來設置n_workers。正如你所看到的,我們有8個workers。
n_workers = 2 * mp.cpu_count()print(f"{n_workers} workers are available")>>> 8 workers are available下一步,我們將使用pandas read_csv函數讀取大型CSV文件。然后打印出dataframe的形狀、列的名稱和處理時間。
%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name)print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n") 輸出: Shape:(2845342, 47)Column Names:Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object')CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s Wall time: 46.9 s處理文本
clean_text是一個用于處理文本的簡單函數。我們將使用nltk.copus獲得英語停止詞,并使用它來過濾掉文本行中的停止詞。之后,我們將刪除句子中的特殊字符和多余的空格。它將成為確定串行、并行和批處理的處理時間的基準函數。
def clean_text(text): # Remove stop wordsstops = stopwords.words("english")text = " ".join([word for word in text.split() if word not in stops])# Remove Special Characterstext = text.translate(str.maketrans('', '', string.punctuation))# removing the extra spacestext = re.sub(' +',' ', text)return text串行處理
對于串行處理,我們可以使用pandas的.apply()函數,但是如果你想看到進度條,你需要為pandas激活tqdm,然后使用.progress_apply()函數。
我們將處理280萬條記錄,并將結果保存回 “Description” 列中。
%%time tqdm.pandas()df['Description'] = df['Description'].progress_apply(clean_text)輸出
高端處理器串行處理280萬行花了9分5秒。
100% 🟩🟩🟩🟩 2845342/2845342 [09:05<00:00, 5724.25it/s]CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s Wall time: 9min 5s多進程處理
有多種方法可以對文件進行并行處理,我們將了解所有這些方法。multiprocessing是一個內置的python包,通常用于并行處理大型文件。
我們將創建一個有8個workers的多處理池,并使用map函數來啟動進程。為了顯示進度條,我們將使用tqdm。
map函數由兩部分組成。第一個部分需要函數,第二個部分需要一個參數或參數列表。
%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))輸出
我們的處理時間幾乎提高了3倍。處理時間從9分5秒下降到3分51秒。
100% 🟩🟩🟩🟩 2845342/2845342 [02:58<00:00, 135646.12it/s]CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s Wall time: 3min 51s并行處理
我們現在將學習另一個Python包來執行并行處理。在本節中,我們將使用joblib的Parallel和delayed來復制map函數。
1、Parallel需要兩個參數:n_job = 8和backend = multiprocessing。
2、然后,我們將在delayed函數中加入clean_text。
3、創建一個循環,每次輸入一個值。
下面的過程是相當通用的,你可以根據你的需要修改你的函數和數組。我曾用它來處理成千上萬的音頻和視頻文件,沒有任何問題。
建議:使用 "try: "和 "except: "添加異常處理。
def text_parallel_clean(array):result = Parallel(n_jobs=n_workers,backend="multiprocessing")(delayed(clean_text)(text) for text in tqdm(array))return result在text_parallel_clean()中添加“Description”列。
%%time df['Description'] = text_parallel_clean(df['Description'])輸出
我們的函數比多進程處理Pool多花了13秒。即使如此,并行處理也比串行處理快4分59秒。
100% 🟩🟩🟩🟩 2845342/2845342 [04:03<00:00, 10514.98it/s]CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s Wall time: 4min 4s并行批量處理
有一個更好的方法來處理大文件,就是把它們分成若干批,然后并行處理。讓我們從創建一個批處理函數開始,該函數將在單一批次的值上運行clean_function。
批量處理函數
def proc_batch(batch):return [clean_text(text)for text in batch]將文件分割成批
下面的函數將根據workers的數量把文件分成多個批次。在我們的例子中,我們得到8個批次。
def batch_file(array,n_workers):file_len = len(array)batch_size = round(file_len / n_workers)batches = [array[ix:ix+batch_size]for ix in tqdm(range(0, file_len, batch_size))]return batchesbatches = batch_file(df['Description'],n_workers)>>> 100% 8/8 [00:00<00:00, 280.01it/s]運行并行批處理
最后,我們將使用Parallel和delayed來處理批次。
%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(delayed(proc_batch)(batch) for batch in tqdm(batches))df['Description'] = [j for i in batch_output for j in i]輸出
我們已經改善了處理時間。這種技術在處理復雜數據和訓練深度學習模型方面非常有名。
100% 🟩🟩🟩🟩 8/8 [00:00<00:00, 2.19it/s]CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s Wall time: 3min 56stqdm 并發
tqdm將多處理帶到了一個新的水平。它簡單而強大。
process_map需要:
1、函數名稱
2、Dataframe 列名
3、max_workers
4、chucksize與批次大小類似。我們將用workers的數量來計算批處理的大小,或者你可以根據你的喜好來添加這個數字。
輸出
通過一行代碼,我們得到了最好的結果:
100% 🟩🟩🟩🟩 2845342/2845342 [03:48<00:00, 1426320.93it/s]CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s Wall time: 3min 51s結論
我們需要找到一個平衡點,它可以是串行處理,并行處理,或批處理。如果你正在處理一個較小的、不太復雜的數據集,并行處理可能會適得其反。
在這個教程中,我們已經了解了各種處理大文件的Python包,它們允許我們對數據函數進行并行處理。
如果你只處理一個表格數據集,并且想提高處理性能,那么建議你嘗試Dask、datatable和RAPIDS。
讀者福利:知道你對Python感興趣,便準備了這套python學習資料
對于0基礎小白入門:
如果你是零基礎小白,想快速入門Python是可以考慮的。
一方面是學習時間相對較短,學習內容更全面更集中。
二方面還可以找到適合自己的學習方案
包括:Python永久使用安裝包、Python web開發,Python爬蟲,Python數據分析,人工智能、機器學習等學習教程。帶你從零基礎系統性的學好Python!
零基礎Python學習資源介紹
👉Python學習路線匯總👈
Python所有方向的技術點做的整理,形成各個領域的知識點匯總,它的用處就在于,你可以按照上面的知識點去找對應的學習資源,保證自己學得較為全面。(學習教程文末領取哈)
👉Python必備開發工具👈
溫馨提示:篇幅有限,已打包文件夾,獲取方式在:文末
👉Python學習視頻600合集👈
觀看零基礎學習視頻,看視頻學習是最快捷也是最有效果的方式,跟著視頻中老師的思路,從基礎到深入,還是很容易入門的。
👉實戰案例👈
光學理論是沒用的,要學會跟著一起敲,要動手實操,才能將自己的所學運用到實際當中去,這時候可以搞點實戰案例來學習。
👉100道Python練習題👈
檢查學習結果。
👉面試刷題👈
資料領取
這份完整版的Python全套學習資料已為大家備好,朋友們如果需要可以微信掃描下方二維碼添加,輸入"領取資料" 可免費領取全套資料【有什么需要協作的還可以隨時聯系我】朋友圈也會不定時的更新最前言python知識。
這世界上賺錢成本最低的就是:用知識投資大腦
人生什么時候學習都不晚,晚的是你一直想學卻一直沒有行動,而導致大量內耗
最后祝你學習愉快
好文推薦
了解python的前景:https://blog.csdn.net/weixin_49891576/article/details/127187029
python有什么用:https://blog.csdn.net/weixin_49891576/article/details/127125308
總結
以上是生活随笔為你收集整理的用 Python 高效处理大文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 小技巧 - 禁止复制的网站如何破解复制功
- 下一篇: 各种滤波电路及原理