《Python Cookbook 3rd》笔记(4.13):创建数据处理管道
創建數據處理管道
問題
你想以數據管道 (類似 Unix 管道) 的方式迭代處理數據。比如,你有個大量的數據需要處理,但是不能將它們一次性放入內存中。
解法
生成器函數是一個實現管道機制的好辦法。為了演示,假定你要處理一個非常大的日志文件目錄:
foo/access-log-012007.gzaccess-log-022007.gzaccess-log-032007.gz...access-log-012008 bar/access-log-092007.bz2...access-log-022008假設每個日志文件包含這樣的數據:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369 61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 - ...為了處理這些文件,你可以定義一個由多個執行特定任務獨立任務的簡單生成器函數組成的容器。就像這樣:
import os import fnmatch import gzip import bz2 import re def gen_find(filepat, top):'''Find all filenames in a directory tree that match a shell wildcard pattern'''for path, dirlist, filelist in os.walk(top):for name in fnmatch.filter(filelist, filepat):yield os.path.join(path,name)def gen_opener(filenames):'''Open a sequence of filenames one at a time producing a file object.The file is closed immediately when proceeding to the next iteration.'''for filename in filenames:if filename.endswith('.gz'):f = gzip.open(filename, 'rt')elif filename.endswith('.bz2'):f = bz2.open(filename, 'rt')else:f = open(filename, 'rt')yield ff.close()def gen_concatenate(iterators):'''Chain a sequence of iterators together into a single sequence.'''for it in iterators:yield from itdef gen_grep(pattern, lines):'''Look for a regex pattern in a sequence of lines'''pat = re.compile(pattern)for line in lines:if pat.search(line):yield line現在你可以很容易的將這些函數連起來創建一個處理管道。比如,為了查找包含單詞 python 的所有日志行,你可以這樣做:
lognames = gen_find('access-log*', 'www') files = gen_opener(lognames) lines = gen_concatenate(files) pylines = gen_grep('(?i)python', lines) for line in pylines:print(line)如果將來的時候你想擴展管道,你甚至可以在生成器表達式中包裝數據。比如,下面這個版本計算出傳輸的字節數并計算其總和。
lognames = gen_find('access-log*', 'www') files = gen_opener(lognames) lines = gen_concatenate(files) pylines = gen_grep('(?i)python', lines) bytecolumn = (line.rsplit(None,1)[1] for line in pylines) bytes = (int(x) for x in bytecolumn if x != '-') print('Total', sum(bytes))討論
以管道方式處理數據可以用來解決各類其他問題,包括解析,讀取實時數據,定時輪詢等。
為了理解上述代碼,重點是要明白 yield 語句作為數據的生產者而 for 循環語句作為數據的消費者。當這些生成器被連在一起后,每個 yield 會將一個單獨的數據元素傳遞給迭代處理管道的下一階段。在例子最后部分, sum() 函數是最終的程序驅動者,每次從生成器管道中提取出一個元素。
這種方式一個非常好的特點是每個生成器函數很小并且都是獨立的。這樣的話就很容易編寫和維護它們了。很多時候,這些函數如果比較通用的話可以在其他場景重復使用。并且最終將這些組件組合起來的代碼看上去非常簡單,也很容易理解。
使用這種方式的內存效率也不得不提。上述代碼即便是在一個超大型文件目錄中也能工作的很好。事實上,由于使用了迭代方式處理,代碼運行過程中只需要很小很小的內存。
在調用 gen concatenate() 函數的時候你可能會有些不太明白。這個函數的目的是將輸入序列拼接成一個很長的行序列。 itertools.chain() 函數同樣有類似的功能,但是它需要將所有可迭代對象最為參數傳入。在上面這個例子中,你可能會寫類似這樣的語句 lines = itertools.chain(*files) ,使得 gen_opener() 生成器能被全部消費掉。但由于 gen_opener() 生成器每次生成一個打開過的文件,等到下一個迭代步驟時文件就關閉了,因此 china() 在這里不能這樣使用。上面的方案可以避免這種情況。
gen_concatenate() 函數中出現過 yield from 語句,它將 yield 操作代理到父生成器上去。語句 yield from it 簡單的返回生成器 it 所產生的所有值。關于這個我們在 4.14 小節會有更進一步的描述。
最后還有一點需要注意的是,管道方式并不是萬能的。有時候你想立即處理所有數據。然而,即便是這種情況,使用生成器管道也可以將這類問題從邏輯上變為工作流的處理方式。
David Beazley 在他的 Generator Tricks for Systems Programmers 教程中對于這種技術有非常深入的講解。可以參考這個教程獲取更多的信息。
總結
以上是生活随笔為你收集整理的《Python Cookbook 3rd》笔记(4.13):创建数据处理管道的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习总结(17)-XGBoost
- 下一篇: Python(14)-模块