Morvan Zhou (莫烦) Reinforcement Learning Notes
Reinforcement Learning
Q-Learning
Basic workflow
Pseudocode for the main loop (T is the treasure's location, o is the explorer's location):

    Initialize Q(s, a) arbitrarily            # Q(s, a): the Q value of each action in each state
    Repeat (for each episode):
        Initialize S
        Repeat (for each step of the episode):
            Choose a from s using the policy derived from Q (e.g. epsilon-greedy)
            Take action a, observe r, s'
            Q(s,a) <- Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
            s <- s'
        until S is terminal

Q-learning records a value for every (state, action) pair: Q(s, a) is the value of taking action a in state s. In the treasure-hunt game, s is simply the cell that o is standing on, and in every cell the explorer has two possible actions, left and right. If at some location s1 the explorer finds Q(s1, a1) > Q(s1, a2) for a1/a2 = left/right, it chooses left. That is the whole action-selection rule of Q-learning.

import numpy as np
import pandas as pd
import time

# hyperparameters
np.random.seed(2)               # reproducible pseudo-random sequence
N_STATES = 6                    # width of the 1-D world (distance from start to treasure)
ACTIONS = ['left', 'right']     # available actions
EPSILON = 0.9                   # greediness
ALPHA = 0.1                     # learning rate
GAMMA = 0.9                     # reward discount
MAX_EPISODES = 13               # maximum number of episodes
FRESH_TIME = 0.01               # time between moves when rendering


# build the Q table
def build_q_table(n_states, actions):
    table = pd.DataFrame(
        np.zeros((n_states, len(actions))),   # initialised to 0, shape N_STATES x len(ACTIONS)
        columns=actions,                      # columns are the action names
    )
    return table
"""
Q-table:
   left  right
0   0.0    0.0
1   0.0    0.0
2   0.0    0.0
3   0.0    0.0
4   0.0    0.0
5   0.0    0.0
"""


# How the explorer picks an action (epsilon-greedy): early on, random exploration usually beats
# a fixed behaviour pattern, so while experience is being accumulated we do not want the explorer
# to be fully greedy. EPSILON controls the greediness and could be increased over time; here it is
# fixed at 0.9, i.e. 90% of the time the best known action is taken and 10% of the time we explore.
def choose_action(state, q_table):
    state_actions = q_table.iloc[state, :]    # all action values of this state
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  # non-greedy, or state not explored yet
        action_name = np.random.choice(ACTIONS)      # pick a random action
    else:
        action_name = state_actions.idxmax()         # greedy: the action with the largest Q value
    return action_name


# Environment feedback: after an action the environment returns the next state S_ and the reward R
# obtained for taking action A in state S. The only reward is R = 1 when o reaches T.
def get_env_feedback(S, A):
    if A == 'right':
        if S == N_STATES - 2:
            S_ = 'terminal'   # reached the treasure
            R = 1
        else:
            S_ = S + 1
            R = 0
    else:                     # move left
        R = 0
        if S == 0:
            S_ = S            # already at the left wall
        else:
            S_ = S - 1
    return S_, R


# render the environment
def update_env(S, episode, step_counter):
    env_list = ['-'] * (N_STATES - 1) + ['T']   # '-----T' : our environment
    if S == 'terminal':
        interaction = 'Episode %s: total_steps = %s' % (episode + 1, step_counter)
        print('\r{}'.format(interaction), end='')
        time.sleep(2)
        print('\r                                ', end='')
    else:
        env_list[S] = 'o'
        interaction = ''.join(env_list)
        print('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)


# main loop (implements the pseudocode above)
def RL():
    q_table = build_q_table(N_STATES, ACTIONS)   # initial Q table
    for episode in range(MAX_EPISODES):
        step_counter = 0
        S = 0                                    # starting position of the episode
        is_terminated = False
        update_env(S, episode, step_counter)
        while not is_terminated:
            A = choose_action(S, q_table)        # pick an action
            S_, R = get_env_feedback(S, A)       # act and get the environment's feedback
            q_predict = q_table.loc[S, A]        # estimated (state, action) value
            if S_ != 'terminal':
                q_target = R + GAMMA * q_table.loc[S_, :].max()   # target value
            else:
                q_target = R                     # episode ends: the target is just the reward
                is_terminated = True
            q_table.loc[S, A] += ALPHA * (q_target - q_predict)   # update the Q table
            S = S_                               # move to the next state
            update_env(S, episode, step_counter + 1)
            step_counter += 1
    return q_table


if __name__ == '__main__':
    q_table = RL()
    print('\r\nQ-table:\n')
    print(q_table)
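To make the update rule concrete, here is one step worked by hand (an illustration, not part of the original post), using the script's ALPHA = 0.1 and GAMMA = 0.9 and a table that still holds its initial zeros:

    Q(4, right) <- 0 + 0.1 * [1 - 0]               = 0.1     (terminal step: the target is just R = 1)
    Q(3, right) <- 0 + 0.1 * [0 + 0.9 * 0.1 - 0]   = 0.009   (a later episode, once Q(4, right) = 0.1)

Repeating this over the 13 episodes is what gradually propagates value from the treasure back toward the start.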
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """import numpy as np import time import sys if sys.version_info.major == 2:import Tkinter as tk else:import tkinter as tkUNIT = 100 # pixels MAZE_H = 5 # grid height MAZE_W = 5 # grid widthclass Maze(tk.Tk, object):def __init__(self):super(Maze, self).__init__()self.action_space = ['u', 'd', 'l', 'r']self.n_actions = len(self.action_space)self.title('maze')self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))self._build_maze()def _build_maze(self):self.canvas = tk.Canvas(self, bg='green',height=MAZE_H * UNIT,width=MAZE_W * UNIT)# create gridsfor c in range(0, MAZE_W * UNIT, UNIT):x0, y0, x1, y1 = c, 0, c, MAZE_H * UNITself.canvas.create_line(x0, y0, x1, y1)for r in range(0, MAZE_H * UNIT, UNIT):x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, rself.canvas.create_line(x0, y0, x1, y1)# create originorigin = np.array([20, 20])# hellhell1_center = origin + np.array([UNIT * 2, UNIT])self.hell1 = self.canvas.create_rectangle(hell1_center[0] - 15, hell1_center[1] - 15,hell1_center[0] + 15, hell1_center[1] + 15,fill='black')# hellhell2_center = origin + np.array([UNIT, UNIT * 2])self.hell2 = self.canvas.create_rectangle(hell2_center[0] - 15, hell2_center[1] - 15,hell2_center[0] + 15, hell2_center[1] + 15,fill='black')# create ovaloval_center = origin + UNIT * 2self.oval = self.canvas.create_oval(oval_center[0] - 15, oval_center[1] - 15,oval_center[0] + 15, oval_center[1] + 15,fill='yellow')# create red rectself.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# pack allself.canvas.pack()def reset(self):self.update()time.sleep(0.5)self.canvas.delete(self.rect)origin = np.array([20, 20])self.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# return observationreturn self.canvas.coords(self.rect)def step(self, action):s = self.canvas.coords(self.rect)base_action = np.array([0, 0])if action == 0: # upif s[1] > UNIT:base_action[1] -= UNITelif action == 1: # downif s[1] < (MAZE_H - 1) * UNIT:base_action[1] += UNITelif action == 2: # rightif s[0] < (MAZE_W - 1) * UNIT:base_action[0] += UNITelif action == 3: # leftif s[0] > UNIT:base_action[0] -= UNITself.canvas.move(self.rect, base_action[0], base_action[1]) # move agents_ = self.canvas.coords(self.rect) # next state# reward functionif s_ == self.canvas.coords(self.oval):reward = 1done = Trues_ = 'terminal'elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:reward = -1done = Trues_ = 'terminal'else:reward = 0done = Falsereturn s_, reward, donedef render(self):time.sleep(0.1)self.update()def update():for t in range(10):s = env.reset()while True:env.render()a = 1s, r, done = env.step(a)if done:breakif __name__ == '__main__':env = Maze()env.after(100, update)env.mainloop()構(gòu)建Q-Learning類
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """import numpy as np import time import sys if sys.version_info.major == 2:import Tkinter as tk else:import tkinter as tkUNIT = 100 # pixels MAZE_H = 5 # grid height MAZE_W = 5 # grid widthclass Maze(tk.Tk, object):def __init__(self):super(Maze, self).__init__()self.action_space = ['u', 'd', 'l', 'r']self.n_actions = len(self.action_space)self.title('maze')self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))self._build_maze()def _build_maze(self):self.canvas = tk.Canvas(self, bg='green',height=MAZE_H * UNIT,width=MAZE_W * UNIT)# create gridsfor c in range(0, MAZE_W * UNIT, UNIT):x0, y0, x1, y1 = c, 0, c, MAZE_H * UNITself.canvas.create_line(x0, y0, x1, y1)for r in range(0, MAZE_H * UNIT, UNIT):x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, rself.canvas.create_line(x0, y0, x1, y1)# create originorigin = np.array([20, 20])# hellhell1_center = origin + np.array([UNIT * 2, UNIT])self.hell1 = self.canvas.create_rectangle(hell1_center[0] - 15, hell1_center[1] - 15,hell1_center[0] + 15, hell1_center[1] + 15,fill='black')# hellhell2_center = origin + np.array([UNIT, UNIT * 2])self.hell2 = self.canvas.create_rectangle(hell2_center[0] - 15, hell2_center[1] - 15,hell2_center[0] + 15, hell2_center[1] + 15,fill='black')# create ovaloval_center = origin + UNIT * 2self.oval = self.canvas.create_oval(oval_center[0] - 15, oval_center[1] - 15,oval_center[0] + 15, oval_center[1] + 15,fill='yellow')# create red rectself.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# pack allself.canvas.pack()def reset(self):self.update()time.sleep(0.5)self.canvas.delete(self.rect)origin = np.array([20, 20])self.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# return observationreturn self.canvas.coords(self.rect)def step(self, action):s = self.canvas.coords(self.rect)base_action = np.array([0, 0])if action == 0: # upif s[1] > UNIT:base_action[1] -= UNITelif action == 1: # downif s[1] < (MAZE_H - 1) * UNIT:base_action[1] += UNITelif action == 2: # rightif s[0] < (MAZE_W - 1) * UNIT:base_action[0] += UNITelif action == 3: # leftif s[0] > UNIT:base_action[0] -= UNITself.canvas.move(self.rect, base_action[0], base_action[1]) # move agents_ = self.canvas.coords(self.rect) # next state# reward functionif s_ == self.canvas.coords(self.oval):reward = 1done = Trues_ = 'terminal'elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:reward = -1done = Trues_ = 'terminal'else:reward = 0done = Falsereturn s_, reward, donedef render(self):time.sleep(0.1)self.update()def update():for t in range(10):s = env.reset()while True:env.render()a = 1s, r, done = env.step(a)if done:breakif __name__ == '__main__':env = Maze()env.after(100, update)env.mainloop()運行Q-Learning
''' 讓探索者學(xué)會走迷宮. 黃色的是天堂 (reward 1), 黑色的地獄 (reward -1). 大多數(shù) RL 是由 reward 導(dǎo)向的, 所以定義 reward 是 RL 中比較重要的一點.Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the main part which controls the update method of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ '''from maze_env import Maze from RL_brain import QLearningTable#Updata def update():for episode in range(100):#初始化 觀察值 observation = env.reset()while True:#刷新環(huán)境env.render()#RL 大腦根據(jù) state 的觀測值挑選 actionaction = RL.choose_action(str(observation))#探索者在環(huán)境中實施這個 action, 并得到環(huán)境返回的下一個 state 觀測值, reward 和done (是否是掉下地獄或者升上天堂)observation_next, reward, done = env.step(action)#RL 從這個序列 (state, action, reward, state_) 中學(xué)習(xí)RL.learn(str(observation), action, reward, str(observation_next))# 將下一個 state 的值傳到下一次循環(huán)observation = observation_next#如果掉下地獄或者升上天堂, 這回合就結(jié)束了if done:break# 結(jié)束游戲并關(guān)閉窗口print('game over')env.destroy() if __name__ == '__main__':env = Maze()RL = QLearningTable(actions=list(range(env.n_actions)))env.after(100, update)env.mainloop()Sarsa
Basic workflow
Pseudocode for the main loop (T is the treasure's location, o is the explorer's location):

    Initialize Q(s, a) arbitrarily            # Q(s, a): the Q value of each action in each state
    Repeat (for each episode):
        Initialize S
        Choose a from s using the policy derived from Q (e.g. epsilon-greedy)
        Repeat (for each step of the episode):
            Take action a, observe r, s'
            Choose a' from s' using the policy derived from Q (e.g. epsilon-greedy)
            Q(s,a) <- Q(s,a) + alpha * [r + gamma * Q(s',a') - Q(s,a)]
            s <- s', a <- a'
        until S is terminal

Compared with Q-learning, Sarsa is more timid. Q-learning always chases max Q, and that maximisation makes it greedy: it ignores every outcome that is not the max. You can think of Q-learning as a greedy, bold algorithm that does not care much about mistakes or death, while Sarsa is conservative: it cares about every single step and is sensitive to errors and death. The visualisation later makes this difference visible. Both styles have their uses; in practice, if damage to the machine matters, a conservative algorithm reduces how often it gets broken during training.

(The 1-D treasure-hunt script is the same as in the Q-learning section above; a Sarsa variant of its main loop is sketched below.)
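A minimal sketch (not from the post) of how that main loop changes for Sarsa, reusing build_q_table, choose_action, get_env_feedback, update_env and the constants from the Q-learning script above: the next action is chosen before the update, and the target uses Q(S', A') rather than max Q(S', .).

def sarsa_rl():
    q_table = build_q_table(N_STATES, ACTIONS)
    for episode in range(MAX_EPISODES):
        step_counter = 0
        S = 0
        A = choose_action(S, q_table)           # choose the first action before the loop
        is_terminated = False
        update_env(S, episode, step_counter)
        while not is_terminated:
            S_, R = get_env_feedback(S, A)
            q_predict = q_table.loc[S, A]
            if S_ != 'terminal':
                A_ = choose_action(S_, q_table)               # choose the NEXT action first ...
                q_target = R + GAMMA * q_table.loc[S_, A_]    # ... and use its Q value (on-policy)
            else:
                A_ = None
                q_target = R
                is_terminated = True
            q_table.loc[S, A] += ALPHA * (q_target - q_predict)
            S, A = S_, A_                        # move on with the already-chosen action
            update_env(S, episode, step_counter + 1)
            step_counter += 1
    return q_table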
Building the Sarsa class
""" This part of code is the Q learning brain, which is a brain of the agent. All decisions are made in here.View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """''' Sarsa: Initialize Q(s, a) arbitrarily #Q(s, a):Q表 每個狀態(tài)對應(yīng)的動作的Q值 Repeat (for each episode):Initialize SChoose a from s using policy derived from Q (e.g.,E-greedy)Repeat (for each step of episode): Take action a, observe r, s'Choose a' from s' using policy derived from Q (e.g.,E-greedy)Q(s,a)←Q(s,a) +alpha[r + gamma*Q(s',a')- Q(s,a)]s←s',a←a';until S is terminal'''import numpy as np import pandas as pdclass RL(object):def __init__(self, action_space, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):self.actions = action_space # a listself.lr = learning_rateself.gamma = reward_decayself.epsilon = e_greedyself.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)def check_state_exist(self, state):if state not in self.q_table.index:# append new state to q tableself.q_table = self.q_table.append(pd.Series([0]*len(self.actions),index=self.q_table.columns,name=state,))def check_state_exist2(self,state):if state not in self.q_table.index:to_be_append = pd.Series([0] * len(self.actions),index = self.q_table.columns,name = state,)self.q_table = self.q_table.append(to_be_append)def choose_action(self, observation):self.check_state_exist(observation)# action selectionif np.random.rand() < self.epsilon:# choose best actionstate_action = self.q_table.loc[observation, :]# some actions may have the same value, randomly choose on in these actionsaction = np.random.choice(state_action[state_action == np.max(state_action)].index)else:# choose random actionaction = np.random.choice(self.actions)return actiondef learn(self, *args):pass# off-policy class QLearningTable(RL):def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):super(QLearningTable, self).__init__(actions, learning_rate, reward_decay, e_greedy)def learn(self, s, a, r, s_):self.check_state_exist(s_)q_predict = self.q_table.loc[s, a]if s_ != 'terminal':q_target = r + self.gamma * self.q_table.loc[s_, :].max() # next state is not terminalelse:q_target = r # next state is terminalself.q_table.loc[s, a] += self.lr * (q_target - q_predict) # update# on-policy class SarsaTable(RL):def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):super(SarsaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy)def learn(self, s, a, r, s_, a_):self.check_state_exist(s_)q_predict = self.q_table.loc[s, a]if s_ != 'terminal':q_target = r + self.gamma * self.q_table.loc[s_, a_] # next state is not terminalelse:q_target = r # next state is terminalself.q_table.loc[s, a] += self.lr * (q_target - q_predict) # updateclass SarsaLambdaTable(RL):def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, trace_decay=0.9):super(SarsaLambdaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy)# 后向觀測算法, eligibility trace.self.lambda_ = trace_decayself.eligibility_trace = self.q_table.copy() # 空的 eligibility trace 表def learn(self, s, a, r, s_, a_):# 這部分和 Sarsa 一樣self.check_state_exist2(s_)q_predict = self.q_table.loc[s, a]if s_ != 'terminal':q_target = r + self.gamma * self.q_table.loc[s_, a_] # next state is not terminalelse:q_target = r # next state is terminalerror = q_target - q_predict# 這里開始不同:# 對于經(jīng)歷過的 state-action, 我們讓他+1, 證明他是得到 reward 路途中不可或缺的一環(huán)self.eligibility_trace.loc[s, a] += 1 # 更有效的方式:self.eligibility_trace.loc[s, :] *= 0self.eligibility_trace.loc[s, 
a] = 1# Q table 更新self.q_table += self.lr * error * self.eligibility_trace# 隨著時間衰減 eligibility trace 的值, 離獲取 reward 越遠(yuǎn)的步, 他的"不可或缺性"越小self.eligibility_trace *= self.gamma*self.lambda_運行Sarsa
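None of the run scripts in this post drive SarsaLambdaTable. A minimal loop for it (a sketch, assuming the same Maze environment) differs from the Sarsa run script in the next section only in the class it constructs and in clearing the eligibility trace at the start of every episode:

from maze_env import Maze
from RL_brain import SarsaLambdaTable


def update():
    for episode in range(100):
        observation = env.reset()
        action = RL.choose_action(str(observation))
        RL.eligibility_trace *= 0            # new episode: start with a clean eligibility trace
        while True:
            env.render()
            observation_, reward, done = env.step(action)
            action_ = RL.choose_action(str(observation_))
            RL.learn(str(observation), action, reward, str(observation_), action_)
            observation, action = observation_, action_
            if done:
                break
    print('game over')
    env.destroy()


if __name__ == '__main__':
    env = Maze()
    RL = SarsaLambdaTable(actions=list(range(env.n_actions)))
    env.after(100, update)
    env.mainloop()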
Running Sarsa

from maze_env import Maze
from RL_brain import SarsaTable


def update():
    for episode in range(100):
        observation = env.reset()
        action = RL.choose_action(str(observation))
        while True:
            env.render()
            observation_, reward, done = env.step(action)
            # RL chooses the next action based on the next observation
            action_ = RL.choose_action(str(observation_))
            # RL learns from this transition (s, a, r, s_, a_) ==> Sarsa
            RL.learn(str(observation), action, reward, str(observation_), action_)
            # swap observation and action
            observation = observation_
            action = action_
            # break the while loop at the end of this episode
            if done:
                break
    # end of game
    print('game over')
    env.destroy()


if __name__ == '__main__':
    env = Maze()
    RL = SarsaTable(actions=list(range(env.n_actions)))
    env.after(100, update)
    env.mainloop()

Deep Q-Learning Network (DQN)
環(huán)境配置
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import time import sys if sys.version_info.major == 2:import Tkinter as tk else:import tkinter as tkUNIT = 40 # pixels MAZE_H = 4 # grid height MAZE_W = 4 # grid widthclass Maze(tk.Tk, object):def __init__(self):super(Maze, self).__init__()self.action_space = ['u', 'd', 'l', 'r']self.n_actions = len(self.action_space)self.n_features = 2self.title('maze')self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))self._build_maze()def _build_maze(self):self.canvas = tk.Canvas(self, bg='white',height=MAZE_H * UNIT,width=MAZE_W * UNIT)# create gridsfor c in range(0, MAZE_W * UNIT, UNIT):x0, y0, x1, y1 = c, 0, c, MAZE_H * UNITself.canvas.create_line(x0, y0, x1, y1)for r in range(0, MAZE_H * UNIT, UNIT):x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, rself.canvas.create_line(x0, y0, x1, y1)# create originorigin = np.array([20, 20])# hellhell1_center = origin + np.array([UNIT * 2, UNIT])self.hell1 = self.canvas.create_rectangle(hell1_center[0] - 15, hell1_center[1] - 15,hell1_center[0] + 15, hell1_center[1] + 15,fill='black')# hell# hell2_center = origin + np.array([UNIT, UNIT * 2])# self.hell2 = self.canvas.create_rectangle(# hell2_center[0] - 15, hell2_center[1] - 15,# hell2_center[0] + 15, hell2_center[1] + 15,# fill='black')# create ovaloval_center = origin + UNIT * 2self.oval = self.canvas.create_oval(oval_center[0] - 15, oval_center[1] - 15,oval_center[0] + 15, oval_center[1] + 15,fill='yellow')# create red rectself.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# pack allself.canvas.pack()def reset(self):self.update()time.sleep(0.1)self.canvas.delete(self.rect)origin = np.array([20, 20])self.rect = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,origin[0] + 15, origin[1] + 15,fill='red')# return observationreturn (np.array(self.canvas.coords(self.rect)[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)def step(self, action):s = self.canvas.coords(self.rect)base_action = np.array([0, 0])if action == 0: # upif s[1] > UNIT:base_action[1] -= UNITelif action == 1: # downif s[1] < (MAZE_H - 1) * UNIT:base_action[1] += UNITelif action == 2: # rightif s[0] < (MAZE_W - 1) * UNIT:base_action[0] += UNITelif action == 3: # leftif s[0] > UNIT:base_action[0] -= UNITself.canvas.move(self.rect, base_action[0], base_action[1]) # move agentnext_coords = self.canvas.coords(self.rect) # next state# reward functionif next_coords == self.canvas.coords(self.oval):reward = 1done = Trueelif next_coords in [self.canvas.coords(self.hell1)]:reward = -1done = Trueelse:reward = 0done = Falses_ = (np.array(next_coords[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)return s_, reward, donedef render(self):# time.sleep(0.01)self.update()構(gòu)建DQN類
''' 搭建神經(jīng)網(wǎng)絡(luò) 為了使用 Tensorflow 來實現(xiàn) DQN, 比較推薦的方式是搭建兩個神經(jīng)網(wǎng)絡(luò), target_net 用于預(yù)測q_target 值, 他不會及時更新參數(shù). eval_net 用于預(yù)測 q_eval, 這個神經(jīng)網(wǎng)絡(luò)擁有最新的神經(jīng)網(wǎng)絡(luò)參數(shù). 不過這兩個神經(jīng)網(wǎng)絡(luò)結(jié)構(gòu)是完全一樣的, '''import numpy as np import pandas as pd # import tensorflow as tf import tensorflow.compat.v1 as tf tf.compat.v1.disable_eager_execution() tf.compat.v1.disable_eager_execution() np.random.seed(1) tf.random.set_seed(1)class DeepQNetwork:'''n_actions:神經(jīng)網(wǎng)絡(luò)輸出actions的q值的個數(shù)n_features:接收observation個數(shù) 例如 長寬高learning_rate = 0.01: 學(xué)習(xí)率reward_decay = 0.9: gamma值e_greedy = 0.9:replace_target_iter = 300:隔多少步更新target參數(shù)memory_size = 500:記憶庫容量 可以記多少數(shù)據(jù)batch_size = 32:神經(jīng)網(wǎng)絡(luò)提升 梯度下降e_greey_increment = None: 不斷縮小學(xué)習(xí)范圍output_graph = False: 輸出神經(jīng)網(wǎng)絡(luò)圖'''def __init__(self,n_actions,n_features,learning_rate = 0.01,reward_decay = 0.9,e_greedy = 0.9,replace_target_iter = 300,memory_size = 500,batch_size = 32,e_greedy_increment = None,output_graph = False,):self.n_actions = n_actionsself.n_features = n_featuresself.lr = learning_rateself.gamma = reward_decayself.epsilon_max = e_greedyself.replace_target_iter = replace_target_iterself.memory_size = memory_sizeself.batch_size = batch_sizeself.epsilon_increment = e_greedy_incrementself.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max#學(xué)習(xí)總步數(shù) epsilon根據(jù)步數(shù)提高 # 記錄學(xué)習(xí)次數(shù) (用于判斷是否更換 target_net 參數(shù))self.learn_step_counter = 0#初始化記憶庫 ??? # 初始化全 0 記憶 [s, a, r, s_]self.memory = np.zeros((self.memory_size, n_features*2+2)) #建立神經(jīng)網(wǎng)絡(luò)self._build_net()# 替換 target net 的參數(shù)t_params = tf.get_collection('target_net_params') # 提取 target_net 的參數(shù)e_params = tf.get_collection('eval_net_params') # 提取 eval_net 的參數(shù)self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)] # 更新 target_net 參數(shù)self.sess = tf.compat.v1.Session()#輸出 tensorboard 文件if output_graph:# $ tensorboard --logdir=logs# tf.train.SummaryWriter soon be deprecated, use followingtf.compat.v1.summary.FileWriter("logs/", self.sess.graph)self.sess.run(tf.compat.v1.global_variables_initializer()) #激活self.cost_his = [] #記錄誤差 記錄所有 cost 變化, 用于最后 plot 出來觀看def _build_net(self):# -------------- 創(chuàng)建 eval 神經(jīng)網(wǎng)絡(luò), 及時提升參數(shù) --------------self.s = tf.compat.v1.placeholder(tf.float32, [None, self.n_features], name='s') #用來接收 observationself.q_target = tf.compat.v1.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # 用來接收 q_target 的值, 這個之后會通過計算得到with tf.compat.v1.variable_scope('eval_net'):#c_names(collections_names) 是在更新target_net 參數(shù)時用到#每一層的默認(rèn)參數(shù)c_names, n_l1, w_initializer, b_initializer = ['eval_net_params',tf.compat.v1.GraphKeys.GLOBAL_VARIABLES],10,tf.compat.v1.random_normal_initializer(0., 0.3), tf.compat.v1.constant_initializer(0.1) # config of layers'''c_names: 集合形式,通過該變量調(diào)用參數(shù)n_l1:第一層的神經(jīng)元數(shù)w_initializer、b_initializer:生成隨機(jī)參數(shù)'''# eval_net 的第一層. collections 是在更新 target_net 參數(shù)時會用到with tf.compat.v1.variable_scope('l1'):w1 = tf.compat.v1.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer,collections=c_names)b1 = tf.compat.v1.get_variable('b1', initializer=b_initializer, collections=c_names)l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)# eval_net 的第二層. 
collections 是在更新 target_net 參數(shù)時會用到with tf.compat.v1.variable_scope('l2'):w2 = tf.compat.v1.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer,collections=c_names)b2 = tf.compat.v1.get_variable('b2',[1, self.n_actions], initializer=b_initializer, collections=c_names)self.q_eval = tf.matmul(l1, w2) + b2 # matmul 相乘 q_eval:為q值 有多少行為就有多少q值 為q估計with tf.name_scope('loss'): #求誤差self.loss = tf.reduce_sum(tf.compat.v1.squared_difference(self.q_target, self.q_eval)) with tf.name_scope('train'):# 梯度下降self._train_op = tf.compat.v1.train.RMSPropOptimizer(self.lr).minimize(self.loss)# ---------------- 創(chuàng)建 target 神經(jīng)網(wǎng)絡(luò), 提供 target Q ---------------------self.s_ = tf.compat.v1.placeholder(tf.float32, [None, self.n_features], name='s_') # 接收下個 observationwith tf.compat.v1.variable_scope('target_net'):# c_names(collections_names) 是在更新 target_net 參數(shù)時會用到c_names = ['targe_net_params', tf.compat.v1.GraphKeys.GLOBAL_VARIABLES]# target_net 的第一層. collections 是在更新 target_net 參數(shù)時會用到with tf.compat.v1.variable_scope('l1'):w1 = tf.compat.v1.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer,collections=c_names)b1 = tf.compat.v1.get_variable('b1',[1, n_l1], initializer=b_initializer, collections=c_names)l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)# target_net 的第二層. collections 是在更新 target_net 參數(shù)時會用到with tf.compat.v1.variable_scope('l2'):w2 = tf.compat.v1.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer,collections=c_names)b2 = tf.compat.v1.get_variable('b2',[1, self.n_actions], initializer=b_initializer, collections=c_names)self.q_next = tf.matmul(l1, w2) + b2 #存儲 transition 存儲記憶def store_transition(self, s, a, r, s_):'''s:當(dāng)前的observationa: actionsr: reward s_: 下一步的observation'''if not hasattr(self, 'memory_counter'):self.memory_counter = 0# 記錄一條 [s, a, r, s_] 記錄transition = np.hstack((s, [a, r], s_))# 總 memory 大小是固定的, 如果超出總大小, 舊 memory 就被新 memory 替換index = self.memory_counter % self.memory_size self.memory[index, :] = transition #替換過程self.memory_counter += 1#選取q值最大的動作def choose_action(self, observation):observation = observation[np.newaxis, :] #增加維度if np.random.uniform() < self.epsilon:# 讓 eval_net 神經(jīng)網(wǎng)絡(luò)生成所有 action 的值, 并選擇值最大的 actionactins_value = self.sess.run(self.q_eval, feed_dict = {self.s: observation})action = np.argmax(actins_value)else:action = np.random.randint(0, self.n_actions) #隨機(jī)選擇return action#換參數(shù)def _replace_target_params(self):t_params = tf.compat.v1.get_collection('target_net_params')e_params = tf.compat.v1.get_collection('eval_net_params')self.sess.run([tf.compat.v1.assign(t, 3) for t, e in zip(t_params, e_params)])#學(xué)習(xí)并更新參數(shù)def learn(self):# 檢查是否替換 target_net 參數(shù)if self.learn_step_counter % self.replace_target_iter == 0:self._replace_target_params()print('\ntarget_params_replaced\n') # 從記憶庫memory 中隨機(jī)抽取 batch_size 個記憶if self.memory_counter > self.memory_size:sample_index = np.random.choice(self.memory_size, size = self.batch_size)else:sample_index = np.random.choice(self.memory_counter, size = self.batch_size)batch_memory = self.memory[sample_index, :]#獲取 q_next(target_net 產(chǎn)生的q) 和 q_eval(eval_net 產(chǎn)生的q)'''q_next:接收后x個n_featuresq_eval:接收前x個n_features'''q_next, q_eval = self.sess.run([self.q_next, self.q_eval],feed_dict = {self.s_:batch_memory[:, -self.n_features:], self.s: batch_memory[:, :self.n_features]})# 下面這幾步十分重要. 
q_next, q_eval 包含所有 action 的值,# 而我們需要的只是已經(jīng)選擇好的 action 的值, 其他的并不需要.# 所以我們將其他的 action 值全變成 0, 將用到的 action 誤差值 反向傳遞回去, 作為更新憑據(jù).# 這是我們最終要達(dá)到的樣子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]# q_eval = [-1, 0, 0] 表示這一個記憶中有我選用過 action 0, 而 action 0 帶來的 Q(s, a0) = -1, 所以其他的 Q(s, a1) = Q(s, a2) = 0.# q_target = [1, 0, 0] 表示這個記憶中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我們?nèi)×四膫€ action,# 我們都需要對應(yīng)上 q_eval 中的 action 位置, 所以就將 1 放在了 action 0 的位置.# 下面也是為了達(dá)到上面說的目的, 不過為了更方面讓程序運算, 達(dá)到目的的過程有點不同.# 是將 q_eval 全部賦值給 q_target, 這時 q_target-q_eval 全為 0,# 不過 我們再根據(jù) batch_memory 當(dāng)中的 action 這個 column 來給 q_target 中的對應(yīng)的 memory-action 位置來修改賦值.# 使新的賦值為 reward + gamma * maxQ(s_), 這樣 q_target-q_eval 就可以變成我們所需的樣子.# 具體在下面還有一個舉例說明."""假如在這個 batch 中, 我們有2個提取的記憶, 根據(jù)每個記憶可以生產(chǎn)3個 action 的值:q_eval =[[1, 2, 3],[4, 5, 6]]q_target = q_eval =[[1, 2, 3],[4, 5, 6]]然后根據(jù) memory 當(dāng)中的具體 action 位置來修改 q_target 對應(yīng) action 上的值:比如在:記憶 0 的 q_target 計算值是 -1, 而且我用了 action 0;記憶 1 的 q_target 計算值是 -2, 而且我用了 action 2:q_target =[[-1, 2, 3],[4, 5, -2]]所以 (q_target - q_eval) 就變成了:[[(-1)-(1), 0, 0],[0, 0, (-2)-(6)]]最后我們將這個 (q_target - q_eval) 當(dāng)成誤差, 反向傳遞會神經(jīng)網(wǎng)絡(luò).所有為 0 的 action 值是當(dāng)時沒有選擇的 action, 之前有選擇的 action 才有不為0的值.我們只反向傳遞之前選擇的 action 的值,"""q_target = q_eval.copy()batch_index = np.arange(self.bath_size, dtype = np.int32)eval_act_index = batch_memory[:, self.n_features].astype(int)reward = batch_memory[:, self.n_features + 1]q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)# 訓(xùn)練 eval_net_, self.cost = self.sess.run([self._train_op, self.loss],feed_dict = {self.s: batch_memory[:, self.n_features],self.q_target: q_target})self.cost_his.append(self.cost)# 逐漸增加 epsilon, 降低行為的隨機(jī)性self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_maxself.learn_step_counter += 1#打印cost變化曲線def plot_cost(self):import matplotlib.pyplot as pltplt.plot(np.arange(len(self.cost_his)), self.cost_his)plt.ylabel('Cost')plt.xlabel('training steps')plt.show()運行DQN
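The masking trick described in the comment above can be reproduced in isolation with plain NumPy; this standalone snippet (illustrative, not part of the original post) uses the same numbers as the example in the comment:

import numpy as np

q_eval = np.array([[1., 2., 3.],
                   [4., 5., 6.]])      # eval_net output for a batch of 2 memories
targets = np.array([-1., -2.])         # pretend r + gamma * max_a' Q(s', a') per memory
actions = np.array([0, 2])             # action actually taken in each memory

q_target = q_eval.copy()
batch_index = np.arange(len(q_eval))
q_target[batch_index, actions] = targets   # only the chosen action's entry changes

print(q_target - q_eval)
# [[-2.  0.  0.]
#  [ 0.  0. -8.]]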
Running DQN

from maze_env import Maze
from RL_brain import DeepQNetwork


def run_maze():
    step = 0   # controls when learning starts and how often it happens
    for episode in range(300):
        # initial observation
        observation = env.reset()
        while True:
            # refresh the environment
            env.render()
            # DQN picks an action from the observation
            action = RL.choose_action(observation)
            # the environment returns the next state, the reward and whether the episode ended
            observation_, reward, done = env.step(action)
            # DQN stores the transition
            RL.store_transition(observation, action, reward, observation_)
            # start learning only after some memories have accumulated, then learn every 5 steps
            if (step > 200) and (step % 5 == 0):
                RL.learn()
            # the next state becomes the current state of the next iteration
            observation = observation_
            # leave the loop when the episode ends
            if done:
                break
            step += 1   # total number of steps
    # end of game
    print('game over')
    env.destroy()


if __name__ == "__main__":
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.9,
                      replace_target_iter=200,   # swap the target_net parameters every 200 learning steps
                      memory_size=2000,          # replay-memory capacity
                      # output_graph=True        # whether to write the TensorBoard graph
                      )
    env.after(100, run_maze)
    env.mainloop()
    RL.plot_cost()   # look at the network's cost curve

Gym simulations
CartPole-v0
import gym
from RL_brain import DeepQNetwork

env = gym.make('CartPole-v0')   # which gym environment to use
env = env.unwrapped             # remove gym's built-in limits

print(env.action_space)             # available actions
print(env.observation_space)        # shape of the state observation
print(env.observation_space.high)   # highest observation values
print(env.observation_space.low)    # lowest observation values

# the DQN agent
RL = DeepQNetwork(n_actions=env.action_space.n,
                  n_features=env.observation_space.shape[0],
                  learning_rate=0.01, e_greedy=0.9,
                  replace_target_iter=100, memory_size=2000,
                  e_greedy_increment=0.0008,)

total_steps = 0   # step counter

for i_episode in range(100):
    # initialise the environment
    observation = env.reset()
    ep_r = 0
    while True:
        env.render()                                          # refresh the environment
        action = RL.choose_action(observation)                # pick an action
        observation_, reward, done, info = env.step(action)   # next state

        # unpack the observation so the default reward can be reshaped:
        # x is the cart's horizontal displacement, so r1 shrinks the further it drifts off centre;
        # theta is the pole's angle from vertical, so r2 grows the more upright the pole is
        # (with the cart centred and the pole upright, r1 + r2 peaks at 0.2 + 0.5 = 0.7)
        x, x_dot, theta, theta_dot = observation_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        reward = r1 + r2   # combining position and angle makes DQN learn more efficiently

        # DQN stores the transition
        RL.store_transition(observation, action, reward, observation_)

        # start learning only after some memories have accumulated
        if total_steps > 1000:
            RL.learn()

        ep_r += reward
        if done:
            print('episode: ', i_episode,
                  'ep_r: ', round(ep_r, 2),
                  ' epsilon: ', round(RL.epsilon, 2))
            break

        observation = observation_
        total_steps += 1

RL.plot_cost()

MountainCar-v0
import gym
from RL_brain import DeepQNetwork

env = gym.make('MountainCar-v0')
env = env.unwrapped

print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

RL = DeepQNetwork(n_actions=3, n_features=2,
                  learning_rate=0.01, e_greedy=0.9,
                  replace_target_iter=300, memory_size=3000,
                  e_greedy_increment=0.0001)

total_step = 0

for i_episode in range(10):
    observation = env.reset()
    ep_r = 0
    while True:
        env.render()
        action = RL.choose_action(observation)
        observation_, reward, done, info = env.step(action)

        position, velocity = observation_
        # reshape the reward: the valley bottom is near position -0.5, so the higher the car
        # climbs out of it (on either side), the larger the reward
        reward = abs(position - (-0.5))

        RL.store_transition(observation, action, reward, observation_)

        if total_step > 1000:
            RL.learn()

        ep_r += reward
        if done:
            get = '| Get' if observation_[0] >= env.unwrapped.goal_position else '| ----'
            print('Epi:', i_episode, get,
                  '| Ep_r:', round(ep_r, 4),
                  '| Epsilon:', round(RL.epsilon, 2))
            break

        observation = observation_
        total_step += 1

RL.plot_cost()

Taxi-v3
import gym
from RL_brain import QLearningTable
import matplotlib.pyplot as plt
import numpy as np

env = gym.make('Taxi-v3')
state = env.reset()
taxirow, taxicol, passloc, destidx = env.unwrapped.decode(state)
print('taxi position = {}'.format((taxirow, taxicol)))
print('passenger position = {}'.format(env.unwrapped.locs[passloc]))
print('destination = {}'.format(env.unwrapped.locs[destidx]))

RL = QLearningTable(actions=list(range(env.action_space.n)))
total_reward = 0
episode_rewards = []
for episode in range(100):
    observation = env.reset()
    while True:
        env.render()
        action = RL.choose_action(str(observation))
        observation_next, reward, done, info = env.step(action)
        total_reward += reward
        episode_rewards.append(total_reward)   # cumulative reward, recorded every step
        RL.learn(str(observation), action, reward, str(observation_next))
        observation = observation_next
        if done:
            break

plt.plot(episode_rewards)   # cumulative reward over time
plt.show()
# print('average episode reward = {} / {} = {}'.format(
#     sum(episode_rewards), len(episode_rewards), np.mean(episode_rewards)))

Policy Gradients
強(qiáng)化學(xué)習(xí)是一個通過獎懲來學(xué)習(xí)正確行為的機(jī)制. 家族中有很多種不一樣的成員, 有學(xué)習(xí)獎懲值, 根據(jù)自己認(rèn)為的高價值選行為, 比如 Q-learning, Deep Q Network。
Others do not analyse reward values at all and output an action directly: that is Policy Gradients. In short, PG adds a neural network that predicts the action to take, and it can pick actions from a continuous range. A method like Q-learning, by contrast, would have to compute the value of infinitely many actions before choosing one, which is slow.
In other words, policy gradient is the other big family in RL. Unlike value-based methods (Q-learning, Sarsa) it still takes in the environment's observation, but what it outputs is not the value of each action; it outputs a concrete action directly, so policy gradient skips the value stage altogether. Its biggest advantage is that the output action can be continuous: the value-based methods above output values for a discrete set of actions and then pick the one with the largest value, while policy gradient can select an action from a continuous distribution.
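The practical difference shows up in how an action is picked; a minimal illustration (not from the original post):

import numpy as np

q_values = np.array([0.1, 0.5, 0.2])             # value-based: a score per discrete action
greedy_action = int(np.argmax(q_values))          # take the highest-scoring action (plus epsilon-greedy noise)

probs = np.array([0.2, 0.7, 0.1])                 # policy-based: the network outputs a distribution over actions
sampled_action = np.random.choice(len(probs), p=probs)   # sample the action from that distribution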
The update works as follows. Suppose the network observes a state and selects the left action; it then wants to backpropagate so that this action is more likely to be chosen next time. But the reward signal reports that the action was bad, so the action's probability is lowered instead. If the selected action is the right one and the network again wants to make it more likely next time, the reward signal now says it was a good action, so the backward pass is strengthened and the action's probability is raised by a larger amount.
Algorithm:
delta(log(Policy(s,a)) * V) measures how surprised we are, in state s, by the chosen action a: the smaller the probability Policy(s,a), the larger -log(Policy(s,a)) becomes. If an action with a small Policy(s,a) then earns a large R, i.e. a large V, then -delta(log(Policy(s,a)) * V) is larger still, meaning more surprise: I picked an action I rarely pick, it turned out to yield a good reward, so I should adjust my parameters by a larger amount this time.
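Written out, the update that the loss in the code below implements is the REINFORCE rule

    \theta \leftarrow \theta + \alpha \, \nabla_\theta \log \pi_\theta(a_t \mid s_t) \, v_t

where v_t is the discounted (and normalised) return from step t onward; the code minimises the equivalent loss -log(pi_theta(a_t | s_t)) * v_t, which is the neg_log_prob * tf_vt term.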
Code:
Main-loop code:
import gym
from RL_brain import PolicyGradient
import matplotlib.pyplot as plt

RENDER = False                    # rendering slows things down, so only show the window once learning has progressed
DISPLAY_REWARD_THRESHOLD = 400    # render once the running episode reward exceeds 400

env = gym.make('CartPole-v0')
env = env.unwrapped   # remove gym's built-in limits
env.seed(1)           # plain policy gradient has high variance across episodes, so fix a decent seed

print(env.action_space)             # available actions
print(env.observation_space)        # state observation
print(env.observation_space.high)   # highest observation values
print(env.observation_space.low)    # lowest observation values

# the policy network
RL = PolicyGradient(n_actions=env.action_space.n,
                    n_features=env.observation_space.shape[0],
                    learning_rate=0.02,
                    reward_decay=0.99,   # gamma
                    output_graph=True,)

# main loop
for i_episode in range(3000):
    observation = env.reset()
    while True:
        if RENDER:
            env.render()
        action = RL.choose_action(observation)
        observation_, reward, done, info = env.step(action)
        RL.store_transition(observation, action, reward)
        if done:
            ep_rs_sum = sum(RL.ep_rs)
            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.99 + ep_rs_sum * 0.01
            if running_reward > DISPLAY_REWARD_THRESHOLD:
                RENDER = True   # decide whether to start rendering
            print("episode:", i_episode, " reward:", int(running_reward))

            vt = RL.learn()   # learn at the end of the episode and get vt back

            if i_episode == 0:
                plt.plot(vt)
                plt.xlabel('episode steps')
                plt.ylabel('normalized state-action value')
                plt.show()
            break
        observation = observation_

Policy Gradient network code:
# the Policy Gradient network
import numpy as np
import tensorflow as tf
tf.compat.v1.disable_eager_execution()   # this code uses the TF1 graph/session API


class PolicyGradient:
    def __init__(self, n_actions, n_features, learning_rate=0.01, reward_decay=0.95,
                 output_graph=False):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay   # reward discount
        # per-episode storage: observations, actions taken, rewards received
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []

        self._build_net()   # build the policy network

        self.sess = tf.compat.v1.Session()
        if output_graph:
            tf.compat.v1.summary.FileWriter("logs/", self.sess.graph)
        self.sess.run(tf.compat.v1.global_variables_initializer())

    def _build_net(self):
        with tf.name_scope('inputs'):
            # observations
            self.tf_obs = tf.compat.v1.placeholder(tf.float32, [None, self.n_features], name='observation')
            # actions chosen during the episode
            self.tf_acts = tf.compat.v1.placeholder(tf.int32, [None, ], name='actions_num')
            # the value of each state-action pair (computed from the rewards)
            self.tf_vt = tf.compat.v1.placeholder(tf.float32, [None, ], name='actions_value')

        # fully connected layer 1
        layer = tf.compat.v1.layers.dense(
            inputs=self.tf_obs,
            units=10,                    # number of neurons
            activation=tf.nn.tanh,       # activation
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc1')
        # fully connected layer 2: one logit per action (softmax applied below)
        all_act = tf.compat.v1.layers.dense(
            inputs=layer,
            units=self.n_actions,
            activation=None,
            kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
            bias_initializer=tf.constant_initializer(0.1),
            name='fc2')
        self.all_act_prob = tf.nn.softmax(all_act, name='act_prob')   # action probabilities

        with tf.name_scope('loss'):
            # maximising the total reward (log_p * R) is the same as minimising -(log_p * R),
            # and TensorFlow can only minimise a loss.
            # neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_act, labels=self.tf_acts)
            # the line below is equivalent and closer to the formula: actions that were not
            # chosen get a one-hot value of 0
            neg_log_prob = tf.reduce_sum(
                -tf.compat.v1.log(self.all_act_prob) * tf.one_hot(self.tf_acts, self.n_actions), axis=1)
            # vt = this step's reward plus the discounted future rewards; it scales the gradient
            loss = tf.reduce_mean(neg_log_prob * self.tf_vt)
            # tf.reduce_mean averages a tensor along the given axis (or over all elements if no
            # axis is given); keep_dims controls whether the reduced dimension is kept.

        with tf.name_scope('train'):
            # Adam only minimises, hence the negated log term above
            self.train_op = tf.compat.v1.train.AdamOptimizer(self.lr).minimize(loss)

    def choose_action(self, observation):
        # actions are no longer picked by Q value but by probability, so there is some
        # randomness even without epsilon-greedy
        prob_weights = self.sess.run(self.all_act_prob,
                                     feed_dict={self.tf_obs: observation[np.newaxis, :]})   # probabilities of all actions
        action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())      # sample an action
        return action

    def store_transition(self, s, a, r):
        # append this step's observation, action and reward to the episode lists
        # (the lists are cleared in learn() once the episode has been used)
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)

    def learn(self):
        # massage this episode's rewards so they are easier to learn from: discount future
        # rewards with gamma, then normalise the state-action values to reduce the variance
        # of policy gradient (following Andrej Karpathy's blog)
        discounted_ep_rs_norm = self._discount_and_norm_rewards()

        # one training step on the whole episode
        self.sess.run(self.train_op, feed_dict={
            self.tf_obs: np.vstack(self.ep_obs),   # shape [None, n_obs]
            self.tf_acts: np.array(self.ep_as),    # shape [None, ]
            self.tf_vt: discounted_ep_rs_norm,     # shape [None, ]
        })

        self.ep_obs, self.ep_as, self.ep_rs = [], [], []   # clear the episode data
        return discounted_ep_rs_norm   # this episode's state-action values

    # discount and normalise the episode's rewards
    def _discount_and_norm_rewards(self):
        discounted_ep_rs = np.zeros_like(self.ep_rs, dtype=np.float64)
        running_add = 0
        for t in reversed(range(0, len(self.ep_rs))):
            running_add = running_add * self.gamma + self.ep_rs[t]
            discounted_ep_rs[t] = running_add
        discounted_ep_rs -= np.mean(discounted_ep_rs)
        discounted_ep_rs /= np.std(discounted_ep_rs)   # scale to unit variance to complete the normalisation
        return discounted_ep_rs
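A quick hand-check of the discounting loop (illustrative, not from the post): for a three-step episode with rewards [1, 1, 1] and gamma = 0.99 it produces roughly [2.9701, 1.99, 1.0], which is then mean-centred (and std-scaled) before being fed in as tf_vt.

ep_rs, gamma = [1.0, 1.0, 1.0], 0.99
out, running = [0.0, 0.0, 0.0], 0.0
for t in reversed(range(3)):
    running = running * gamma + ep_rs[t]
    out[t] = running
print(out)   # approximately [2.9701, 1.99, 1.0]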
Summary