[深度学习] 分布式Tensorflow 2.0 介绍(二)
[深度學習] 分布式模式介紹(一)
[深度學習] 分布式Tensorflow 2.0介紹(二)
[深度學習] 分布式Pytorch 1.0介紹(三)
[深度學習] 分布式Horovod介紹(四)
一 單GPU訓練 vs 多GPU訓練
單GPU訓練 一般代碼比較簡單,并且能滿足我們的基本需求,通常做法是設定變量CUDA_VISIBLE_DEVICES的值為某一塊GPU來Mask我們機器上的GPU設備,雖然有時當我們忘了設定該變量時程序會自動占用所有的GPU資源,但如果沒有相應的代碼去分配掌控GPU資源的使用的話,程序還是只會利用到第一張卡的計算資源,其他的資源則僅是占用浪費狀態。
多GPU訓練 則可以從兩個方面提升我們模型訓練的上限:1. 超過單卡顯存上限的模型大小, 2. 更大的Batch Size和更快訓練速度。
單機的多GPU訓練, tensorflow的官方已經給了一個cifar的例子,已經有比較詳細的代碼和文檔介紹, 這里大致說下多GPU的過程,以便方便引入到多機多GPU的介紹。
單機多GPU的訓練過程:
假設你的機器上有3個GPU;
在單機單GPU的訓練中,數據是一個batch一個batch的訓練。 在單機多GPU中,數據一次處理3個batch(假設是3個GPU訓練), 每個GPU處理一個batch的數據計算。
變量,或者說參數,保存在CPU上
剛開始的時候數據由CPU分發給3個GPU, 在GPU上完成了計算,得到每個batch要更新的梯度。
然后在CPU上收集完了3個GPU上的要更新的梯度, 計算一下平均梯度,然后更新參數。
然后繼續循環這個過程。
通過這個過程,處理的速度取決于最慢的那個GPU的速度。如果3個GPU的處理速度差不多的話, 處理速度就相當于單機單GPU的速度的3倍減去數據在CPU和GPU之間傳輸的開銷,實際的效率提升看CPU和GPU之間數據的速度和處理數據的大小。
二 分布式TensorFlow
Tensorflow分布式訓練的支持主要是通過tf.distribute.Strategy來實現
1 MirroredStrategy? 單機多卡訓練
in-graph replication with synchronous
MirroredStrategy是一種支持多張GPU在同一個機器上的同步訓練方法。在訓練開始時,Mirrored會在每張卡上復制一份模型,
每個顯卡會收到tf.data.Dataset傳來的數據,獨立計算梯度,然后采用all-reduce的方法進行同步更新。多個顯卡在通信時默認使用Nvidia NCCL進行。
我們可以深入MirroredStrategy的實現了解一下。基本上所有的distributed strategy都是通過某些collective ops和cross device ops進行數據通訊。MirroredStrategy也是如此,它是這樣選擇cross device ops的:
if len(workers) > 1:if not isinstance(self._cross_device_ops, cross_device_ops_lib.MultiWorkerAllReduce):raise ValueError("In-graph multi-worker training with `MirroredStrategy` is not ""supported.")self._inferred_cross_device_ops = self._cross_device_ops else:# TODO(yuefengz): make `choose_the_best` work with device strings# containing job names.self._inferred_cross_device_ops = cross_device_ops_lib.NcclAllReduce()這也就印證了MirroredStrategy在單機多卡的情況下默認使用NCCL來進行通信的說明。具體的實現大家可以去查看AllReduceCrossDeviceOps的實現。
同時,上面的程序也說明MirroredStrategy可以運用到多機多卡的情況中去,然而多機多卡的情況下用戶需要自己傳入cross_device_ops_lib.MultiWorkerAllReduce進行通訊,這里MultiWorkerAllReduce支持若干種通訊方式,比如nccl,?nccl/xring,?nccl/rechd,?nccl/pscpu,?xring,?pscpu,?pscpu/pscpu等等。由于目前最佳的通訊方式需要NCCL2.0加上xring,然而Tensorflow目前使用NCCL 1.1,并且nccl/xring在現有的代碼中有bug無法工作,所以這一模式常常被大家詬病。
MirroredStrategy instance which will use all the GPUs that are visible to TensorFlow, and use NCCL as the cross device communication.
訓練腳本就會自動進行分布式訓練。如果你只想用主機上的部分GPU訓練
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])用戶通過該API控制使用何種分布式架構,例如如果用戶需要在單機多卡環境中使用All-Reduce架構,只需定義對應架構下的Strategy,指定Estimator的config參數即可:
mirrored_strategy = tf.distribute.MirroredStrategy() config = tf.estimator.RunConfig(train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy) regressor = tf.estimator.LinearRegressor(feature_columns=[tf.feature_column.numeric_column('feats')],optimizer='SGD',config=config)tf.keras 例子
import tensorflow as tf import tensorflow_datasets as tfdsnum_epochs = 5 batch_size_per_replica = 64 learning_rate = 0.001strategy = tf.distribute.MirroredStrategy() print('Number of devices: %d' % strategy.num_replicas_in_sync) # 輸出設備數量 batch_size = batch_size_per_replica * strategy.num_replicas_in_sync# 載入數據集并預處理 def resize(image, label):image = tf.image.resize(image, [224, 224]) / 255.0return image, label# 當as_supervised為True時,返回image和label兩個鍵值 dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True) dataset = dataset.map(resize).shuffle(1024).batch(batch_size)with strategy.scope():model = tf.keras.applications.MobileNetV2()model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),loss=tf.keras.losses.sparse_categorical_crossentropy,metrics=[tf.keras.metrics.sparse_categorical_accuracy])model.fit(dataset, epochs=num_epochs)MirroredStrategy 的步驟如下:
訓練開始前,該策略在所有 N 個計算設備上均各復制一份完整的模型;
每次訓練傳入一個批次的數據時,將數據分成 N 份,分別傳入 N 個計算設備(即數據并行);
N 個計算設備使用本地變量(鏡像變量)分別計算自己所獲得的部分數據的梯度;
使用分布式計算的 All-reduce 操作,在計算設備間高效交換梯度數據并進行求和,使得最終每個設備都有了所有設備的梯度之和;
使用梯度求和的結果更新本地變量(鏡像變量);
當所有設備均更新本地變量后,進行下一輪訓練(即該并行策略是同步的)。
默認情況下,TensorFlow 中的 MirroredStrategy 策略使用 NVIDIA NCCL 進行 All-reduce 操作。
2? MultiWorkerMirroredStrategy 多機訓練
對于分布式多機環境,最早是Uber專門提出了一種基于Ring-Allreduce的分布式TensorFlow架構Horovod,并已開源。
tf.distribute.experimental.MultiWorkerMirroredStrategy與MirroredStrategy非常類似,都在每一個device上存儲一份模型的備份,進行同步的分布式訓練。
該策略采用CollectiveOps作為多個worker之間通訊的操作。所謂的collective op是Tensorflow自己實現的根據當前硬件環境,網絡結構,和Tensor大小自動采用最佳算法進行all-reduce的計算操作。一個collective op的實現邏輯十分簡單
if (CanProceedWithCompute(c, col_exec, done)) {col_exec->ExecuteAsync(c, col_params_, GetCollectiveKey(c), actual_done); }c是當前op的計算狀態,col_exec是Tensorflow根據系統情況選擇的collective executor,所有的all reduce,boardcast和receive操作都有collective executor去執行。
該策略目前也實現了很多優化,比如將很多個小tensor的all reduce操作變成幾個大tensor的all reduce操作,以及在開發當中的采用最新NCCL 2.0進行通訊的操作,具體可以參見Issue 24505。可以看出Tensorflow分布式訓練在被吐槽很多次后,感受到了來自Pytorch,Horovod的壓力,在努力的提升自己。
最后,關于MultiWorkerMirroredStrategy的配置,有兩點需要注意。
一點是collective ops的策略選擇,目前支持CollectiveCommunication.RING,采用與Horovod類似的ring-based通訊策略。另一個是CollectiveCommunication.NCCL,采用Nvidia NCCL進行通訊,在啟動策略時可以傳入參數指定:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(tf.distribute.experimental.CollectiveCommunication.NCCL)CollectiveCommunication.AUTO defers the choice to the runtime.
另一個需要注意的是關于TF_CONFIG的設置,該策略并不需要指定Parameter server,只需要一系列worker即可,其配置如下:
TF_CONFIG = {'cluster': {'worker': ['worker1:port1', 'worker2:port2', 'worker3:port3', ...]},'task': {'type': 'worker', 'index': 0} })目前該API尚處于實驗階段。如果在代碼中通過MultiWorkerMirroredStrategy指定使用All-Reduce架構,則分布式提交時,TF_CONFIG環境變量中的cluster就不需要ps類型的節點了,例如:
TF_CONFIG='{"cluster": {"worker": ["host1:2222", "host2:2222", "host3:2222"]},"task": {"type": "work", "index": 0} }' strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() config = tf.estimator.RunConfig(train_distribute=strategy, eval_distribute=strategy) regressor = tf.estimator.LinearRegressor(feature_columns=[tf.feature_column.numeric_column('feats')],optimizer='SGD',config=config)多機訓練的方法和單機多卡類似,將 MirroredStrategy 更換為適合多機訓練的 MultiWorkerMirroredStrategy 即可。不過,由于涉及到多臺計算機之間的通訊,還需要進行一些額外的設置。具體而言,需要設置環境變量 TF_CONFIG ,示例如下:
os.environ['TF_CONFIG'] = json.dumps({'cluster': {'worker': ["localhost:20000", "localhost:20001"]},'task': {'type': 'worker', 'index': 0} })TF_CONFIG 由 cluster 和 task 兩部分組成:
-
cluster 說明了整個多機集群的結構和每臺機器的網絡地址(IP + 端口號)。對于每一臺機器,cluster 的值都是相同的;
-
task 說明了當前機器的角色。例如, {'type': 'worker', 'index': 0} 說明當前機器是 cluster 中的第 0 個 worker(即 localhost:20000 )。每一臺機器的 task 值都需要針對當前主機進行分別的設置。
以上內容設置完成后,在所有的機器上逐個運行訓練代碼即可。先運行的代碼在尚未與其他主機連接時會進入監聽狀態,待整個集群的連接建立完畢后,所有的機器即會同時開始訓練。
請在各臺機器上均注意防火墻的設置,尤其是需要開放與其他主機通信的端口。如上例的 0 號 worker 需要開放 20000 端口,1 號 worker 需要開放 20001 端口。
以下示例的訓練任務與前節相同,只不過遷移到了多機訓練環境。假設我們有兩臺機器,即首先在兩臺機器上均部署下面的程序,唯一的區別是 task 部分,第一臺機器設置為 {'type': 'worker', 'index': 0} ,第二臺機器設置為 {'type': 'worker', 'index': 1} 。接下來,在兩臺機器上依次運行程序,待通訊成功后,即會自動開始訓練流程。
tf.keras例子
import tensorflow as tf import tensorflow_datasets as tfds import os import jsonnum_epochs = 5 batch_size_per_replica = 64 learning_rate = 0.001num_workers = 2 os.environ['TF_CONFIG'] = json.dumps({'cluster': {'worker': ["localhost:20000", "localhost:20001"]},'task': {'type': 'worker', 'index': 0} }) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() batch_size = batch_size_per_replica * num_workersdef resize(image, label):image = tf.image.resize(image, [224, 224]) / 255.0return image, labeldataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True) dataset = dataset.map(resize).shuffle(1024).batch(batch_size)with strategy.scope():model = tf.keras.applications.MobileNetV2()model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),loss=tf.keras.losses.sparse_categorical_crossentropy,metrics=[tf.keras.metrics.sparse_categorical_accuracy])model.fit(dataset, epochs=num_epochs)3. CentralStorageStrategy
tf.distribute.experimental.CentralStorageStrategy也執行同步訓練,但是變量不會被鏡像,而是放在CPU上。各操作(operation)在本地GPU之間復制進行。如果只有一個GPU,變量和操作都會放在GPU上。
創建一個 CentralStorageStrategy 實例:
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy() INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'這會創建一個 CentralStorageStrategy 實例使用所有可見的CPU和GPU。在更新應用到變量之前,不同副本上變量的更新將會匯總。
注意: 該策略是 實驗性的 ,因為我們正在對它進行改進,使他能在更多場景下工作. 敬請期待此API的變化。
4 ParameterServerStrategy
他是Tensorflow最初的分布式訓練方法,它由若干個parameter servers和若干個worker servers構成,parameter servers用于存儲參數,workers用于計算。
ps_strategy = tf.distribute.experimental.ParameterServerStrategy()ParameterServerStrategy 在訓練過程中worker servers會和不同的parameter servers溝通獲得參數,然后計算,向parameter servers傳遞參數的梯度。配置一個這樣的訓練環境非常簡單,只需要在程序運行時設置好環境變量TF_CONFIG,需要注意的是需要給分布式集群里每一個機子不同的task。
os.environ["TF_CONFIG"] = json.dumps({"cluster": {"worker": ["host1:port", "host2:port", "host3:port"],"ps": ["host4:port", "host5:port"]},"task": {"type": "worker", "index": 1} })同時,ParameterServerStrategy還有比較神奇的功能,它可以通過傳入num_gpus_per_worker在一個worker上進行多GPU的同步計算,然后不同worker之間進行異步計算。但是由于單一worker上多GPU并沒有利用NCCL進行通訊,而是直接將結果發送到CPU,所以效率非常低下。
strategy = tf.distribute.experimental.ParameterServerStrategy() run_config = tf.estimator.RunConfig(experimental_distribute.train_distribute=strategy) estimator = tf.estimator.Estimator(config=run_config) tf.estimator.train_and_evaluate(estimator,...)Examples and Tutorials
Here is a list of tutorials and examples that illustrate the above integration end to end with Keras:
We've integrated tf.distribute.Strategy into tf.keras which is TensorFlow's implementation of the Keras API specification.? tf.keras is a high-level API to build and train models. By integrating into tf.keras backend, we've made it seamless for Keras users to distribute their training written in the Keras training framework.
The only things that need to change in a user's program are:
(1) Create an instance of the appropriate tf.distribute.Strategy
(2) Move the creation and compiling of Keras model inside strategy.scope.
Here is a snippet of code to do this for a very simple Keras model with one dense layer:
mirrored_strategy = tf.distribute.MirroredStrategy() with mirrored_strategy.scope():model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])model.compile(loss='mse', optimizer='sgd')三? 分布式訓練的容錯 Fault tolerance
在同步訓練中, 如果有一個worker 失敗了, 整個訓練集群就會失敗,沒有故障恢復機制.
Using Keras with?tf.distribute.Strategy?comes with the advantage of fault tolerance in cases where workers die or are otherwise unstable. We do this by preserving training state in the distributed file system of your choice, such that upon restart of the instance that previously failed or preempted, the training state is recovered.
Since all the workers are kept in sync in terms of training epochs and steps, other workers would need to wait for the failed or preempted worker to restart to continue.
ModelCheckpoint callback
To take advantage of fault tolerance in multi-worker training, provide an instance of?tf.keras.callbacks.ModelCheckpoint?at the?tf.keras.Model.fit()?call. The callback will store the checkpoint and training state in the directory corresponding to the?filepath?argument to?ModelCheckpoint.
# Replace the `filepath` argument with a path in the file system # accessible by all workers. callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')] with strategy.scope():multi_worker_model = build_and_compile_cnn_model() multi_worker_model.fit(x=train_datasets, epochs=3, callbacks=callbacks) Epoch 1/3469/Unknown - 8s 18ms/step - loss: 2.2049 - accuracy: 0.2318WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. Instructions for updating: If using Keras pass *_constraint arguments to layers.WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. Instructions for updating: If using Keras pass *_constraint arguments to layers.INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assetsINFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets469/469 [==============================] - 9s 19ms/step - loss: 2.2049 - accuracy: 0.2318 Epoch 2/3 451/469 [===========================>..] - ETA: 0s - loss: 1.9195 - accuracy: 0.5715INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assetsINFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets469/469 [==============================] - 2s 4ms/step - loss: 1.9113 - accuracy: 0.5767 Epoch 3/3 450/469 [===========================>..] - ETA: 0s - loss: 1.4175 - accuracy: 0.7550INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assetsINFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets469/469 [==============================] - 2s 4ms/step - loss: 1.4078 - accuracy: 0.7561<tensorflow.python.keras.callbacks.History at 0x7fc38fdfee80> ?If a worker gets preempted, the whole cluster pauses until the preempted worker is restarted. Once the worker rejoins the cluster, other workers will also restart. Now, every worker reads the checkpoint file that was previously saved and picks up its former state, thereby allowing the cluster to get back in sync. Then the training continues.
If you inspect the directory containing the?filepath?you specified in?ModelCheckpoint, you may notice some temporarily generated checkpoint files. Those files are needed for recovering the previously lost instances, and they will be removed by the library at the end of?tf.keras.Model.fit()?upon successful exiting of your multi-worker training.
三 總結
本文梳理了分布式TensorFlow編程模型的發展,主要從用戶使用分布式TensorFlow角度出發,闡述了不同的分布式TensorFlow架構。可以看到,隨著TensorFlow的迭代演進,其易用性越來越友好。目前TensorFlow已經發布了2.0.0 正式版本,標志著TensorFlow正式進入2.0時代了,通過不同的Strategy,可以輕松控制使用不同的分布式TensorFlow架構,可見TensorFlow的API設計更加靈活友好,擁有極強的可擴展性,相信將來會出現更多的Strategy來應對復雜的分布式場景。
在2.0版本中,其主打賣點是Eager Execution與Keras高階API,整體易用性將進一步提升,通過Eager Execution功能,我們可以像使用原生Python一樣操作Tensor,而不需要像以前一樣需要通過Session.run的方式求解Tensor,另外,通過TensorFlow Keras高階API,可以更加靈活方便構建模型,同時可以將模型導出為Keras標準格式HDF5,以靈活兼容在線服務等。
補充: Tensorflow 1.0---in-graph 和? between-graph
in-graph模式
In-graph模式,單機多GPU模型有點類似,? 把計算已經從單機多GPU,已經擴展到了多機多GPU了, 不過數據分發還是在一個節點,其他結算節點只需join操作。 這樣的好處是配置簡單, 其他多機多GPU的計算節點,只要起個join操作, 暴露一個網絡接口,等在那里接受任務就好了。 這些計算節點暴露出來的網絡接口,使用起來就跟本機的一個GPU的使用一樣, 只要在操作的時候指定tf.device("/job:worker/task:N"),就可以向指定GPU一樣,把操作指定到一個計算節點上計算,使用起來和多GPU的類似。 但是這樣的壞處是訓練數據的分發依然在一個節點上, 要把訓練數據分發到不同的機器上, 嚴重影響并發訓練速度。在大數據訓練的情況下, 不推薦使用這種模式。
對于圖內復制,只構建一個Client,這個Client構建一個Graph,Graph中包含一套模型參數,放置在ps上,同時Graph中包含模型計算部分的多個副本,每個副本都放置在一個worker上,這樣多個worker可以同時訓練復制的模型。
再開一個Python解釋器,作為Client,執行如下語句構建計算圖,并:
import tensorflow as tfwith tf.device("/job:ps/task:0"):w = tf.get_variable([[1., 2., 3.], [1., 3., 5.]])input_data = ... inputs = tf.split(input_data, num_workers) outputs = []for i in range(num_workers):with tf.device("/job:ps/task:%s" % str(i)):outputs.append(tf.matmul(inputs[i], w))output = tf.concat(outputs, axis=0) with tf.Session() as sess:sess.run(tf.global_variables_initializer())print sess.run(output)從以上代碼可以看到,當采用圖內復制時,需要在Client上創建一個包含所有worker副本的流程圖,隨著worker數量的增長,計算圖將會變得非常大,不利于計算圖的維護。此外,數據分發在Client單點,要把訓練數據分發到不同的機器上,會嚴重影響并發訓練速度。所以在大規模分布式多機訓練情況下,一般不會采用圖內復制的模式,該模式常用于單機多卡情況下,簡單直接。
between-graph模式
between-graph模式下,訓練的參數保存在參數服務器,數據不用分發,數據分片的保存在各個計算節點,各個計算節點自己算自己的,算完后把要更新的參數告訴參數服務器,參數服務器更新參數。這種模式的優點是不用進行訓練數據的分發,尤其數據量在TB級的時候,節省了大量的時間,所以大數據深度學習推薦使用between-graph模式。
為可以解決圖內復制在擴展上的局限性,我們可以采用圖間復制模式。對于圖間復制,每個worker節點上都創建一個Client,各個Client構建相同的Graph,但是參數還是放置在ps上,每個worker節點單獨運算,一個worker節點掛掉了,系統還可以繼續跑。
所以我們在第一個worker和第二個worker的Python解釋器里繼續執行如下語句實現Client完成整個分布式TensorFlow的運行:
with tf.device("/job:ps/task:0"):w = tf.get_variable(name='w', shape=[784, 10])b = tf.get_variable(name='b', shape=[10])x = tf.placeholder(tf.float32, shape=[None, 784]) y = tf.placeholder(tf.int32, shape=[None]) logits = tf.matmul(x, w) + b loss = ... train_op = ...with tf.Session() as sess:for _ in range(10000):sess.run(train_op, feed_dict=...)在上述描述的過程中,我們是全程手動做分布式驅動的,先建立Cluster,然后構建計算圖提交執行,Server上的Master Service和Worker Service根本沒有用到。實際應用時當然不會這么愚蠢,一般是將以上代碼片段放到一個文件中,通過參數控制執行不同的代碼片段,例如:
import tensorflow as tfps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)if FLAGS.job_name == 'ps':server.join() elif FLAGS.job_name == "worker":with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index,cluster=cluster)):# Build model...loss = ...train_op = ...with tf.train.MonitoredTrainingSession(master="/job:worker/task:0",is_chief=(FLAGS.task_index == 0),checkpoint_dir="/tmp/train_logs") as mon_sess:while not mon_sess.should_stop():mon_sess.run(train_op)每個節點上都執行如上代碼,只是不同節點輸入的參數不一樣,對于ps節點,啟動Server后就堵塞等待參數服務,對于worker節點,啟動Server后(后臺服務),開始扮演Client,構建計算圖,最后通過Session提交計算。注意在調用Session.run之前,僅僅是Client的構圖,并未開始計算,各節點上的Server還未發揮作用,只有在調用Session.run后,worker和ps節點才會被派發Task。在調用Session.run時,需要給Session傳遞target參數,指定使用哪個worker節點上的Master Service,Client將構建的計算圖發給target指定的Master Service,一個TensorFlow集群中只有一個Master Service在工作,它負責子圖劃分、Task的分發以及模型保存與恢復等,在子圖劃分時,它會自動將模型參數分發到ps節點,將梯度計算分發到worker節點。另外,在Client構圖時通過tf.train.replica_device_setter告訴worker節點默認在本機分配Op,這樣每個Worker Service收到計算任務后構建出一個單獨的計算子圖副本,這樣每個worker節點就可以單獨運行,掛了不影響其他worker節點繼續運行。
雖然圖間復制具有較好的擴展性,但是從以上代碼可以看到,寫一個分布式TensorFlow應用,需要用戶自行控制不同組件的運行,這就需要用戶對TensorFlow的分布式架構有較深的理解。另外,分布式TensorFlow應用與單機版TensorFlow應用的代碼是兩套,一般使用過程中,用戶都是先在單機上調試好基本邏輯,然后再部署到集群,在部署分布式TensorFlow應用前,就需要將前面的單機版代碼改寫成分布式多機版,用戶體驗非常差。所以說,使用Low-level 分布式編程模型,不能做到一套代碼既可以在單機上運行也可以在分布式多機上運行,其用戶門檻較高,一度被相關工程及研究人員詬病。為此,TensorFlow推出了High-level分布式編程模型,極大地改善用戶易用性。
同步更新和異步更新
in-graph和between-graph模式都支持同步更新和異步更新。
在同步更新的時候,每次梯度更新,要等所有分發的數據計算完成,返回結果,把梯度累加算了均值之后,再更新參數。這樣的好處是loss的下降比較穩定,但這個的壞處也比較明顯,處理的速度取決于最慢的那個分片的計算時間。
在異步更新時,所有的計算節點,自己算自己的,更新參數也是自己更新自己的計算結果,這樣的優點是計算速度快,計算資源能得到充分利用,但是缺點是loss的下降不穩定,抖動大。
在數據量小的情況下,各個節點的計算能力比較均衡的情況下,推薦使用同步模式;數據量很大,各個機器的計算性能參差不齊的情況下,推薦使用異步的方式。
TensorFlow 1.X 版本的分布式
最原始的分布式TensorFlow
Parameter Server的配置數量也非常復雜,不同的網絡環境,模型大小都會對效率有影響,所以現在官方好像也不怎么推薦這種做法了。最原始的分布式TensorFlow編程是基于Low-level API來實現,下面我們通過舉例來理解最原始的分布式TensorFlow編程步驟。我們在一臺機器上啟動三個Server(2個worker,1個ps)來模擬分布式多機環境,開啟三個Python解釋器(分別對應2個worker和1個ps),執行如下python語句,定義一個Cluster:
import tensorflow as tfcluster = tf.train.ClusterSpec({"worker": ["localhost:2222","localhost:2223"],"ps": ["localhost:2224"]})在第一個worker解釋器內執行如下語句啟動Server: server = tf.train.Server(cluster, job_name="worker", task_index=0)在第二個worker解釋器內執行如下語句啟動Server:
server = tf.train.Server(cluster, job_name="worker", task_index=1)在ps解釋器內執行如下語句啟動Server: server = tf.train.Server(cluster, job_name="ps", task_index=0)至此,我們已經啟動了一個TensorFlow Cluster,它由兩個worker節點和一個ps節點組成,每個節點上都有Master Service和Worker Service,其中worker節點上的Worker Service將負責梯度運算,ps節點上的Worker Service將負責參數更新,三個Master Service將僅有一個會在需要時被用到,負責子圖劃分與Task派發。
上圖所示,假設存在兩個任務:
- /job:ps/task:0: 負責模型參數的存儲和更新
- /job:worker/task:0: 負責模型的訓練或推理
有了Cluster,我們就可以編寫Client,構建計算圖,并提交到這個Cluster上執行。使用分布式TensorFlow時,最常采用的分布式訓練策略是數據并行,數據并行就是在很多設備上放置相同的模型,在TensorFlow中稱之為Replicated training,主要表現為兩種模式:圖內復制(in-graph replication)和圖間復制(between-graph replication)。不同的運行模式,Client的表現形式不一樣。
-
Client
可以把它看成是TensorFlow前端,它支持多語言的編程環境(Python/C++/Go/Java等),方便用戶構造各種復雜的計算圖。Client通過Session連接TensorFlow后端,并啟動計算圖的執行。Client基于TensorFlow的編程接口,構造計算圖。此時,TensorFlow并未執行任何計算。直至建立Session會話,并以Session為橋梁,建立Client與后端運行時的通道,將Protobuf格式的GraphDef發送至Distributed Master。也就是說,當Client對OP結果進行求值時,將觸發Distributed Master的計算圖的執行過程 -
Master
Master根據要計算的操作(Op),從計算圖中反向遍歷,找到其所依賴的最小子圖,然后將該子圖再次分裂為多個子圖片段,以便在不同的進程和設備上運行這些子圖片段,最后將這些子圖片段派發給Worker執行。 -
Worker
Worker按照計算子圖中節點之間的依賴關系,根據當前的可用的硬件環境(GPU/CPU/TPU),調用Op的Kernel實現完成運算。對于每個任務,都將存在相應的Worker Service,它主要負責如下3個方面的職責:1 處理來自Master的請求;2 調度OP的Kernel實現,執行本地子圖;3 協同任務之間的數據通信。
在分布式TensorFlow中,參與分布式系統的所有節點或者設備統稱為一個Cluster,一個Cluster中包含很多Server,每個Server去執行一項Task,Server和Task是一一對應的。
所以,Cluster可以看成是Server的集合,也可以看成是Task的集合,TensorFlow為各個Task又增加了一個抽象層,將一系列相似的Task集合稱為一個Job。
一組Task集合(即Job)有若干個Server(host和port標識),每個Server上會綁定兩個Service,就是前面提到的Master Service和Worker Service,Client通過Session連接集群中的任意一個Server的Master Service提交計算圖,Master Service負責劃分子圖并派發Task給Worker Service,Worker Service則負責運算派發過來的Task完成子圖的運算。
為什么要分成Cluster Job和Task
首先,我們介紹一下Task:Task就是主機上的一個進程,在大多數情況下,一個機器上只運行一個Task.
為什么Job是Task的集合呢? 在分布式深度學習框架中,我們一般把Job劃分為Parameter和Worker,Parameter Job是管理參數的存儲和更新工作.Worker Job是來運行ops.如果參數的數量太大,一臺機器處理不了,這就要需要多個Tasks.
Cluster?是?Jobs?的集合:?Cluster(集群),就是我們用的集群系統了
參數服務器
當計算模型越來越大,模型的參數越來越多,多到模型參數的更新,一臺機器的性能都不夠時,我們需要將參數分開到不同的機器去存儲和更新。參數服務器可以是多臺機器組成的集群,類似于分布式的存儲結構。主要用來解決參數存儲和更新的性能問題。
對于PS架構,Parameter Server的Task集合為ps(即job類型為ps),而執行梯度計算的Task集合為worker(即job類型為worker),Low-level 分布式編程模型
High-level 分布式編程模型
TensorFlow提供Estimator和Dataset高階API,簡化模型構建以及數據輸入,用戶通過Estimator和Dataset高階API編寫TensorFlow應用,不用了解TensorFlow內部實現細節,只需關注模型本身即可。
Estimator代表一個完整的模型,它提供方法用于模型的訓練、評估、預測及導出
Estimator具備如下優勢:
- 基于Estimator編寫的代碼,可運行在單機和分布式環境中,不用區別對待
- 簡化了模型開發者之間共享部署,它提供了標準的模型導出功能,可以將訓練好的模型直接用于TensorFlow-Serving等在線服務
- 提供全套的分布式訓練生命周期管理,自動初始化變量、處理異常、創建檢查點文件并從故障中恢復、以及保存TensorBoard 的摘要等
- 提供了一系列開箱即用的常見Estimator,例如DNNClassifier,LinearClassifier等
使用Estimator編寫應用時,需將數據輸入從模型中分離出來。數據輸入可以通過?Dataset?API 構建數據 pipeline,類似Spark RDD或DataFrame,可以輕松處理大規模數據、不同的數據格式以及復雜的轉換等。具體關于Estimator的使用可以參考TensorFlow官方文檔,講的特別詳細。
使用Estimator編寫完應用后,可以直接單機上運行,如果需要將其部署到分布式環境運行,則需要在每個節點執行代碼前設置集群的TF_CONFIG環境變量(實際應用時通常借助資源調度平臺自動完成,如K8S,不需要修改TensorFlow應用程序代碼):
TF_CONFIG='{"cluster": {"chief": ["host0:2222"],"worker": ["host1:2222", "host2:2222", "host3:2222"],"ps": ["host4:2222", "host5:2222"]},"task": {"type": "chief", "index": 0} }'TF_CONFIG環境變量是一個json字符串,指定集群規格cluster以及節點自身的角色task,cluster包括chief、worker、ps節點,chief節點其實是一個特殊的worker節點,而且只能有一個節點,表示分布式TensorFlow Master Service所在的節點。
通過以上描述可以看到,使用高階API編寫分布式TensorFlow應用已經很方便了,然而因為PS架構的緣故,我們實際部署時,需要規劃使用多少個ps,多少個worker,那么調試過程中,需要反復調整ps和worker的數量。當模型規模較大時,在分布式訓練過程中,ps可能成為網絡瓶頸,因為所有worker都需要從ps處更新/獲取參數,如果ps節點網絡被打滿,那么worker節點可能就會堵塞等待,以至于其計算能力就發揮不出來。所以后面TensorFlow引入All-Reduce架構解決這類問題。
參考
TensorFlow分布式全套
TensorFlow架構與設計:概述
http://sharkdtu.com/posts/dist-tf-evolution.html
https://zhuanlan.zhihu.com/p/70312627
總結
以上是生活随笔為你收集整理的[深度学习] 分布式Tensorflow 2.0 介绍(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AMD R9 7845HX 游戏本处理器
- 下一篇: 手把手教你用 Word 制作“红头文件”