参数服务器训练基本理论
參數服務器訓練基本理論
參數服務器訓練是分布式訓練領域普遍采用的編程架構,主要解決以下兩類問題:
? 模型參數過大:單機內存空間不足,需要采用分布式存儲。
? 訓練數據過多:單機訓練太慢,需要加大訓練節點,來提高并發訓練速度。
如圖所示,參數服務器主要包含Server和Worker兩個部分,其中Server負責參數的存儲和更新,而Worker負責訓練。簡單來說,參數服務器訓練的基本思路:當訓練數據過多,一個Worker訓練太慢時,可以引入多個Worker同時訓練,這時Worker之間需要同步模型參數。直觀想法是,引入一個Server,Server充當Worker間參數交換的媒介。當模型參數過大,以至于單機存儲空間不足時,或Worker過多導致一個Server是瓶頸時,就需要引入多個Server。
參數服務器訓練的具體流程如下:
? 將訓練數據均勻的分配給不同的Worker。
? 將模型參數分片,存儲在不同的Server上。
? Worker端:讀取一個minibatch訓練數據,從Server端拉取最新的參數,計算梯度,并根據分片上傳給不同的Server。
? Server端:接收Worker端上傳的梯度,根據優化算法更新參數。根據Server端每次參數更新是否需要等待所有Worker端的梯度,分為同步訓練和異步訓練兩種機制。
飛槳paddle的參數服務器框架,也是基于這種經典的參數服務器模式進行設計和開發的,在這基礎上進行了SGD(Stochastic Gradient Descent)算法的創新(GEO-SGD)。目前飛槳paddle支持3種模式,分別是同步訓練模式、異步訓練模式、GEO異步訓練模式,三者之間的差異如下圖所示。
當前經過大量的實驗驗證,最佳的方案是每臺機器上啟動Server和Worker兩個進程,而一個Worker進程中,可以包含多個用于訓練的線程。
同步訓練
Worker在訓練一個batch的數據后,會合并所有線程的梯度發給Server, Server在收到所有節點的梯度后,會統一進行梯度合并及參數更新。同步訓練的優勢在于Loss可以比較穩定的下降,缺點是整個訓練速度較慢,這是典型的木桶原理,速度的快慢取決于最慢的那個線程的訓練計算時間,在訓練較為復雜的模型時,即模型訓練過程中神經網絡訓練耗時,遠大于節點間通信耗時的場景下,推薦使用同步訓練模式。
異步訓練
在訓練一個batch的數據后,Worker的每個線程會發送梯度給Server。而Server不會等待接收所有節點的梯度,而是直接基于已收到的梯度進行參數更新。異步訓練去除了訓練過程中的等待機制,訓練速度得到了極大的提升,但是缺點也很明顯,那就是Loss下降不穩定,容易發生抖動。建議在個性化推薦(召回、排序)、語義匹配等數據量大的場景使用。 尤其是推薦領域的點擊率預估場景,該場景可能會出現千億甚至萬億規模的稀疏特征,而稀疏參數也可以達到萬億數量級,且需要小時級或分鐘級流式增量訓練。如果使用異步訓練模式,可以很好的滿足該場景的online-learning需求。
GEO異步訓練
GEO(Geometric Stochastic Gradient Descent)異步訓練是飛槳paddle自研的異步訓練模式,其最大的特點是將參數的更新從Server轉移到Worker上。每個Worker在本地訓練過程中,使用SGD優化算法更新本地模型參數,在訓練若干個batch的數據后,Worker將發送參數更新信息給Server。Server在接收后會通過加和方式更新保存的參數信息。所以顯而易見,在GEO異步訓練模式下,Worker不用再等待Server發來新的參數即可執行訓練,在訓練效果和訓練速度上有了極大的提升。但是此模式比較適合可以在單機內能完整保存的模型,在搜索、NLP等類型的業務上應用廣泛,推薦在詞向量、語義匹配等場景中使用。
運行策略的詳細描述可以參考文檔PaddlePaddle 參數服務器分布式訓練策略定義
基于分類模型的訓練示例
本文檔以二分類模型舉例,介紹單機訓練和參數服務器訓練(異步模式)兩種模式的詳細代碼,方便用戶快速了解兩種模式的具體差異。
本文涉及的所有源碼,可通過此鏈接獲取:https://github.com/seiriosPlus/Fleet/tree/distribtued_training/examples/distributed_ctr 建議親手操作,畢竟只有親手敲過的代碼才真正是自己的。
單機訓練示例
環境準備
訓練前,請確保:
? 已正確安裝飛槳paddle最新版本。安裝操作請參見飛槳paddle。
? 運行環境基于Linux,示例代碼支持Unbuntu及CentOS。
? 運行環境中Python版本高于2.7。
數據處理
數據集采用Display Advertising Challenge所用的Criteo數據集。該數據集包括兩部分:訓練集和測試集。訓練集包含一段時間內Criteo的部分流量,測試集則對應訓練數據后一天的廣告點擊流量。
數據預處理共包括兩步:
? 將原始訓練集按9:1劃分為訓練集和驗證集。
? 數值特征(連續特征)需進行歸一化處理,但需要注意的是,對每一個特征,歸一化時用到的最大值并不是用全局最大值,而是取排序后95%位置處的特征值作為最大值,同時保留極值。
模型設計
模型屬于二分類模型,網絡結構如下圖所示。輸入是N類稀疏特征,比如詞的id。通過查取embedding表(字典大小xM維的向量表),變換成N個M維向量。將所有NxM維向量連接在一起融合為一個向量。網絡由多個輸入數據層(paddle.static.data)、多個共享參數的嵌入層(paddle.nn.functional.embedding),若干個全連接層(paddle.static.fc),以及相應的分類任務的Loss計算和auc計算。經過多層全連接層+激活函數(relu)后,進行0/1分類。
數據輸入聲明
Criteo數據集分連續數據與離散(稀疏)數據,整體而言,數據輸入層包括三個,分別是:dense_input用于輸入連續數據,維度由超參數dense_feature_dim指定,數據類型是歸一化后的浮點型數據。sparse_input_ids用于記錄離散數據,在Criteo數據集中,共有26個slot,所以創建了名為C1~C26的26個稀疏參數輸入,并設置lod_level=1,代表其為變長數據,數據類型為整數;最后是每條樣本的label,代表了是否被點擊,數據類型是整數,0代表負樣例,1代表正樣例。
在飛槳paddle中數據輸入的聲明使用paddle.static.data(),會創建指定類型的占位符,數據IO會依據此定義進行數據的輸入
def input_data(self, params):
dense_input = paddle.static.data(name=“dense_input”,
shape=[params.dense_feature_dim],
dtype=“float32”)
sparse_input_ids = [paddle.static.data(name="C" + str(i),shape=[1],lod_level=1,dtype="int64") for i in range(1, 27)
]label = paddle.static.data(name="label", shape=[1], dtype="int64")inputs = [dense_input] + sparse_input_ids + [label]
return inputs
def net(self, inputs, params):
Embedding層
Embedding層的組網方式:Embedding層的輸入是sparse_input,shape由超參的sparse_feature_dim和embedding_size定義
指定is_sprase=True后,計算圖會將該參數視為稀疏參數,反向更新以及分布式通信時,都以稀疏的方式進行,會極大的提升運行效率,同時保證效果一致。
def embedding_layer(input):return paddle.nn.functional.embedding(input=input,is_sparse=params.is_sparse,size=[params.sparse_feature_dim, params.embedding_size]),)sparse_embed_seq = list(map(embedding_layer, inputs[1:-1]))# 各個稀疏的輸入通過Embedding層后,將其合并起來,置于一個list內,以方便進行concat的操作
concated = paddle.concat(sparse_embed_seq + inputs[0:1], axis=1)
將離散數據通過embedding查表得到的值,與連續數據的輸入進行concat操作,合為一個整體輸入,作為全鏈接層的原始輸入。共設計了3層FC,每層FC的輸出維度都為400,每層FC都后接一個relu激活函數,每層FC的初始化方式為符合正態分布的隨機初始化,標準差與上一層的輸出維度的平方根成反比。
fc1 = paddle.static.fc(input=concated,size=400,act="relu"
)
fc2 = paddle.static.fc(input=fc1,size=400,act="relu"
)
fc3 = paddle.static.fc(input=fc2,size=400,act="relu"
)
predict =paddle.static.fc(input=fc3,size=2,act="softmax"
)
Loss及Auc計算
預測的結果通過一個輸出shape為2的FC層給出,該FC層的激活函數softmax,會給出每條樣本分屬于正負樣本的概率。
每條樣本的損失由交叉熵給出,交叉熵的輸入維度為[batch_size,2],數據類型為float,label的輸入維度為[batch_size,1],數據類型為int。該batch的損失avg_cost是各條樣本的損失之和
同時還會計算預測的auc,auc的結果由paddle.static..auc()給出,該層的返回值有三個,分別是全局auc: auc_var,當前batch的auc: batch_auc_var,以及auc_states: auc_states,auc_states包含了batch_stat_pos, batch_stat_neg, stat_pos, stat_neg信息。
cost = paddle.nn.functional.cross_entropy(input=predict, label=inputs[-1])
avg_cost = paddle.sum(cost)
auc_var, batch_auc_var, _ = paddle.static.auc(input=predict,label=inputs[-1])return avg_cost, auc_var, batch_auc_var
模型訓練
def train(params):
# 引入模型的組網
ctr_model = CTR()
inputs = ctr_model.input_data(params)
avg_cost, auc_var, batch_auc_var = ctr_model.net(inputs,params)
# 選擇反向更新優化策略
optimizer = paddle.optimizer.Adam(params.learning_rate)
optimizer.minimize(avg_cost)# 創建訓練的執行器
exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(paddle.static.default_startup_program())# 引入數據讀取
dataset = get_dataset(inputs,params)# 開始訓練
for epoch in range(params.epochs):# 啟動pyreader的異步訓練線程# PyRreader是飛槳paddle提供的簡潔易用的數據讀取API接口,支持同步數據讀取及異步數據讀取,用戶自行定義數據處理的邏輯后,以迭代器的方式傳遞給PyReader,完成訓練的數據讀取部分reader.start()batch_id = 0try:while True:# 獲取網絡中,所需的輸出,如loss、auc等loss_val, auc_val, batch_auc_val = exe.run(program=compiled_prog,fetch_list=[avg_cost.name, auc_var.name, batch_auc_var.name])loss_val = np.mean(loss_val)auc_val = np.mean(auc_val)batch_auc_val = np.mean(batch_auc_val)# 每隔10個Batch打印一次輸出if batch_id % 10 == 0 and batch_id != 0:logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}".format(epoch, batch_id,loss_val / params.batch_size, auc_val,batch_auc_val))batch_id += 1except paddle.core.EOFException::# 一次訓練完成后,要調用reset來將Reader恢復為初始狀態,為下一輪訓練準備reader.reset()logger.info("Train Success!")
參數服務器訓練示例
對于參數服務器訓練來說,訓練前也需要完成環境準備、數據處理、模型設計工作。其中,數據處理和模型設計與單機訓練完全相同,可以直接拿來使用。
環境準備
執行模型訓練前,需要確保運行環境滿足以下要求:
? 飛槳paddle參數服務器模式的訓練,目前只支持在Liunx環境下運行,推薦使用ubuntu或CentOS
? 飛槳paddle參數服務器模式的Python環境支持python 2.7及python 3.5+, 安裝和運行前請檢查版本是否符合要求
? 使用飛槳paddle的參數服務器分布式訓練,確保各自之間可以通過ip:port的方式訪問rpc服務,使用http/https代理會導致通信失敗
? 參數服務器使用RPC通信完成整個訓練流程,訓練節點存在于同一個機房、IDC會獲得更好的速度
? 飛槳paddle的參數服務器訓練支持多種訓練環境的啟動和運行,包括kubernetes/MPI/其他自定義環境等。
數據處理
參數服務器訓練的數據處理與單機訓練完全相同,這里不再重復贅述。
模型設計
參數服務器訓練的模型設計與單機訓練完全相同,這里不再重復贅述。
模型訓練
飛槳paddle的參數服務器中存在Worker和PServer兩種角色,下面會結合2X2的實際情況講述啟動流程。 飛槳paddle的參數服務器的訓練分為3個階段, 一是將PServer全部啟動, PServer會根據用戶定義的監聽端口啟動監聽服務,等待Worker連接;二是啟動全部Worker節點,Worker節點會根據配置的Pserver的端口號跟每一個PServer進行連接檢查,確保能夠順利連接后,進行參數的初始化和同步;三是啟動訓練流程,通過跟多個PServer的通信完成整個訓練流程。 假設有兩臺機器,想要在每臺機器上分別啟動一個server進程以及一個worker進程,完成2x2(2個參數服務器,2個訓練節點)的參數服務器模式分布式訓練,按照如下步驟操作。
啟動server
機器A,IP地址是10.89.176.11,通信端口是36000,配置如下環境變量后,運行訓練的入口程序:
export PADDLE_PSERVERS_IP_PORT_LIST=“10.89.176.11:36000,10.89.176.12:36000”
export TRAINING_ROLE=PSERVER
export POD_IP=10.89.176.11 # node A:10.89.176.11
export PADDLE_PORT=36000
export PADDLE_TRAINERS_NUM=2
python -u train.py --is_cloud=1
應能在日志中看到如下輸出:
server.cpp:1040] Check out http://10.89.176.11:36000 in web browser.
查看系統進程
8624 | ttys000 | 0:02.31 | python -u train.py --is_cloud=1
查看系統進程及端口占用:
python3.7 | 8624 | paddle | 8u | IPv6 | 0xe149b87d093872e5 | 0t0 | TCP | localhost:36000 (LISTEN)
也可以看到的server進程8624的確在36000端口開始了監聽,等待worker的通信。
機器B,IP地址是10.89.176.12,通信端口是36000,配置如下環境變量后,運行訓練的入口程序:
export PADDLE_PSERVERS_IP_PORT_LIST=“10.89.176.11:36000,10.89.176.12:36000”
export TRAINING_ROLE=PSERVER
export POD_IP=10.89.176.12 # node B: 10.89.176.12
export PADDLE_PORT=36000
export PADDLE_TRAINERS_NUM=2
python -u train.py --is_cloud=1
也可以看到相似的日志輸出與進程狀況。(進行驗證時,請務必確保IP與端口的正確性)
啟動worker
接下來分別在機器A與B上開啟訓練進程。配置如下環境變量并開啟訓練進程:
機器A:
export PADDLE_PSERVERS_IP_PORT_LIST=“10.89.176.11:36000,10.89.176.12:36000”
export TRAINING_ROLE=TRAINER
export PADDLE_TRAINERS_NUM=2
export PADDLE_TRAINER_ID=0 # node A:trainer_id = 0
python -u train.py --is_cloud=1
機器B:
export PADDLE_PSERVERS_IP_PORT_LIST=“10.89.176.11:36000,10.89.176.12:36000”
export TRAINING_ROLE=TRAINER
export PADDLE_TRAINERS_NUM=2
export PADDLE_TRAINER_ID=1 # node B: trainer_id = 1
python -u train.py --is_cloud=1
運行該命令時,若Pserver還未就緒,可在日志輸出中看到如下信息:
server not ready, wait 3 sec to retry…
not ready endpoints:[‘10.89.176.11:36000’, ‘10.89.176.12:36000’]
Worker進程將持續等待,直到Pserver開始監聽,或等待超時。
當Pserver都準備就緒后,可以在日志輸出看到如下信息:
I0317 11:38:48.099179 16719 communicator.cc:271] Communicator start
I0317 11:38:49.838711 16719 rpc_client.h:107] init rpc client with trainer_id 0
至此,分布式訓練啟動完畢,開始訓練。
參數服務器訓練數據切分
飛槳paddle的參數服務器訓練目前主要是數據并行模式。通過增加訓練節點來提高訓練數據的并行度的,需要對數據進行劃分,即將全部的訓練數據均勻的分成Worker個數份,每一個Worker需要分配全部訓練數據中的一份,每個Worker節點訓練自己的一份數據,參數由PServer端完成聚合和更新。要確保每個節點都能拿到數據,希望每個節點的數據同時滿足:各個節點數據無重復和各個節點數據數量均勻。
Fleet提供了split_files()的接口,輸入值是一個穩定的目錄List,隨后該函數會根據節點自身的編號拿到相應的數據文件列表,訓練數據在同一個目錄下,使用該接口,給各個進程(扮演不同的訓練節點)分配不同的數據文件。
file_list = [
str(args.train_files_path) + “/%s” % x
for x in os.listdir(args.train_files_path)
]
請確保每一個訓練節點都持有不同的訓練文件
當用本地多進程模擬分布式時,每個進程需要拿到不同的文件
使用 fleet.split_files 可以便捷的以文件為單位分配訓練樣本
files= fleet.split_files(file_list)
基于得到的files,每個節點開始獨立進行數據讀取和訓練。如果數據在HDFS上,可以根據files列表將數據下載會本地進行讀取。如果數據在本地,則可直接根據files列表進行讀取。
詳細訓練代碼示例
異步模式分布式訓練代碼的詳細說明如下所示。
# 根據環境變量確定當前機器/進程在分布式訓練中的角色分配Worker/PSERVER
# 然后使用 fleet api的 init()方法初始化這個節點
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
# 設置分布式運行模式為異步(async),同時將參數進行切分,以分配到不同的節點
strategy = StrategyFactory.create_async_strategy()ctr_model = CTR()
inputs = ctr_model.input_data(params)
avg_cost, auc_var, batch_auc_var = ctr_model.net(inputs, params)
optimizer = paddle.optimizer.Adam(params.learning_rate)# 配置分布式的optimizer,傳入指定的strategy,構建program
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)# 根據節點角色,分別運行不同的邏輯
if fleet.is_server():# 初始化及運行參數服務器節點fleet.init_server()fleet.run_server()elif fleet.is_worker():# 初始化工作節點fleet.init_worker()exe = paddle.static.Executor(paddle.CPUPlace())# 初始化含有分布式流程的fleet.startup_programexe.run(fleet.startup_program)for epoch in range(params.epochs):# 啟動dataloader的異步訓練線程# DataLoader是飛槳paddle提供的簡潔易用的數據讀取API接口,支持同步數據讀取及異步數據讀取,用戶自行定義數據處理的邏輯后,以迭代器的方式傳遞給DataLoader,完成訓練的數據讀取部分reader.start()batch_id = 0try:while True:# 獲取網絡中,所需的輸出,如loss、auc等loss_val, auc_val, batch_auc_val = exe.run(program=compiled_prog,fetch_list=[avg_cost.name, auc_var.name, batch_auc_var.name])loss_val = np.mean(loss_val)auc_val = np.mean(auc_val)batch_auc_val = np.mean(batch_auc_val)# 每隔10個Batch打印一次輸出if batch_id % 10 == 0 and batch_id != 0:logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}".format(epoch, batch_id,loss_val / params.batch_size, auc_val,batch_auc_val))batch_id += 1except paddle.core.EOFException:# 一次訓練完成后,要調用reset來將Reader恢復為初始狀態,為下一輪訓練準備reader.reset()# 默認使用0號節點保存模型if fleet.is_first_worker():model_path = (str(params.model_path) + "/" + "epoch_" +str(epoch))fleet.save_persistables(executor=exe, dirname=model_path)fleet.stop_worker()
使用pyreader進行多輪訓練時,有一些固有的使用方法,如示例代碼所示,使用try & except捕獲異常的方式得到reader讀取完數據的信號,使用reset重置reader,以進行下一輪訓練的數據讀取。 執行exe.run()時,傳入的是CompiledProgram,同時可以通過加入fetch_list來直接獲取想要監控的變量。
運行:本地模擬分布式
運行方式有兩種方式。
方法一 運行local_cluster.sh腳本
運行local_cluster.sh腳本,設置啟動命令為sync:
sh local_cluster.sh sync
使用該腳本開啟分布式模擬訓練,默認啟用2x2的訓練模式。Worker與Pserver的運行日志,存放于./log/文件夾,保存的模型位于./model/。
方法二 運行飛槳paddle內置的一個啟動器launch_ps
在單機模擬多機訓練的啟動命令,飛槳paddle內置的一個啟動器launch_ps,用戶可以指定Worker和server的數量進行參數服務器任務的啟動。
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
使用該腳本開啟分布式模擬訓練,也將啟用2個Worker x 2個server的訓練模式。Worker與Pserver的運行日志,存放于./logs/文件夾,保存的模型位于./model/。
開啟本地模擬分布式訓練后的日志輸出
使用快速驗證數據集,本地模擬同步模式的分布式訓練的理想輸出為:
pserver.0.log
INFO:file list: [‘train_data/part-1’]
get_pserver_program() is deprecated, call get_pserver_programs() to get pserver main and startup in a single call.
I1128 11:34:50.242866 459 grpc_server.cc:477] Server listening on 127.0.0.1:36011 successful, selected port: 36011
trainer.0.log
INFO:file list: [‘train_data/part-1’]
server not ready, wait 3 sec to retry…
not ready endpoints:[‘127.0.0.1:36012’]
I1128 11:34:53.424834 32649 rpc_client.h:107] init rpc client with trainer_id 0
I1128 11:34:53.526729 32649 parallel_executor.cc:423] The Program will be executed on CPU using ParallelExecutor, 2 cards are used, so 2 programs are executed in parallel.
I1128 11:34:53.537334 32649 parallel_executor.cc:287] Inplace strategy is enabled, when build_strategy.enable_inplace = True
I1128 11:34:53.541473 32649 parallel_executor.cc:370] Garbage collection strategy is enabled, when FLAGS_eager_delete_tensor_gb = 0
INFO:TRAIN --> pass: 0 batch: 10 loss: 0.588123535156 auc: 0.497622251208, batch_auc: 0.496669348982
INFO:TRAIN --> pass: 0 batch: 20 loss: 0.601480102539 auc: 0.501770208439, batch_auc: 0.520060819177
INFO:TRAIN --> pass: 0 batch: 30 loss: 0.581234985352 auc: 0.513533941098, batch_auc: 0.552742309157
INFO:TRAIN --> pass: 0 batch: 40 loss: 0.551335083008 auc: 0.523242733864, batch_auc: 0.586762885637
INFO:TRAIN --> pass: 0 batch: 50 loss: 0.532891052246 auc: 0.538684471661, batch_auc: 0.617389479234
INFO:TRAIN --> pass: 0 batch: 60 loss: 0.564157531738 auc: 0.552346798675, batch_auc: 0.628245358534
INFO:TRAIN --> pass: 0 batch: 70 loss: 0.547578674316 auc: 0.565243961316, batch_auc: 0.651260427476
INFO:TRAIN --> pass: 0 batch: 80 loss: 0.554214599609 auc: 0.57554000345, batch_auc: 0.648544028986
INFO:TRAIN --> pass: 0 batch: 90 loss: 0.549561889648 auc: 0.585579565556, batch_auc: 0.660180398731
INFO:epoch 0 finished, use time=40
INFO:Distribute Train Success!
模型預測
完成前面的單機訓練和參數服務器訓練完成后,需要在測試集上測試離線預測的結果,驗證模型的泛化能力。本節內容對應示例代碼中的infer.py。
單機訓練和參數服務器訓練均采用此操作進行模型預測。
構建預測網絡及加載模型參數
預測網絡與訓練網絡一致,無需更改,使用相同的方式構建inputs、loss、auc。加載參數使用paddle.io.load_vars()接口,從保存好的模型文件夾中加載同名參數。
paddle中對于program的獨立作用域限定, 在此作用域下配置的所有組網相關的操作均作用于test_program, startup_program上
with paddle.static.program_guard(test_program, startup_program):
paddle中對于參數命名的獨立作用域限定, 在此作用域下配置的組網,會重新開始編號,不會和其他網絡沖突
with paddle.static.unique_name.guard():inputs = ctr_model.input_data(params)loss, auc_var, batch_auc_var = ctr_model.net(inputs, params)exe = paddle.static.Executor(place)feeder =paddle.io.DataLoader(feed_list=inputs, place=place)paddle.static.load_vers(executor=exe,dirname=model_path,main_program=paddle.static.default_main_program())
在進行上述流程時,有一些需要關注的細節:
? 傳入的program不是default_main_program(),而是新建的空的program。因為在測試時,要從零開始,保證預測program的干凈,沒有其它的影響因素。
? startup_program = paddle.static.Program()
? test_program = paddle.static.Program()
? 在創建預測網絡時,加入了with paddle.static.unique_name.guard(): 作用是讓所有新建的參數的自動編號再次從零開始。飛槳paddle的參數Variable以變量名作為區分手段,保證變量名相同,就可以從保存的模型中找到對應參數。
飛槳paddle創建的臨時變量,編號會自動順延,如果沒有指定變量名,可以觀察到這一現象,比如:fc_1.w_0->fc_2.w_0,想要共享相同的參數,必需要保證編號可以對應。
獲取測試數據
測試數據的讀取使用同步模式中使用過的pyreader方法。
運行測試
為了快速驗證,僅取用測試數據集的一個part文件,進行測試。在代碼目錄下,鍵入以下命令,進行預測:
python -u infer.py &> test.log &
測試結果的日志位于test.log,僅訓練一個epoch后,在part-220上的的理想測試結果為:
2019-11-26 08:56:19,985 - INFO - Test model model/epoch_0
open file success
2019-11-26 08:56:20,323 - INFO - TEST --> batch: 0 loss: [0.5577456] auc: [0.61541704]
2019-11-26 08:56:37,839 - INFO - TEST --> batch: 100 loss: [0.5395161] auc: [0.6346397]
2019-11-26 08:56:55,189 - INFO - {‘loss’: 0.5571399, ‘auc’: array([0.6349838])}
2019-11-26 08:56:55,189 - INFO - Inference complete
因為快速驗證的訓練數據與測試數據極少,同時只訓練了一輪,所以遠遠沒有達到收斂,且初始化帶有隨機性,在您的環境下出現測試結果與示例輸出不一致是正常情況。
參數服務器訓練的性能調優
優化的目的是在給定數據集上,以最快速度訓練得到最優的效果。參數服務器訓練的性能調優分為速度提升和效果提升。
速度提升
參數服務器訓練涉及的訓練如下圖所示。
可以看出:訓練時間 = 數據讀取時間 + 節點訓練(網絡前向后向執行時間) + 通信時間 + 節點之間等待時間 + 更新參數時間。
因此可以從數據讀取、單節點訓練、通信模塊、Server端參數更新這4個方面進行參數服務器的優化。
Dataset數據讀取
目前支持PyReader和Dataset兩種,后者速度更快。PyReader采用的模式是多個讀數據線程寫到一個隊列中,多個訓練線程從這個一個隊列中讀取數據,形成了多生產者多消費者的模式,導致隊列成為瓶頸。Dataset采用多個讀數據線程寫到多個隊列中,多個訓練線程之間完全異步的模式, 消除隊列瓶頸,在數據讀取速度上更勝一籌。
在模型比較簡單、數量比較大時,可以使用參數服務器的全異步訓練模式和高性能的IO數據讀取模式來高速的訓練。Dataset是為多線程及全異步模式量身打造的數據讀取方式,每個數據讀取線程會與一個訓練線程耦合,形成了多生產者-多消費者的模式,極大的加速了模型訓練。詳細的Dataset的設計文檔可以參考:Dataset
如何在的訓練中引入Dataset讀取方式呢?
無需變更數據格式,只需在的訓練代碼中加入以下內容,便可達到媲美二進制讀取的高效率,以下是一個比較完整的流程。
一、定義Dataset
以下是dataset_generator.py的全部代碼,具體流程如下:
- 首先需要引入Dataset庫,位于paddle.distributed.QueueDataset。
- 聲明一些在數據讀取中會用到的變量,如示例代碼中的cont_min_、categorical_range_等。
- 創建一個子類,繼承Dataset的基類,基類有多種選擇,如果是多種數據類型混合,并且需要轉化為數值進行預處理的,建議使用MultiSlotDataGenerator;若已經完成了預處理并保存為數據文件,可以直接以string的方式進行讀取,使用MultiSlotStringDataGenerator,能夠進一步加速。在示例代碼,繼承并實現了名為CriteoDataset的dataset子類,使用MultiSlotDataGenerator方法。
- 繼承并實現基類中的generate_sample函數,逐行讀取數據。該函數應返回一個可以迭代的reader方法(帶有yield的函數不再是一個普通的函數,而是一個生成器generator,成為了可以迭代的對象,等價于一個數組、鏈表、文件、字符串etc.)
- 在這個可以迭代的函數中,如示例代碼中的def reader(),定義數據讀取的邏輯。例如對以行為單位的數據進行截取,轉換及預處理。
- 最后,需要將數據整理為特定的格式,才能夠被Dataset正確讀取,并灌入訓練網絡中。簡單來說,數據的輸出順序與在網絡中創建的inputs必須是嚴格一一對應的,并轉換為類似字典的形式。在示例代碼中,使用zip的方法將參數名與數值構成的元組組成了一個list,并將其yield輸出。如果展開來看,輸出的數據形如[(‘dense_feature’,[value]),(‘C1’,[value]),(‘C2’,[value]),…,(‘C26’,[value]),(‘label’,[value])]。
import paddle.distributed.fleet.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
def generate_sample(self, line):def reader():features = line.rstrip('\n').split('\t')dense_feature = []sparse_feature = []for idx in continuous_range_:if features[idx] == "":dense_feature.append(0.0)else:dense_feature.append((float(features[idx]) - cont_min_[idx - 1]) /cont_diff_[idx - 1])for idx in categorical_range_:sparse_feature.append([hash(str(idx) + features[idx]) % hash_dim_])label = [int(features[0])]process_line = dense_feature, sparse_feature, labelfeature_name = ["dense_feature"]for idx in categorical_range_:feature_name.append("C" + str(idx - 13))feature_name.append("label")yield zip(feature_name, [dense_feature] + sparse_feature + [label])return reader
d = CriteoDataset()
d.run_from_stdin()
二、引入Dataset
- 通過工廠類paddle.distributed.QueueDataset創建一個Dataset對象。
- 將定義好的數據輸入格式傳給Dataset,通過dataset._set_use_var(inputs)實現。
- 指定的數據讀取方式,由dataset_generator.py實現數據讀取的規則,后面會介紹讀取規則的實現。
- 指定數據讀取的batch_size。
- 指定數據讀取的線程數,該線程數和訓練線程應保持一致,兩者為耦合的關系。
- 指定Dataset讀取的訓練文件的列表。
def get_dataset(inputs, params)
dataset = paddle.distributed.fleet.dataset.DatasetBase()
dataset._set_use_var(inputs)
dataset._set_pipe_command(“python dataset_generator.py”)
dataset._set_batch_size(params.batch_size)
dataset._set_thread(int(params.cpu_num))
file_list = [
str(params.train_files_path) + “/%s” % x
for x in os.listdir(params.train_files_path)
]
dataset.set_filelist(file_list)
logger.info(“file list: {}”.format(file_list))
return dataset
三、 Dataset Reader 快速調試
可以脫離組網架構,單獨驗證Dataset的輸出是否符合預期。使用命令 cat 數據文件 | python dataset讀取python文件進行dataset代碼的調試:
cat train_data/part-0 | python dataset_generator.py
輸出的數據格式如下: dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; … ; sparse_input:size ; sparse_input:value ; label:size ; label:value
理想的輸出為(截取了一個片段):
…
13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0
…
使用Dataset的一些注意事項
? Dataset的基本原理:將數據print到緩存,再由C++端的代碼實現讀取,因此,不能在dataset的讀取代碼中,加入與數據讀取無關的print信息,會導致C++端拿到錯誤的數據信息。
? Dataset目前只支持在unbuntu及CentOS等標準Linux環境下使用,在Windows及Mac下使用時,會產生錯誤,請知悉。
節點訓練
加快訓練速度,采用多線程的方式進行單節點的訓練,如配置更大的線程數來加速訓練等。
通信模塊
? 采用稀疏更新:目前部分OP實現了稀疏更新(embedding/nce/hsigmoid),采用稀疏更新的方式傳輸梯度,可以減少通信量,提升訓練速度
? 減少通信時間:可以采用減少通信次數(GEO方式隔N個batch通信一次,可以調整N來減少通信)、壓縮單次通信量(稀疏通信), 采樣全異步、GEO異步等訓練模式可以減少網絡通信,減少節點之間等待時間,達到加速的目的
分布式訓練策略GEO異步訓練的配置,引入StrategyFactory后,只需調用create_geo_strategy即可。
optimizer = paddle.optimizer.SGD(params.learning_rate)
# geo異步模式可以指定通信間隔的MiniBatch數,間隔Batch數越大,理論上速度越快,但是對效果可能有影響,需要在此進行權衡strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}# 配置分布式的optimizer,傳入指定的strategy,構建program
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
參數更新
? 稀疏更新:部分優化器實現了稀疏更新的方式(既只更新有梯度的參數),在參數規模較大的情況下,可以極大的提升訓練速度,目前包括SGD/Adagrad/Adam(需要在配置optimizer的時候指定lazy_mode=True)實現了稀疏更新,采用此方式可以提速。
稀疏參數(embedding等)會采用稀疏更新
optimizer = paddle.optimizer.AdamOptimizer(learning_rate=0.002, lazy_mode=True)
效果提升
可以通過調整訓練數據分布、訓練模式、優化算法及超參等手段來提升模型效果。試想,在同步訓練下,由于Pserver端更新參數時采用的是全局梯度,當“多機下節點數乘以batchsize等于單機下batchsize”時,多機效果可以和單機打平, 所以分布式下的效果優化,可以歸結為向單機靠齊。
? 訓練數據均勻分布或節點之間差異最小:隨機亂序,并均勻分配給不同訓練節點,使得節點之間訓練數據分布差異最小,且訓練速度差異最小,這樣效果會更穩定。一般情況下,訓練數據在訓練開始只分配一次,然后訓練多個Epoch。舉例,兩個節點AB,AB始終都在訓同一個Epoch的效果,肯定好于節點A在訓Epoch2,節點B在訓Epoch20。因為節點B在嚴重過擬合自己部分的數據。
? 優化算法:在需訓練多輪情況下,SGD最終效果會好些;在只需訓練一輪情況下,Ada系列效果會更好些。
? 訓練模式:同步模式的效果往往比異步好些,但異步速度更快。在異步下,可以通過減少節點差異,將效果向同步對齊。
? 超參:節點數調整的時候,batchsize、學習率要相應的調整,保證收斂速度不變。
總結
以上是生活随笔為你收集整理的参数服务器训练基本理论的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多机多卡训练基本原理
- 下一篇: Paddle Lite端侧部署