强化学习(六) - 连续空间中的强化学习(RL in Continuous Spaces)及相关实例
強化學習(六) - 連續空間中的強化學習
- 6.1 連續空間中的強化學習
- 6.2 離散空間和連續空間
- 6.3 離散化
- 實例:小車上山
- 6.3.1 相關程序
- 6.3.2 程序注解
- (1) 環境測試
- (2) 離散化
- (3) 模型訓練
- (4) 模型優化
6.1 連續空間中的強化學習
在之前的實例中,狀態和動作的數量受到限制。使用小的,有限的馬爾可夫決策過程(MDP),可以用表,字典或其他有限結構來表示動作價值函數。
例如,考慮下面的非常小的gridworld。假設世界有四個可能的狀態,并且代理有四個可能的操作可供使用(上,下,左,右)。我們可以在表中表示估計的最佳操作價值函數,每個狀態對應一個行,每個動作對應一個列。我們將此表稱為Q表。
但是,具有更大空間的MDP呢?考慮到Q表的每個狀態必須有一行。因此,例如,如果有1000萬個可能的狀態,則Q表必須具有1000萬行。此外,如果狀態空間是連續的實數值的集合(無窮大),那么就不可能以有限的結構表示動作值。
強化學習算法通常分為兩大類別: 基于模型的方法,例如策略迭代。以及需要已知轉換和獎勵模型的值迭代。它們本質上通過動態規劃并使用該模型, 以迭代方式計算期望的價值函數和最優策略,另一方面 蒙特卡洛方法和時間差分學習等,不基于模型的方法不需要明確的模型。它們通過執行探索性動作對環境抽樣,并使用獲得的經驗直接估計價價值函數。這就是強化學習的簡單介紹 當然還有更多內容。
深度強化學習是一個相對較新的術語。 是指使用深度學習(主要是多層神經網絡)解決強化學習問題的方法。 強化學習通常包含有限的 MDP,即狀態和動作數量是有限的。 但是有太多的問題具有非常大的狀態和動作空間,甚至由連續的實數組成。 傳統算法使用表格或字典,或其他有限結構來記錄狀態和動作值,但是不再適合此類問題。 因此,我們首先要考慮的是,如何泛化這些算法以便適合大型連續空間。 這就為開發深度強化學習算法奠定了基礎,包括深度 Q 學習等基于值的技巧,以及直接嘗試優化策略的方法,例如策略梯度。 最后,會使用到結合這兩類方法的更高級方法,即行動者-評論者方法。
6.2 離散空間和連續空間
我們先來看看離散空間和連續空間的含義。回憶下馬爾可夫決策流程的定義, 我們假設任何時間的環境狀態來自于一組潛在狀態,當該組合是有限組合時,我們可以將其稱之為離散狀態空間。動作也類似,如果有一組有限的動作,則表示環境有一個離散動作空間。
離散空間簡化了問題,首先,使我們能夠將任何狀態和動作函數,表示為字典或查詢表。假設有一個狀態價值函數 VVV,它是從一組狀態到實數的映射。如果將狀態表示為整數,則可以將價值函數表示為字典 并將每個狀態當做鍵。類似地, 假設有一個動作價值函數 Q,它將每個狀態動作對映射到一個實數。同樣 我們可以使用字典或將價值函數存儲為表格或矩陣,每行對應一個狀態,每列對應一個動作。
離散空間對很多強化學習算法來說也很關鍵,例如, 在價值迭代中,這個內部 for 循環逐個遍歷每個狀態,并更新相應的估值V(s)V(s)V(s)。如果狀態空間是連續的 則不可能這么操作,循環將永遠持續下去,甚至對于有很多狀態的離散狀態空間來說, 這一流程也很快變得不可行,Q 學習等不基于模型的方法也需要離散空間。這里 我們對狀態 S′S^′S′ 的所有潛在動作執行 max 運算。如果有一組有限的動作, 則很輕松。但是如果動作空間是連續的,這個小小的計算步驟本身就會變成完全失敗的優化問題。那么連續空間到底是什么意思?
連續是離散的反義詞。連續空間并不限定于一組獨特的值, 例如整數。相反, 它可以是一定范圍的值, 通常是實數。這意味著狀態值等量值。例如表示離散情況的條形圖,每個狀態對應一個長條。現在需要轉換成預期范圍內的密度圖。同一記法也擴展到了環境中,狀態不再是單個實數 而是一個此類數字的向量。這樣依然稱之為連續空間 只是不再是一個維度。
在繼續深入講解之前,我們嘗試了解下為何連續狀態空間很重要。它們來自何處?思考一個高級決策制定任務, 例如下棋。經常可以將一組潛在狀態看做離散狀態。每個棋子都在棋盤上的哪個方框內。我們不需要精確地確定,每個棋子處在方框內的哪個位置或朝著哪個方向。雖然我們也可以了解這些細節信息,并思考, 為何你的騎士在瞪著我的王后。但是這些信息與要解決的問題不相關,我們可以從游戲模型中刪除這些信息。通常, 網格環境在強化學習中非常熱門。它們使我們能夠直觀地查看智能體在空間環境中的行為如何,但是現實的物理空間并不能始終清晰地劃分為網格。
動作也可以是連續的動作。例如玩飛鏢的機器人,它必須設置扔飛鏢的高度和角度。選擇相應級別的扔力。即使這些值出現小小的變化,也會對飛鏢最終落在板上的位置有很大的影響。通常。 需要在物理環境中,采取的大多數動作本質上都是連續動作。很明顯。 我們需要修改表示法或算法,或者同時修改二者以便處理連續空間。我們將討論的兩個主要策略是,離散化和函數逼近。
6.3 離散化
顧名思義,離散化就是將連續空間轉換為離散空間。對于某些環境, 離散化狀態空間效果很好,使我們能夠幾乎不加修改就能使用現有的算法。動作也可以離散化,例如, 角可以拆分為完整度數。甚至按 90 度遞增。
如果合適的話,現在假設一個離散化環境中有一些物體, 機器人需要繞過這些障礙物。對于網格表示法,我們只能標記存在物體的單元格 甚至稍微超出范圍,稱之為占據網格。但是如果我們以較大網格離散化,可能會使智能體以為,沒有繞過這些障礙物以抵達目標位置的道路。如果我們能夠根據這些障礙物調整網格,那么可以為智能體找到一條潛在的道路。另一種方法是在需要時將網格拆分為更小的單元格,依然是逼近結果。但是可以讓我們在需要的位置分配更多的狀態表示,這樣比將整個狀態空間拆分為更小的單元格合適。拆分整個狀態空間可能會增加狀態的總數,進而增加計算值函數所需的時間。
這種離散化適合網格世界等空間領域,但是其他狀態空間呢?我們來看看汽車換擋這個不同領域的問題。如今的大部分汽車都會自動換擋, 汽車如何決定切換到哪一擋以及何時換擋?這個圖表簡單地描述了對于一輛普通汽車,油耗如何隨著不同擋位速度的變化而變化。假設狀態僅包含車輛速度,以及當前擋位,獎勵與油耗成反比,智能體可以采取的動作包括換到更高擋位或更低擋位。雖然速度是連續值,但是可以離散化為不同的范圍。最佳劃分方式是一個擋位對應一個速度范圍,注意這些范圍可以具有不同的長度,即離散化是不均勻的。如果狀態空間還有其他維度, 例如油門位置,那么它們也可以不均勻地細分。
實例:小車上山
小車上山(MountainCar-v0)是一個經典的控制問題.如圖6-6所示,小車在一段范圍內行駛.在任意時刻,在水平方向看小車的位置范圍是[-1.2, 0.6],速度的范圍是[-0.07, 0.07].在每個時刻,智能體可以對小車施加3種動作中的一種, 向左施力, 不實例, 向右施力.智能體施力和小車的水平位置會共同決定小車 下一個時刻的速度.當某時刻小車的水平位置大于0.5時,控制目標成功達成,回合結束.控制的目標是讓小車盡可能少的步驟達到目標.一般認為,如果智能體在連續100個回合中的平均步數≤100\leq 100≤100,就認為問題解決了.
在絕大多數情況下,智能體簡單的向右施力并不足以讓小車成功越過目標.
本節假設智能體并不知道環境確定小車位置和速度的數學表達式.實際上,小車位置和速度是有數學表達式的.記ttt時刻(t=0,1,2,...t=0, 1, 2,...t=0,1,2,...),小車的位置為Xt(Xt∈[?1.2,0.6])X_t(X_t \in [-1.2, 0.6])Xt?(Xt?∈[?1.2,0.6]), 速度為Vt(Vt∈[?0.07,0.07])V_t(V_t \in [-0.07, 0.07])Vt?(Vt?∈[?0.07,0.07]),智能體施力為At∈{0,1,2}A_t \in \{ 0, 1, 2\}At?∈{0,1,2}, 初始狀態X0∈[?0.6,?0.4),V0=0X_0 \in [-0.6, -0.4), V_0=0X0?∈[?0.6,?0.4),V0?=0.
從ttt時刻到t+1t+1t+1時刻的更新式為
Xt+1=clip(Xt+Vt,?1.2,0.6)Vt+1=clip(Vt+0.001(At?1)?0.0025cos(3Xt),?0.07,0.07)\begin{aligned}X_{t+1} &= \text{clip}(X_t +V_t, -1.2, 0.6)\\V_{t+1}&=\text{clip}(V_t+0.001(A_t-1)-0.0025cos(3X_t), -0.07, 0.07)\end{aligned}Xt+1?Vt+1??=clip(Xt?+Vt?,?1.2,0.6)=clip(Vt?+0.001(At??1)?0.0025cos(3Xt?),?0.07,0.07)?
其中clip函數限制了位置和速度的范圍
clip(x,xmin,xmax)={xmin,x≤xminx,xmin<x<xmaxxmax,x≥xmax\text{clip}(x, x_{min}, x_{max}) = \begin{cases} x_{min}, & x\leq x_{min} \\ x, & x_{min}< x < x_{max} \\ x_{max}, & x\geq x_{max} \\ \end{cases} clip(x,xmin?,xmax?)=??????xmin?,x,xmax?,?x≤xmin?xmin?<x<xmax?x≥xmax??
6.3.1 相關程序
首先我們建立關于Agent的Q學習類
K6_MountainCar_Agent
import numpy as np# 定義可用于離散空間的等間距網格。
def create_uniform_grid(low, high, bins=(10, 10)):grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]print("Uniform grid: [<low>, <high>] / <bins> => <splits>")for l, h, b, splits in zip(low, high, bins, grid):print(" [{}, {}] / {} => {}".format(l, h, b, splits))return grid# 根據給定的網格離散樣本。
def discretize(sample, grid):return list(int(np.digitize(s, g)) for s, g in zip(sample, grid)) # 返回索引值class QLearningAgent:"""Q-Learning agent,,通過離散化可以作用于連續的狀態空間。"""def __init__(self, env, state_grid, alpha=0.02, gamma=0.99,epsilon=1.0, epsilon_decay_rate=0.9995, min_epsilon=.01, seed=505):"""初始化變量,創建離散化網格。"""# Environment infoself.env = envself.state_grid = state_gridself.state_size = tuple(len(splits) + 1 for splits in self.state_grid) # n-維狀態空間self.action_size = self.env.action_space.n # 1-維離散動作空間self.seed = np.random.seed(seed)print("Environment:", self.env)print("State space size:", self.state_size)print("Action space size:", self.action_size)# 學習模型參數self.alpha = alpha # 學習率self.gamma = gamma # 折扣因子self.epsilon = self.initial_epsilon = epsilon # 初始探索率self.epsilon_decay_rate = epsilon_decay_rate # epsilon衰減系數self.min_epsilon = min_epsilon# 創建Q表self.q_table = np.zeros(shape=(self.state_size + (self.action_size,)))print("Q table size:", self.q_table.shape)def preprocess_state(self, state):"""將連續狀態映射到它的離散表示。"""return tuple(discretize(state, self.state_grid))def reset_episode(self, state):"""為新的事件重置變量."""# 逐步降低探索率self.epsilon *= self.epsilon_decay_rateself.epsilon = max(self.epsilon, self.min_epsilon)# 決定初始行動self.last_state = self.preprocess_state(state)self.last_action = np.argmax(self.q_table[self.last_state])return self.last_actiondef reset_exploration(self, epsilon=None):"""重置訓練時使用的探索率."""self.epsilon = epsilon if epsilon is not None else self.initial_epsilondef act(self, state, reward=None, done=None, mode='train'):"""選擇next操作并更新內部Q表 (when mode != 'test')."""state = self.preprocess_state(state)if mode == 'test':# 測試模式:簡單地產生一個動作action = np.argmax(self.q_table[state])else:# 訓練模式(默認):更新Q表,選擇下一步行動# Note: 我們用當前狀態,回報更新最后的狀態動作對的Q表條目self.q_table[self.last_state + (self.last_action,)] += self.alpha * \(reward + self.gamma * max(self.q_table[state]) -self.q_table[self.last_state + (self.last_action,)])# 探索 vs. 利用do_exploration = np.random.uniform(0, 1) < self.epsilonif do_exploration:# 隨機選擇一個動作action = np.random.randint(0, self.action_size)else:# 從Q表中選擇最佳動作action = np.argmax(self.q_table[state])# 存儲當前狀態,下一步操作self.last_state = stateself.last_action = actionreturn action
主程序K6_MountainCar如下
import sys
import gym
import numpy as npimport matplotlib.collections as mc
import pandas as pd
import matplotlib.pyplot as plt
from K6_MountainCar_Agent import QLearningAgent# 定義可用于離散空間的等間距網格。
def create_uniform_grid(low, high, bins=(10, 10)):grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]print("Uniform grid: [<low>, <high>] / <bins> => <splits>")for l, h, b, splits in zip(low, high, bins, grid):print(" [{}, {}] / {} => {}".format(l, h, b, splits))return grid# 根據給定的網格離散樣本。
def discretize(sample, grid):return list(int(np.digitize(s, g)) for s, g in zip(sample, grid)) # 返回索引值# 在給定的二維網格上可視化原始的和離散的樣本。
def visualize_samples(samples, discretized_samples, grid, low=None, high=None):fig, ax = plt.subplots(figsize=(10, 10))# 顯示網格ax.xaxis.set_major_locator(plt.FixedLocator(grid[0]))ax.yaxis.set_major_locator(plt.FixedLocator(grid[1]))ax.grid(True)# 如果指定了邊界(低、高),則使用它們來設置軸的限制if low is not None and high is not None:ax.set_xlim(low[0], high[0])ax.set_ylim(low[1], high[1])else:# 否則使用第一個、最后一個網格位置為low、high(為了進一步映射離散化的樣本)low = [splits[0] for splits in grid]high = [splits[-1] for splits in grid]# 將每個離散的樣本(實際上是一個索引)映射到相應網格單元格的中心grid_extended = np.hstack((np.array([low]).T, grid, np.array([high]).T)) # add low and high endsgrid_centers = (grid_extended[:, 1:] + grid_extended[:, :-1]) / 2 # compute center of each grid celllocs = np.stack(grid_centers[i, discretized_samples[:, i]] for i in range(len(grid))).T # map discretized samplesax.plot(samples[:, 0], samples[:, 1], 'o') # 繪制初始樣本ax.plot(locs[:, 0], locs[:, 1], 's') # 繪制離散后的樣本ax.add_collection(mc.LineCollection(list(zip(samples, locs)),colors='orange')) # 添加一條線連接每個原始離散樣本ax.legend(['original', 'discretized'])def run(agent, env, num_episodes=20000, mode='train'):"""給定的強化學習環境中運行agent并返回分數."""scores = []max_avg_score = -np.inffor i_episode in range(1, num_episodes + 1):# 初始化事件state = env.reset()action = agent.reset_episode(state)total_reward = 0done = False# 運行步驟直到完成while not done:state, reward, done, info = env.step(action)total_reward += rewardaction = agent.act(state, reward, done, mode)# 保存最終分數scores.append(total_reward)# 輸出事件狀態if mode == 'train':if len(scores) > 100:avg_score = np.mean(scores[-100:])if avg_score > max_avg_score:max_avg_score = avg_scoreif i_episode % 100 == 0:print("\rEpisode {}/{} | Max Average Score: {}".format(i_episode, num_episodes, max_avg_score), end="")sys.stdout.flush()return scoresdef plot_scores(scores, rolling_window=100):"""Plot scores and optional rolling mean using specified window."""plt.plot(scores)plt.title("Scores")rolling_mean = pd.Series(scores).rolling(rolling_window).mean()plt.plot(rolling_mean)return rolling_meandef plot_q_table(q_table):"""Visualize max Q-value for each state and corresponding action."""q_image = np.max(q_table, axis=2) # max Q-value for each stateq_actions = np.argmax(q_table, axis=2) # best action for each statefig, ax = plt.subplots(figsize=(10, 10))cax = ax.imshow(q_image, cmap='jet')cbar = fig.colorbar(cax)for x in range(q_image.shape[0]):for y in range(q_image.shape[1]):ax.text(x, y, q_actions[x, y], color='white',horizontalalignment='center', verticalalignment='center')ax.grid(False)ax.set_title("Q-table, size: {}".format(q_table.shape))ax.set_xlabel('position')ax.set_ylabel('velocity')# main function
if __name__ == "__main__":# 創建一個環境并設置隨機種子env = gym.make("MountainCar-v0")env.seed(505)# 環境測試env_test = Falseif env_test is True:state = env.reset()score = 0for t in range(200):action = env.action_space.sample()env.render()state, reward, done, _ = env.step(action)score += rewardif done:breakprint("Final score:", score)env.close()# Explore state (observation) spaceprint("State space:", env.observation_space)print("- low:", env.observation_space.low)print("- high:", env.observation_space.high)print("State space samples:")print(np.array([env.observation_space.sample() for i in range(10)]))# action spaceprint("Action space:", env.action_space)# 從動作空間生成一些示例print("Action space samples:")print(np.array([env.action_space.sample() for i in range(10)]))state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))q_agent = QLearningAgent(env, state_grid)# 以不同模式運行,方便測試結果run_mode = Trueif run_mode is True:q_agent.q_table = np.load('q_table.npy', allow_pickle=True)state = env.reset()score = 0for t in range(200):action = q_agent.act(state, mode='test')env.render()state, reward, done, _ = env.step(action)score += rewardif done:breakprint('Final score:', score)env.close()else:scores = run(q_agent, env)# plot dataplt.plot(scores)plt.title("Scores")rolling_mean = plot_scores(scores)plt.show()test_scores = run(q_agent, env, num_episodes=100, mode='test')print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))_ = plot_scores(test_scores)plot_q_table(q_agent.q_table)plt.show()np.save('q_table.npy', q_agent.q_table)
6.3.2 程序注解
(1) 環境測試
首先來看一下MountainCar這個環境條件,在主程序K6_MountainCar中,可以通過更改env_test的值,使其為True來進行環境的測試.輸出如下
Final score: -200.0
State space: Box(2,)
- low: [-1.2 -0.07]
- high: [0.6 0.07]
State space samples:
[[-0.00739189 -0.01564005][-0.4523059 0.04406008][-0.1716276 -0.04345753][-0.22001167 0.01072859][-1.1200054 -0.05656335][-0.91125274 -0.05465209][-0.38469937 -0.06611647][ 0.15009743 0.03064202][-1.1663655 -0.06613734][-0.352857 -0.01548356]]
Action space: Discrete(3)
Action space samples:
[1 1 2 2 1 1 1 1 0 0]
(2) 離散化
# 將觀測空間離散化,其中bins控制離散精度state_grid = create_uniform_grid(env.observation_space.low, env.observation_space.high, bins=(10, 10))
這個部分的作用是將觀測空間進行離散化,借助create_uniform_grid()函數
# 定義可用于離散空間的等間距網格。
def create_uniform_grid(low, high, bins=(10, 10)):grid = [np.linspace(low[dim], high[dim], bins[dim] + 1)[1:-1] for dim in range(len(bins))]print("Uniform grid: [<low>, <high>] / <bins> => <splits>")for l, h, b, splits in zip(low, high, bins, grid):print(" [{}, {}] / {} => {}".format(l, h, b, splits))return grid
其中bins定義了離散化的精度
Uniform grid: [<low>, <high>] / <bins> => <splits>[-1.2000000476837158, 0.6000000238418579] / 10 => [-1.02000004 -0.84000003 -0.66000003 -0.48000002 -0.30000001 -0.120.06 0.24000001 0.42000002][-0.07000000029802322, 0.07000000029802322] / 10 => [-0.056 -0.042 -0.028 -0.014 0. 0.014 0.028 0.042 0.056]
(3) 模型訓練
q_agent = QLearningAgent(env, state_grid)# 以不同模式運行,方便測試結果run_mode = False# 運行測試模式if run_mode is True:q_agent.q_table = np.load('q_table.npy', allow_pickle=True)state = env.reset()score = 0for t in range(200):action = q_agent.act(state, mode='test')env.render()state, reward, done, _ = env.step(action)score += rewardif done:breakprint('Final score:', score)env.close()# 訓練模式else:scores = run(q_agent, env)# plot dataplt.plot(scores)plt.title("Scores")rolling_mean = plot_scores(scores)plt.show()test_scores = run(q_agent, env, num_episodes=100, mode='test')print("[TEST] Completed {} episodes with avg. score = {}".format(len(test_scores), np.mean(test_scores)))_ = plot_scores(test_scores)plot_q_table(q_agent.q_table)plt.show()
在這里提供了兩種運行模式,當run_mode = False時,為訓練模式. 此時會進行模型訓練,相關訓練過程的輸出如下
--Agent--
Environment: <TimeLimit<MountainCarEnv<MountainCar-v0>>>
State space size: (10, 10)
Action space size: 3
Q table size: (10, 10, 3)
Episode 20000/20000 | Max Average Score: -130.49[TEST] Completed 100 episodes with avg. score = -146.91
可視化q表的輸出:
分數和平均分數輸出:
可以看到平均分數隨著訓練輪數的增加而增加.
同時,我們也可以在test模式下運行模型,來分析和觀測所獲得的分數,輸出如下,
在訓練完成之后,程序會將Q表保存到q_table.npy文件中,方便之后的使用.
當run_mode = True 時,程序以運行模式進行.程序會從文件中讀取訓練后的q_表的值,運行完之后,程序自動退出.
(4) 模型優化
之前在環境介紹中說過,小車上山問題在分數小于100時可以認為解決了.在之前的模型中,我們的平均分數保持在-150左右,我們可以通過細化離散度來獲得更好的分數.
我們將bins更改為(20, 20),并將訓練輪數調整到50000輪.運行程序
分數和平均分數輸出:
test模式輸出:
可以看到分數有了一定的提升.
將run_mode設置成True即可查看效果.
總結
以上是生活随笔為你收集整理的强化学习(六) - 连续空间中的强化学习(RL in Continuous Spaces)及相关实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 强化学习(五) - 时序差分学习(Tem
- 下一篇: HTTP 和 HTTPS 之间除了安全性