当 Mars 遇上 RAPIDS:用 GPU 以并行的方式加速数据科学
背景
在數據科學世界,Python 是一個不可忽視的存在,且有愈演愈烈之勢。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。
Numpy
Numpy?是數值計算的基礎包,內部提供了多維數組(ndarray)這樣一個數據結構,用戶可以很方便地在任意維度上進行數值計算。
我們舉一個蒙特卡洛方法求解 Pi 的例子。這背后的原理非常簡單,現在我們有個半徑為1的圓和邊長為2的正方形,他們的中心都在原點?,F在我們生成大量的均勻分布的點,讓這些點落在正方形內,通過簡單的推導,我們就可以知道,Pi 的值 = 落在圓內的點的個數 / 點的總數 * 4。
這里要注意,就是隨機生成的點的個數越多,結果越精確。
用 Numpy 實現如下:
import numpy as npN = 10 ** 7 # 1千萬個點data = np.random.uniform(-1, 1, size=(N, 2)) # 生成1千萬個x軸和y軸都介于-1和1間的點 inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum() # 計算到原點的距離小于1的點的個數 pi = 4 * inside / N print('pi: %.5f' % pi)可以看到,用 Numpy 來進行數值計算非常簡單,只要寥寥數行代碼,而如果讀者習慣了 Numpy 這種面相數組的思維方式之后,無論是代碼的可讀性還是執行效率都會有巨大提升。
pandas
pandas?是一個強大的數據分析和處理的工具,它其中包含了海量的 API 來幫助用戶在二維數據(DataFrame)上進行分析和處理。
pandas 中的一個核心數據結構就是 DataFrame,它可以簡單理解成表數據,但不同的是,它在行和列上都包含索引(Index),要注意這里不同于數據庫的索引的概念,它的索引可以這么理解:當從行看 DataFrame 時,我們可以把 DataFrame 看成行索引到行數據的這么一個字典,通過行索引,可以很方便地選中一行數據;列也同理。
我們拿?movielens 的數據集?作為簡單的例子,來看 pandas 是如何使用的。這里我們用的是 Movielens 20M Dataset.
import pandas as pdratings = pd.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})通過一行簡單的?pandas.read_csv?就可以讀取 CSV 數據,接著按 userId 做分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。
“食用“ pandas 的最佳方式,還是在 Jupyter notebook 里,以交互式的方式來分析數據,這種體驗會讓你不由感嘆:人生苦短,我用 xx(😉)
scikit-learn
scikit-learn?是一個 Python 機器學習包,提供了大量機器學習算法,用戶不需要知道算法的細節,只要通過幾個簡單的 high-level 接口就可以完成機器學習任務。當然現在很多算法都使用深度學習,但 scikit-learn 依然能作為基礎機器學習庫來串聯整個流程。
我們以 K-最鄰近算法為例,來看看用 scikit-learn 如何完成這個任務。
import pandas as pd from sklearn.neighbors import NearestNeighborsdf = pd.read_csv('data.csv') # 輸入是 CSV 文件,包含 20萬個向量,每個向量10個元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df)fit接口就是 scikit-learn 里最常用的用來學習的接口。可以看到整個過程非常簡單易懂。
Mars——Numpy、pandas 和 scikit-learn 的并行和分布式加速器
Python 數據科學棧非常強大,但它們有如下幾個問題:
為了解決這幾個問題,Mars?被我們開發出來,Mars 在?MaxCompute?團隊內部誕生,它的主要目標就是讓 Numpy、pandas 和 scikit-learn 等數據科學的庫能夠并行和分布式執行,充分利用多核和新的硬件。
Mars 的開發過程中,我們核心關注的幾點包括:
當然這些是我們的目標,也是我們一直努力的方向。
Mars tensor:Numpy 的并行和分布式加速器
上面說過,我們的目標之一是,只要會用 Numpy 等數據科學包,就會用 Mars。我們直接來看代碼,還是以蒙特卡洛為例。變成 Mars 的代碼是什么樣子呢?
import mars.tensor as mtN = 10 ** 10data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = (4 * inside / N).execute() print('pi: %.5f' % pi)可以看到,區別就只有兩處:import numpy as np?變成?import mars.tensor as mt?,后續的?np.?都變成?mt.?;pi 在打印之前調用了一下?.execute()?方法。
也就是默認情況下,Mars 會按照聲明式的方式,代碼本身移植的代價極低,而在真正需要一個數據的時候,通過?.execute()?去觸發執行。這樣能最大限度得優化性能,以及減少中間過程內存消耗。
這里,我們還將數據的規模擴大了 1000 倍,來到了 100 億個點。之前 1/1000 的數據量的時候,在我的筆記本上需要 757ms;而現在數據擴大一千倍,光?data?就需要 150G 的內存,這用 Numpy 本身根本無法完成。而使用 Mars,計算時間只需要 3min 44s,而峰值內存只需要 1G 左右。假設我們認為內存無限大,Numpy 需要的時間也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,并且通過聲明式的方式,極大減少了中間內存占用。
前面說到,我們試圖讓聲明式和命令式兼得,而使用命令式的風格,只需要在代碼的開始配置一個選項即可。
import mars.tensor as mt from mars.config import optionsoptions.eager_mode = True # 打開 eager mode 后,每一次調用都會立即執行,行為和 Numpy 就完全一致N = 10 ** 7data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.linalg.norm(data, axis=1) < 1).sum() pi = 4 * inside / N # 不需要調用 .execute() 了 print('pi: %.5f' % pi.fetch()) # 目前需要 fetch() 來轉成 float 類型,后續我們會加入自動轉換Mars DataFrame:pandas 的并行和分布式加速器
看過怎么樣輕松把 Numpy 代碼遷移到 Mars tensor ,想必讀者也知道怎么遷移 pandas 代碼了,同樣也只有兩個區別。我們還是以 movielens 的代碼為例。
import mars.dataframe as mdratings = md.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute()Mars Learn:scikit-learn 的并行和分布式加速器
Mars Learn 也同理,這里就不做過多闡述了。但目前 Mars learn 支持的 scikit-learn 算法還不多,我們也在努力移植的過程中,這需要大量的人力和時間,歡迎感興趣的同學一起參與。
import mars.dataframe as md from mars.learn.neighbors import NearestNeighborsdf = md.read_csv('data.csv') # 輸入是 CSV 文件,包含 20萬個向量,每個向量10個元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) # 這里 fit 的時候也會整體觸發執行,因此機器學習的高層接口都是立即執行的 neighbors = nn.kneighbors(df).fetch() # kneighbors 也已經觸發執行,只需要 fetch 數據這里要注意的是,對于機器學習的?fit、predict?等高層接口,Mars Learn 也會立即觸發執行,以保證語義的正確性。
RAPIDS:GPU 上的數據科學
相信細心的觀眾已經發現,GPU 好像沒有被提到。不要著急,這就要說到?RAPIDS。
在之前,雖然 CUDA 已經將 GPU 編程的門檻降到相當低的一個程度了,但對于數據科學家們來說,在 GPU 上處理 Numpy、pandas 等能處理的數據無異于天方夜譚。幸運的是,NVIDIA 開源了 RAPIDS 數據科學平臺,它和 Mars 的部分思想高度一致,即使用簡單的 import 替換,就可以將 Numpy、pandas 和 scikit-learn 的代碼移植到 GPU 上。
其中,RAPIDS cuDF 用來加速 pandas,而 RAPIDS cuML 用來加速 scikit-learn。
對于 Numpy 來說,CuPy?已經很好地支持用 GPU 來加速了,這樣 RAPIDS 也得以把重心放在數據科學的其他部分。
CuPy:用 GPU 加速 Numpy
還是蒙特卡洛求解 Pi。
import cupy as cpN = 10 ** 7data = cp.random.uniform(-1, 1, size=(N, 2)) inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = 4 * inside / N print('pi: %.5f' % pi)在我的測試中,它將 CPU 的 757ms,降到只有 36ms,提升超過 20 倍,可以說效果非常顯著。這正是得益于 GPU 非常適合計算密集型的任務。
RAPIDS cuDF:用 GPU 加速 pandas
將?import pandas as pd?替換成?import cudf,GPU 內部如何并行,CUDA 編程這些概念,用戶都不再需要關心。
import cudfratings = cudf.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})運行時間從 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超過 10 倍。
RAPIDS cuML:用 GPU 加速 scikit-learn
同樣是 k-最鄰近問題。
import cudf from cuml.neighbors import NearestNeighborsdf = cudf.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df)運行時間從 CPU 上 1min52s,提升到 GPU 上 17.8s。
Mars 和 RAPIDS 結合能帶來什么?
RAPIDS 將 Python 數據科學帶到了 GPU,極大地提升了數據科學的運行效率。它們和 Numpy 等一樣,是命令式的。通過和 Mars 結合,中間過程將會使用更少的內存,這使得數據處理量更大;Mars 也可以將計算分散到多機多卡,以提升數據規模和計算效率。
在 Mars 里使用 GPU 也很簡單,只需要在對應函數上指定?gpu=True。例如創建 tensor、讀取 CSV 文件等都適用。
import mars.tensor as mt import mars.dataframe as mda = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True) df = md.read_csv('ml-20m/ratings.csv', gpu=True)下圖是用 Mars 分別在 Scale up 和 Scale out 兩個維度上加速蒙特卡洛計算 Pi 這個任務。一般來說,我們要加速一個數據科學任務,可以有這兩種方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的內存、使用 GPU 替代 CPU等;Scale out 就是指用更多的機器,用分布式的方式提升效率。
可以看到在一臺 24 核的機器上,Mars 計算需要 25.8s,而通過分布式的方式,使用 4 臺 24 核的機器的機器幾乎以線性的時間提升。而通過使用一個 NVIDIA TESLA V100 顯卡,我們就能將單機的運行時間提升到 3.98s,這已經超越了4臺 CPU 機器的性能。通過再將單卡拓展到多卡,時間進一步降低,但這里也可以看到,時間上很難再線性擴展了,這是因為 GPU 的運行速度提升巨大,這個時候網絡、數據拷貝等的開銷就變得明顯。
性能測試
我們使用了?https://github.com/h2oai/db-benchmark?的數據集,測試了三個數據規模的 groupby 和 一個數據規模的 join。而我們主要對比了 pandas 和?DASK。DASK 和 Mars 的初衷很類似,也是試圖并行和分布式化 Python 數據科學,但它們的設計、實現、分布式都存在較多差異,這個后續我們再撰文進行詳細對比。
測試機器配置是 500G 內存、96 核、NVIDIA V100 顯卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 執行計算。
Groupby
數據有三個規模,分別是 500M、5G 和 20G。
查詢也有三組。
查詢一
df = read_csv('data.csv') df.groupby('id1').agg({'v1': 'sum'})查詢二
df = read_csv('data.csv') df.groupby(['id1', 'id2']).agg({'v1': 'sum'})查詢三
df = read_csv('data.csv') df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})數據大小 500M,性能結果
數據大小 5G,性能結果
數據大小 20G,性能結果
數據大小到 20G 時,pandas 在查詢2會內存溢出,得不出結果。
可以看到,隨著數據增加,Mars 的性能優勢會愈發明顯。
得益于 GPU 的計算能力,GPU 運算性能相比于 CPU 都有數倍的提升。如果單純使用 RAPIDS cuDF,由于顯存大小的限制,數據來到 5G 都難以完成,而由于 Mars 的聲明式的特點,中間過程對顯存的使用大幅得到優化,所以整組測試來到 20G 都能輕松完成。這正是 Mars + RAPIDS 所能發揮的威力。
Join
測試查詢:
x = read_csv('x.csv') y = read_csv('y.csv') x.merge(y, on='id1')測試數據 x 為500M,y 包含10行數據。
總結
RAPIDS 將 Python 數據科學帶到了 GPU,極大提升了數據分析和處理的效率。Mars 的注意力更多放在并行和分布式。相信這兩者的結合,在未來會有更多的想象空間。
Mars 誕生于?MaxCompute?團隊,MaxCompute 原名 ODPS,是一種快速、完全托管的EB級數據倉庫解決方案。Mars 即將通過 MaxCompute 提供服務,購買了 MaxCompute 服務的用戶屆時可以開箱即用體驗 Mars 服務。敬請期待。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的当 Mars 遇上 RAPIDS:用 GPU 以并行的方式加速数据科学的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【人工智能】AI如何把招人效率提高四成
- 下一篇: 阿里云推出全新内存增强型实例re6,性能