[Hierarchical Reinforcement Learning] A Walkthrough of the HAC Source Code
Paper: Learning Multi-Level Hierarchies with Hindsight
Code (GitHub): https://github.com/nikhilbarhate99/Hierarchical-Actor-Critic-HAC-PyTorch
HAC theory write-up: https://blog.csdn.net/qq_47997583/article/details/126013369
Algorithm pseudocode: see the paper.
In short, HAC builds on the vanilla DDPG algorithm by adding a hierarchical structure. To deal with the non-stationarity between levels, which would otherwise prevent the levels from being trained in parallel, it applies the hindsight idea within the hierarchy: the transitions stored in each level's replay buffer are rewritten and augmented before training.
The repository is organized as follows: the assets folder contains the environment code, the gif folder holds demo animations, the preTrained folder stores the trained parameter files, DDPG.py implements a single-level goal-conditioned DDPG, HAC.py implements the core HAC algorithm, train.py is the main training script, utils.py implements the replay buffer, and test.py contains the evaluation code.
Next, let's step through one training run in the debugger to see how HAC is implemented.
First, the training hyperparameters are set:
The code below defines the action bounds and offset, the state bounds and offset, the exploration noise for actions and subgoals, the goal state, and the thresholds. In this project the goal is a position of 0.48 ± 0.01 and a velocity of 0.04 ± 0.02.
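A minimal sketch of these settings (goal_state and threshold follow the values above; the two noise scales are illustrative placeholders, and the bounds/offsets are derived below, so check train.py in the repo for the exact numbers):

import numpy as np

goal_state = np.array([0.48, 0.04])   # [position, velocity] the top level must reach
threshold = np.array([0.01, 0.02])    # per-dimension tolerance for "goal achieved"

exploration_action_noise = np.array([0.1])        # Gaussian noise on primitive actions (illustrative value)
exploration_state_noise = np.array([0.02, 0.01])  # Gaussian noise on subgoals (illustrative values)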
The author explains the choice of bounds and offset in this issue: https://github.com/nikhilbarhate99/Hierarchical-Actor-Critic-HAC-PyTorch/issues/2
The environment used in this project is MountainCarContinuous-h-v1, the modified mountain-car environment shipped in the assets folder.
The observation space is continuous and two-dimensional:
self.observation_space = spaces.Box(low=self.low_state, high=self.high_state, dtype=np.float32)
[position, velocity] between min value = [-1.2, -0.07] and max value = [0.6, 0.07].
The action space is continuous and one-dimensional:
self.action_space = spaces.Box(low=self.min_action, high=self.max_action, shape=(1,), dtype=np.float32)
The action space is between [-1, 1].
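For reference, a quick way to inspect these spaces (this assumes the custom environment from the assets folder has already been registered with gym under this id, as the project's training script expects):

import gym

env = gym.make('MountainCarContinuous-h-v1')
print(env.observation_space)  # roughly: Box([-1.2, -0.07], [0.6, 0.07], (2,), float32)
print(env.action_space)       # roughly: Box(-1.0, 1.0, (1,), float32)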
My understanding is that in this environment the action and state spaces are not both confined to [-1, 1], while the last layer of the network is a Tanh activation whose output lies in [-1, 1], so the network output has to be rescaled.
The author implements this with the following formula:
action = ( network output (Tanh) * bounds ) + offset
For the action space:
the action space is between (-1, 1), and as the mean value [ (1 + (-1)) / 2 ] is 0 we do not require an offset, and the value of bound = 1, since our network only outputs between (-1, 1), so,
action = ( network output (Tanh) * bounds ) + offset
i.e. action = (network output * 1) + 0
For the state space (the higher level's "action" is a subgoal, i.e. a point in state space, which is why the derivation below keeps using the word action):
here the position variable (-1.2, 0.6) is NOT normalised to (-1, 1); its mean value
[ (0.6 + (-1.2)) / 2 ] is -0.3 and its half-range [ (0.6 - (-1.2)) / 2 ] is 0.9, so
action = ( network output (Tanh) * bounds ) + offset
for the position variable becomes:
action = (network output * 0.9) + (-0.3)
which bounds the value of the subgoal to (-1.2, 0.6).
Similarly, the velocity variable (-0.07, 0.07) is NOT normalised to (-1, 1) and its mean value [ (0.07 + (-0.07)) / 2 ] is 0, so for the velocity variable:
action = (network output * 0.07) + 0
which bounds the value of the subgoal to (-0.07, 0.07).
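A minimal sketch of how this rescaling can sit on top of a Tanh head (the class name and layer sizes here are illustrative, not copied from the repo; DDPG.py applies the same bounds/offset formula inside its actor):

import torch
import torch.nn as nn

class ScaledActor(nn.Module):
    # Illustrative goal-conditioned actor whose output lies in [offset - bounds, offset + bounds].
    def __init__(self, in_dim, out_dim, bounds, offset):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(in_dim, 64), nn.ReLU(),
                                 nn.Linear(64, out_dim), nn.Tanh())
        self.bounds = bounds   # half-width of the target range
        self.offset = offset   # midpoint of the target range

    def forward(self, state, goal):
        # action = ( network output (Tanh) * bounds ) + offset
        return self.net(torch.cat([state, goal], dim=-1)) * self.bounds + self.offset

# lower level  (primitive action in (-1, 1)):    bounds = 1.0,         offset = 0.0
# higher level (subgoal = [position, velocity]): bounds = [0.9, 0.07], offset = [-0.3, 0.0]
state_bounds = torch.tensor([0.9, 0.07])
state_offset = torch.tensor([-0.3, 0.0])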
Next come the settings for the HAC algorithm, the DDPG algorithm, and the directory where the model parameter files are saved.
Then the HAC agent is created with these parameters. Since a two-level hierarchy was configured above, the HAC attribute of the HAC class is a list holding two DDPG agents, one per level, each with its own replay buffer.
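Roughly, the constructor builds one goal-conditioned DDPG agent and one replay buffer per level; the following is a paraphrased sketch with simplified argument names, not the verbatim constructor from HAC.py:

from DDPG import DDPG
from utils import ReplayBuffer

class HAC:
    def __init__(self, k_level, H, state_dim, action_dim,
                 action_bounds, action_offset, state_bounds, state_offset, lr):
        self.k_level = k_level
        self.H = H
        # level 0 acts in the primitive action space
        self.HAC = [DDPG(state_dim, action_dim, action_bounds, action_offset, lr, H)]
        self.replay_buffer = [ReplayBuffer()]
        # every higher level outputs subgoals, i.e. points in state space
        for _ in range(k_level - 1):
            self.HAC.append(DDPG(state_dim, state_dim, state_bounds, state_offset, lr, H))
            self.replay_buffer.append(ReplayBuffer())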
Next is the overall training loop.
# training procedure
for i_episode in range(1, max_episodes+1):
    agent.reward = 0
    agent.timestep = 0

    state = env.reset()
    # collecting experience in environment
    last_state, done = agent.run_HAC(env, k_level-1, state, goal_state, False)

    if agent.check_goal(last_state, goal_state, threshold):
        print("################ Solved! ################ ")
        name = filename + '_solved'
        agent.save(directory, name)

    # update all levels
    agent.update(n_iter, batch_size)

    # logging updates:
    log_f.write('{},{}\n'.format(i_episode, agent.reward))
    log_f.flush()

    if i_episode % save_episode == 0:
        agent.save(directory, filename)

    print("Episode: {}\t Reward: {}".format(i_episode, agent.reward))

There are three key calls in the training loop above; let's analyze them one by one.
1. Run the hierarchy: run_HAC
last_state, done = agent.run_HAC(env, k_level-1, state, goal_state, False)

def run_HAC(self, env, i_level, state, goal, is_subgoal_test):
    next_state = None
    done = None
    goal_transitions = []

    # logging updates
    self.goals[i_level] = goal

    # H attempts
    for _ in range(self.H):
        # if this is a subgoal test, then next/lower level goal has to be a subgoal test
        is_next_subgoal_test = is_subgoal_test

        action = self.HAC[i_level].select_action(state, goal)

        #   <================ high level policy ================>
        if i_level > 0:
            # add noise or take random action if not subgoal testing
            if not is_subgoal_test:
                if np.random.random_sample() > 0.2:  # p=0.8: add exploration noise
                    action = action + np.random.normal(0, self.exploration_state_noise)
                    action = action.clip(self.state_clip_low, self.state_clip_high)
                else:  # p=0.2: random sample
                    action = np.random.uniform(self.state_clip_low, self.state_clip_high)

            # Determine whether to test subgoal (action)
            if np.random.random_sample() < self.lamda:
                is_next_subgoal_test = True

            # Pass subgoal to lower level
            next_state, done = self.run_HAC(env, i_level-1, state, action, is_next_subgoal_test)

            # if subgoal was tested but not achieved, add subgoal testing transition
            if is_next_subgoal_test and not self.check_goal(action, next_state, self.threshold):
                self.replay_buffer[i_level].add((state, action, -self.H, next_state, goal, 0.0, float(done)))

            # for hindsight action transition
            action = next_state

        #   <================ low level policy ================>
        else:
            # add noise or take random action if not subgoal testing
            if not is_subgoal_test:
                if np.random.random_sample() > 0.2:
                    action = action + np.random.normal(0, self.exploration_action_noise)
                    action = action.clip(self.action_clip_low, self.action_clip_high)
                else:
                    action = np.random.uniform(self.action_clip_low, self.action_clip_high)

            # take primitive action
            next_state, rew, done, _ = env.step(action)

            if self.render:
                # env.render() ##########
                if self.k_level == 2:
                    env.unwrapped.render_goal(self.goals[0], self.goals[1])
                elif self.k_level == 3:
                    env.unwrapped.render_goal_2(self.goals[0], self.goals[1], self.goals[2])

            # this is for logging
            self.reward += rew
            self.timestep += 1

        #   <================ finish one step/transition ================>

        # check if goal is achieved
        goal_achieved = self.check_goal(next_state, goal, self.threshold)

        # hindsight action transition
        if goal_achieved:
            self.replay_buffer[i_level].add((state, action, 0.0, next_state, goal, 0.0, float(done)))
        else:
            self.replay_buffer[i_level].add((state, action, -1.0, next_state, goal, self.gamma, float(done)))

        # copy for goal transition
        goal_transitions.append([state, action, -1.0, next_state, None, self.gamma, float(done)])

        state = next_state

        if done or goal_achieved:
            break

    #   <================ finish H attempts ================>

    # hindsight goal transition
    # last transition reward and discount is 0
    goal_transitions[-1][2] = 0.0
    goal_transitions[-1][5] = 0.0
    for transition in goal_transitions:
        # last state is goal for all transitions
        transition[4] = next_state
        self.replay_buffer[i_level].add(tuple(transition))

    return next_state, done
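To make the hindsight goal transitions at the end of run_HAC concrete, here is a tiny standalone illustration with made-up numbers; each entry follows the 7-tuple layout stored in the replay buffer (state, action, reward, next_state, goal, gamma, done):

episode = [
    # [state, action, reward, next_state, goal, gamma, done]
    [[-0.50, 0.00], [0.8], -1.0, [-0.45, 0.01], None, 0.95, 0.0],
    [[-0.45, 0.01], [0.9], -1.0, [-0.40, 0.02], None, 0.95, 0.0],
]
achieved = episode[-1][3]   # the state actually reached: [-0.40, 0.02]
episode[-1][2] = 0.0        # last transition: reward becomes 0 ...
episode[-1][5] = 0.0        # ... and discount becomes 0, as if this step had reached the goal
for t in episode:
    t[4] = achieved         # every transition is relabelled with the achieved state as its goal

Even if the commanded goal was never reached, the trajectory is recycled as a successful one toward the state it did reach, which gives every level a supply of rewarded transitions despite the sparse reward.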
2. Check whether the goal is achieved: check_goal

agent.check_goal(last_state, goal_state, threshold)

def check_goal(self, state, goal, threshold):
    for i in range(self.state_dim):
        if abs(state[i] - goal[i]) > threshold[i]:
            return False
    return True
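For example, with the goal and thresholds used in this project (goal = [0.48, 0.04], threshold = [0.01, 0.02]), two hypothetical states would be judged like this:

print(agent.check_goal([0.485, 0.05], [0.48, 0.04], [0.01, 0.02]))  # True:  |0.005| <= 0.01 and |0.01| <= 0.02
print(agent.check_goal([0.460, 0.04], [0.48, 0.04], [0.01, 0.02]))  # False: |0.46 - 0.48| = 0.02 > 0.01 on position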
3. Update the agent: update

agent.update(n_iter, batch_size)

In HAC.py:
def update(self, n_iter, batch_size):
    for i in range(self.k_level):
        self.HAC[i].update(self.replay_buffer[i], n_iter, batch_size)

In DDPG.py:
def update(self, buffer, n_iter, batch_size):
    for i in range(n_iter):
        # Sample a batch of transitions from replay buffer:
        state, action, reward, next_state, goal, gamma, done = buffer.sample(batch_size)

        # convert np arrays into tensors
        state = torch.FloatTensor(state).to(device)
        action = torch.FloatTensor(action).to(device)
        reward = torch.FloatTensor(reward).reshape((batch_size, 1)).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        goal = torch.FloatTensor(goal).to(device)
        gamma = torch.FloatTensor(gamma).reshape((batch_size, 1)).to(device)
        done = torch.FloatTensor(done).reshape((batch_size, 1)).to(device)

        # select next action
        next_action = self.actor(next_state, goal).detach()

        # Compute target Q-value:
        target_Q = self.critic(next_state, next_action, goal).detach()
        target_Q = reward + ((1 - done) * gamma * target_Q)

        # Optimize Critic:
        critic_loss = self.mseLoss(self.critic(state, action, goal), target_Q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Compute actor loss:
        actor_loss = -self.critic(state, self.actor(state, goal), goal).mean()

        # Optimize the actor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

To be continued.
Summary