PPO实战学习总结
PPO used in go-bigger
前段時間一直在學習ppo算法,寫了 一點總結,記錄一下自己對ppo算法的一些理解與RL實戰時候容易遇到的一些問題。代碼地址如下,需要的可以自取:
https://github.com/FLBa9762/PPO_used_in_Gobigger.git
一般DRL在實際使用時根據流程可分類為collect、learn、eval三大過程,所以本文將PPO代碼分成這三大過程逐一解析,重點解析learn部分的原理與代碼,最后邊再附上共享參數的PPO網絡模型以供參考
collect
同時輸出 action 和 value, 并且保存在buffer中:
output = self._collect_model.forward(data, mode='compute_actor_critic')
保存到 buffer 的數據信息結構如下,其中 tarj_flag是一個標志位,從done信息得來但是將一組數據的最后一個done置為True:
其中;obs下數據的格式為:
每一次采集的數據可以多次輸入到learn中進行學習,提高樣本的利用率:
for i in range(cfg.policy.learn.update_per_collect):input_data = copy.deepcopy(train_data)learner.train(input_data, collector.envstep)
此外,還使用了并行采集,使用了pipe庫,具體使用方法還沒有掌握
learn
流程
(1)用data中的obs信息和next_obs信息得出value和next_value:并且對value信息進行標準化
with torch.no_grad():value = self._learn_model.forward(data['obs'], mode='compute_critic')['value']next_value = self._learn_model.forward(data['next_obs'], mode='compute_critic')['value']if self._value_norm:value *= self._running_mean_std.stdnext_value *= self._running_mean_std.std
(2)利用data中的數據和value,next_value得出adv信息:
compute_adv_data = gae_data(value, next_value, data['reward'], data['done'], data['traj_flag'])
data['adv'] = gae(compute_adv_data, self._gamma, self._gae_lambda) # 0.9, 0.95
具體步驟在本小節后的關鍵步驟中。
(3)求return值,并對return和value進行標準化:
unnormalized_returns = value + data['adv']
if self._value_norm:data['value'] = value / self._running_mean_std.stddata['return'] = unnormalized_returns / self._running_mean_std.stdself._running_mean_std.update(unnormalized_returns.cpu().numpy())
處理后的data數據包含信息如圖:
(4)將data數據隨機打亂,分組,每組大小為learn_batch,輸入到model中進行loss計算,下面代碼是計算loss時數據的準備工作(這里要注意data的維度,因為數據的運算全都是并行batch形式,不注意檢查很容易就會出現問題,之前在這里吃了大虧):
for batch in split_data_generator(data, self._cfg.learn.batch_size, shuffle=True):output = self._learn_model.forward(batch['obs'], mode='compute_actor_critic')adv = batch['adv'].squeeze(-1)output['logit'] = output['logit'].squeeze(1)batch['logit'] = batch['logit'].squeeze(1)if self._adv_norm:# Normalize advantage in a train_batchadv = (adv - adv.mean()) / (adv.std() + 1e-5)
(5)將處理后的數據打包輸入進行loss計算,clip_ratio一般取0.2:
ppo_batch = ppo_data(output['logit'], batch['logit'], batch['action'], output['value'], batch['value'], adv,batch['return'], batch['weight'])ppo_loss, ppo_info = ppo_error(ppo_batch, self._clip_ratio)
ppo_error 的具體步驟在本小節后的關鍵步驟中。
(6)第五步返回的loss有三種類型,分為policy_loss,value_loss和 entropy_loss, 由于actor和critic網絡使用了公共參數,并且可以通過添加熵加成來確保足夠的探索性。根據PPO論文所指出的(如下圖),每次迭代要 最大化以下公式, 這里我的 value_loss 和 entropy_loss 都是和公式一樣的,policy_loss我在求的時候就加了負號了(policy_gradient的習慣),所以代碼中的 total_loss 寫成了如下形式:
wv, we = self._value_weight, self._entropy_weighttotal_loss = ppo_loss.policy_loss + wv * ppo_loss.value_loss - we * ppo_loss.entropy_lossself._optimizer.zero_grad()total_loss.backward()self._optimizer.step()
其中,wv和we分別為value_loss和entropy_loss的權重,一般取0.5和0.01
關鍵步驟
求adv(使用GAE方法)
(1)數據整理,代碼如下:
compute_adv_data = gae_data(value, next_value, data['reward'], data['done'], data['traj_flag'])data['adv'] = gae(compute_adv_data, self._gamma, self._gae_lambda) # 0.9, 0.95
(2)gae函數求每幀數據對應的adv:
def gae(data: namedtuple, gamma: float = 0.99, lambda_: float = 0.97):value, next_value, reward, done, traj_flag = dataif done is None:done = torch.zeros_like(reward, device=reward.device)if len(value.shape) == len(reward.shape) + 1: # for some marl case: value(T, B, A), reward(T, B)reward = reward.unsqueeze(-1)done = done.unsqueeze(-1)delta = reward + (1 - done) * gamma * next_value - valuefactor = gamma * lambda_adv = torch.zeros_like(value, device=value.device)gae_item = torch.zeros_like(value[0])for t in reversed(range(reward.shape[0])):if traj_flag is None: # trag_flag is not Nonegae_item = delta[t] + factor * gae_item * (1 - done[t])else:gae_item = delta[t] + factor * gae_item * (1 - traj_flag[t].float())adv[t] += gae_itemreturn adv
關鍵的計算公式:
delta = reward + (1 - done) * gamma * next_value - value
用reward、value和next_value信息求的一個大致的基優勢
gae_item = delta[t] + factor * gae_item * (1 - traj_flag[t].float())
通過反向迭代,對基優勢進行修正,修正后優勢包含未來的獎勵信息,其中
factor = gamma * lambda_
factor越大,智能體越有遠見,但相應模型越難收斂
求loss(policy_loss, value_loss, entropy_loss)
(1)求三個loss,其中ppo_output中包括ppo_loss和entropy_loss,ppo_info中包括approx_kl(新舊策略差異)和 clipfrac(clip的數據占比)。
def ppo_error(data: namedtuple,clip_ratio: float = 0.2,use_value_clip: bool = True,dual_clip: Optional[float] = None
) -> Tuple[namedtuple, namedtuple]:assert dual_clip is None or dual_clip > 1.0, "dual_clip value must be greater than 1.0, but get value: {}".format(dual_clip)logit_new, logit_old, action, value_new, value_old, adv, return_, weight = datapolicy_data = ppo_policy_data(logit_new, logit_old, action, adv, weight)policy_output, policy_info = ppo_policy_error(policy_data, clip_ratio, dual_clip)value_data = ppo_value_data(value_new, value_old, return_, weight)value_loss = ppo_value_error(value_data, clip_ratio, use_value_clip)return ppo_loss(policy_output.policy_loss, value_loss, policy_output.entropy_loss), policy_info
(2)ppo_policy_error()代碼:
先給出ppo的loss公式:
其中:
代碼:
def ppo_policy_error(data: namedtuple,clip_ratio: float = 0.2,dual_clip: Optional[float] = None) -> Tuple[namedtuple, namedtuple]:logit_new, logit_old, action, adv, weight = dataif weight is None:weight = torch.ones_like(adv)dist_new = torch.distributions.categorical.Categorical(logits=logit_new)dist_old = torch.distributions.categorical.Categorical(logits=logit_old)logp_new = dist_new.log_prob(action)logp_old = dist_old.log_prob(action)dist_new_entropy = dist_new.entropy()if dist_new_entropy.shape != weight.shape:dist_new_entropy = dist_new.entropy().mean(dim=1)entropy_loss = (dist_new_entropy * weight).mean()# policy_lossratio = torch.exp(logp_new - logp_old)if ratio.shape != adv.shape:ratio = ratio.mean(dim=1)surr1 = ratio * advsurr2 = ratio.clamp(1 - clip_ratio, 1 + clip_ratio) * advif dual_clip is not None: #不執行這里clip1 = torch.min(surr1, surr2)clip2 = torch.max(clip1, dual_clip * adv)# only use dual_clip when adv < 0policy_loss = -(torch.where(adv < 0, clip2, clip1) * weight).mean()else:policy_loss = (-torch.min(surr1, surr2) * weight).mean()with torch.no_grad():approx_kl = (logp_old - logp_new).mean().item()clipped = ratio.gt(1 + clip_ratio) | ratio.lt(1 - clip_ratio)clipfrac = torch.as_tensor(clipped).float().mean().item()return ppo_policy_loss(policy_loss, entropy_loss), ppo_info(approx_kl, clipfrac)
其中關鍵代碼:
dist_new = torch.distributions.categorical.Categorical(logits=logit_new)dist_old = torch.distributions.categorical.Categorical(logits=logit_old)logp_new = dist_new.log_prob(action)logp_old = dist_old.log_prob(action)
torch.distributions.categorical.Categorical(logits=< >) 創建一個由logits參數組成的分布(logits是未標準化的概率),pytorch官方手冊中是這樣描述的
dist_new.log_prob() 是用新舊策略對采樣的動作進行處理 (這里具體處理理解的不是很清楚,其實就是對應了ppo算法中求新舊策略的差異) 并得到最后的logp_new和logp_old,(使用log_prob是因為后邊方便用對數運算的性質得到 新舊策略的比值作為新舊策略的差異)
log_prob 源碼:
ratio就是PPO公式中的r(θ),求法如下:
ratio = torch.exp(logp_new - logp_old)
之后將r(θ)帶入clip公式求ppo_loss。weight通常是全為1向量。
policy_loss = (-torch.min(surr1, surr2) * weight).mean()
至于ppo_loss前為何加負號,看公式,由于使用了adv優勢,所以我們希望得到更好的獎勵,所以希望優勢越高越好(最大化 torch.min(surr1, surr2)),根據神經網絡的更新原理,可以reduce(-torch.min(surr1, surr2))。
此外,entropy_loss的求解公式為:
dist_new = torch.distributions.categorical.Categorical(logits=logit_new)
dist_new_entropy = dist_new.entropy()
entropy_loss = (dist_new_entropy * weight).mean()
先上pytorch官方源碼:
公式很這樣看理解起來比較復雜,但是大體上是在求新策略的信息熵(entropy),信息熵的定義是: 表示隨機變量不確定的度量,越隨機的信源熵越大。 也就是logit_new(未標準化的概率,這段直接叫它概率)概率數組對應的動作不確定度。
例如,如果logit_new 中每個下標對應的概率都一樣,動作非常的不確定,那么它的entropy應該就很大,因為此時每個動作的概率都相等,所以logit_new中相當于信息含量很少,它的entropy就很大;相反,在模型訓練后期,智能體已經可以根據狀態得到一個非常確定的動作了,也就是logit_new中可能會有一個很大的概率和其他很小的概率,那么此時logit_new中信息含量很大,它的entropy也就越小。
(這部分是個人的一點理解,不知道是不是對的)
(3)ppo_value_error()代碼:
def ppo_value_error(data: namedtuple,clip_ratio: float = 0.2,use_value_clip: bool = True,) -> torch.Tensor:value_new, value_old, return_, weight = dataif weight is None:weight = torch.ones_like(value_old)# value_lossif use_value_clip:value_clip = value_old + (value_new - value_old).clamp(-clip_ratio, clip_ratio)v1 = (return_ - value_new).pow(2)v2 = (return_ - value_clip).pow(2)value_loss = 0.5 * (torch.max(v1, v2) * weight).mean()else:value_loss = 0.5 * ((return_ - value_new).pow(2) * weight).mean()return value_loss
利用return和value_new構成誤差來求value_loss,這里也用到了clip的思想,是為了防止網絡更新過快,這里構建方法類似于DQN,就不多余贅述,value_loss的符號為+。
eval
(1)對data信息進行處理后(信息結構為:obs)輸入網絡求action和logit,代碼如下:
with torch.no_grad():output = self._eval_model.forward(data, mode='compute_actor')
(2)與環境完成一個episode交互后求得評估信息并輸出,輸出內容如下(可參考使用)
'train_iter': train_iter, # 學習率'ckpt_name': 'iteration_{}.pth.tar'.format(train_iter),'episode_count': n_episode,'envstep_count': envstep_count,'avg_envstep_per_episode': envstep_count / n_episode,'evaluate_time': duration,'avg_envstep_per_sec': envstep_count / duration,'avg_time_per_episode': n_episode / duration,'reward_mean': np.mean(episode_reward),'reward_std': np.std(episode_reward),'reward_max': np.max(episode_reward),'reward_min': np.min(episode_reward),
model:
主模型
主模型如下,其中self.encoder為對輸入信息進行的特征提取,為Actor和Critic網絡的共用部分,self.actor_head為actor網絡獨有的輸出部分,由于本環境使用的動作空間維度為16,因此輸出的logit(未歸一化的概率) 也是16維的;self.critic_head 為critic網絡獨有的部分,輸出為評價值,維度為1。
mode = [‘compute_actor’, ‘compute_critic’, ‘compute_actor_critic’] 分為三種網絡輸出模式,可以根據需要選擇(官方庫的設置非常靈活,可以記錄一下這種forward選擇方式)
class GoBiggerPPoModel(nn.Module):mode = ['compute_actor', 'compute_critic', 'compute_actor_critic']def __init__(self,scalar_shape: int,food_shape: int,food_relation_shape: int,thorn_relation_shape: int,clone_shape: int,clone_relation_shape: int,hidden_shape: int,encode_shape: int,action_type_shape: int,rnn: bool = False,critic_head_hidden_size: int = 32,critic_head_layer_num: int = 1,activation=nn.ReLU(inplace=True),) -> None:super(GoBiggerPPoModel, self).__init__()self.activation = activationself.action_type_shape = action_type_shapeself.encoder = Encoder(scalar_shape = scalar_shape,food_shape = food_shape,food_relation_shape = food_relation_shape,thorn_relation_shape = thorn_relation_shape,clone_shape = clone_shape,clone_relation_shape = clone_relation_shape,hidden_shape = hidden_shape,encode_shape = encode_shape,activation = activation)self.actor_head = DiscreteHead(32, action_type_shape, layer_num=2, activation=self.activation)self.critic_head = RegressionHead(critic_head_hidden_size, 1, critic_head_layer_num,activation=activation)self.actor = [self.encoder, self.actor_head]self.critic = [self.encoder, self.critic_head]self.actor = nn.ModuleList(self.actor)self.critic = nn.ModuleList(self.critic)def forward(self, inputs, mode:str):assert mode in self.mode, "not support forward mode: {}/{}".format(mode, self.mode)return getattr(self, mode)(inputs)
compute_actor, compute_critic, compute_actor_critic部分的代碼:
compute_actor
def compute_actor(self, inputs: torch.Tensor):B = inputs['batch']A = inputs['player_num_per_team']scalar = inputs['scalar']food = inputs['food']food_relation = inputs['food_relation']thorn_relation = inputs['thorn_relation']thorn_mask = inputs['thorn_mask']clone = inputs['clone']clone_relation = inputs['clone_relation']clone_mask = inputs['clone_mask']x = self.encoder(scalar, food, food_relation, thorn_relation, thorn_mask, clone,clone_relation, clone_mask)res = self.actor_head(x)action_type_logit = res['logit'] # B, M, action_type_sizeaction_type_logit = action_type_logit.reshape(B, A,*action_type_logit.shape[1:])return {'logit': action_type_logit,}
compute_critic:
def compute_critic(self, inputs: torch.Tensor):B = inputs['batch']A = inputs['player_num_per_team']scalar = inputs['scalar']food = inputs['food']food_relation = inputs['food_relation']thorn_relation = inputs['thorn_relation']thorn_mask = inputs['thorn_mask']clone = inputs['clone']clone_relation = inputs['clone_relation']clone_mask = inputs['clone_mask']x = self.encoder(scalar, food, food_relation, thorn_relation, thorn_mask, clone,clone_relation, clone_mask)value = self.critic_head(x)value_pred = value['pred']value_type_pred = value_pred.reshape(B, A, *value_pred.shape[1:])value_output_pred = torch.mean(value_type_pred, 1).unsqueeze(-1)return {'value': value_output_pred}
compute_actor_critic:
def compute_actor_critic(self, inputs:torch.Tensor):B = inputs['batch']A = inputs['player_num_per_team']scalar = inputs['scalar']food = inputs['food']food_relation = inputs['food_relation']thorn_relation = inputs['thorn_relation']thorn_mask = inputs['thorn_mask']clone = inputs['clone']clone_relation = inputs['clone_relation']clone_mask = inputs['clone_mask']actor_embedding = critic_embedding = self.encoder(scalar, food, food_relation, thorn_relation,thorn_mask, clone, clone_relation, clone_mask)act = self.actor_head(actor_embedding)action_logit = act['logit'] # B, M, action_type_sizeaction_type_logit = action_logit.reshape(B, A, *action_logit.shape[1:])value = self.critic_head(critic_embedding)value_pred = value['pred']value_type_pred = value_pred.reshape(B, A, *value_pred.shape[1:])value_output_pred = torch.mean(value_type_pred, 1).unsqueeze(-1)return {'logit': action_type_logit, 'value': value_output_pred}
寫在最后
到這里就結束了,在這次PPO實戰中,給我最大的感悟就是真正自己在具體應用中實踐完一套RL流程會遇到非常多的問題,遠遠難于學習一下算法理論或者跑幾個官方例程。 真正實踐起來還有很多的遺留問題,例如狀態空間處理、動作空間的選擇,獎勵設置,狀態標準化等等等等,博主對這些知識了解的還不夠系統,日后學習到比較系統的程度之后再進行總結。
總結
- 上一篇: 为Oracle数据库在linux上配置h
- 下一篇: 浅谈二次剩余