Playing Super Mario with DQN
Algorithm Overview
Here is a diagram I drew earlier illustrating the 2015 (Nature) DQN algorithm:
The key thing to notice in this diagram is that the whole algorithm can be seen as two processes running independently:
Playing the game with the value network ("play")
Updating the value network ("update")
Getting Started
The tools used here: PyTorch, OpenCV (cv2), NumPy, and a Super Mario game environment exposing the env.step() interface that appears in the code below.
As a PyTorch newcomer, the biggest pitfall I hit this time: if you convert back and forth between ndarray and torch.Tensor frequently and the data flow gets messy, keeping Tensors on the GPU becomes extremely slow, sometimes unbelievably so. This is most likely caused by multi-dimensional data being shuffled between the GPU and the CPU. Multi-dimensional arrays are best created as Tensors from the very start.
import torch

# if gpu is to be used
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
Tensor = FloatTensor
All of the later code defines variables with this Tensor alias, for example:
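A minimal sketch of mine, with a random array standing in for a real game frame: the frame arrives as an ndarray, gets converted to a Tensor once, and then stays a Tensor (on the GPU when available) for the rest of its life.

import numpy as np
import torch

use_cuda = torch.cuda.is_available()
Tensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor   # same alias as above

frame = np.random.rand(84, 84).astype('float32')   # stand-in for one raw game frame
frame_t = torch.from_numpy(frame).type(Tensor)     # convert once; stays a Tensor from here on
print(frame_t.size())                              # torch.Size([84, 84])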
Value Network
In the value network class, besides defining the network structure, I added a few more capabilities:
Network structure and forward pass
Action selection
Network parameter update
Network structure
import torch.nn as nn
import torch.nn.functional as F

class dqn_net(nn.Module):
    def __init__(self, ACTION_NUM):
        super(dqn_net, self).__init__()
        # input: 4 stacked 84x84 grayscale frames
        self.conv1 = nn.Conv2d(in_channels=4, out_channels=16, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2)
        self.fc1 = nn.Linear(in_features=9*9*32, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=ACTION_NUM)
        self.action_num = ACTION_NUM

    def forward(self, input):
        output = F.relu(self.conv1(input))
        output = F.relu(self.conv2(output))
        output = output.view(-1, 9*9*32)       # flatten the conv features
        output = F.relu(self.fc1(output))
        output = self.fc2(output)              # one Q-value per action
        return output
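A quick sanity check of my own on the layer sizes: with 84x84 inputs, conv1 yields (84 - 8)/4 + 1 = 20, and conv2 yields (20 - 4)/2 + 1 = 9, so the flattened feature size is 9*9*32, which is exactly the in_features of fc1. A dummy forward pass confirms the shapes:

import torch
from torch.autograd import Variable

net = dqn_net(ACTION_NUM=6)
dummy = Variable(torch.zeros(1, 4, 84, 84))   # a batch of one stacked observation
print(net(dummy).size())                      # torch.Size([1, 6]): one Q-value per action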
Action selection
def select_action(self, input):
    '''
    Parameters
    ----------
    input : {Tensor} of shape torch.Size([4,84,84])
    Returns
    -------
    action_button , action_onehot : {int} , {Tensor}
    '''
    input = Variable(input.unsqueeze(0))       # add the batch dimension
    output = self.forward(input)
    action_index = output.data.max(1)[1][0]    # index of the largest Q-value
    # action_button , action_onehot
    if action_index == 0:   return 0,  Tensor([1, 0, 0, 0, 0, 0])   # no-op
    elif action_index == 1: return 3,  Tensor([0, 1, 0, 0, 0, 0])   # walk left
    elif action_index == 2: return 7,  Tensor([0, 0, 1, 0, 0, 0])   # walk right
    elif action_index == 3: return 11, Tensor([0, 0, 0, 1, 0, 0])   # jump in place
    elif action_index == 4: return 4,  Tensor([0, 0, 0, 0, 1, 0])   # jump left
    elif action_index == 5: return 8,  Tensor([0, 0, 0, 0, 0, 1])   # jump right
The action returned here comes in two corresponding forms:
1. The button number fed to the game environment, which selects a button combination. The game actually has 6 buttons; the original environment defines 14 button combinations, of which I use only 6, one per action. The environment's full mapping is:
mapping = {
    0: [0, 0, 0, 0, 0, 0],   # NO
    1: [1, 0, 0, 0, 0, 0],   # Up
    2: [0, 1, 0, 0, 0, 0],   # Down
    3: [0, 0, 1, 0, 0, 0],   # Left
    4: [0, 0, 1, 0, 1, 0],   # Left + A
    5: [0, 0, 1, 0, 0, 1],   # Left + B
    6: [0, 0, 1, 0, 1, 1],   # Left + A + B
    7: [0, 0, 0, 1, 0, 0],   # Right
    8: [0, 0, 0, 1, 1, 0],   # Right + A
    9: [0, 0, 0, 1, 0, 1],   # Right + B
    10: [0, 0, 0, 1, 1, 1],  # Right + A + B
    11: [0, 0, 0, 0, 1, 0],  # A
    12: [0, 0, 0, 0, 0, 1],  # B
    13: [0, 0, 0, 0, 1, 1],  # A + B
}
2. A one-hot encoded form matched to the network output: each position corresponds to one action, and a 1 means that action is taken (a small helper sketch follows).
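If you would rather not hard-code the six if/elif branches, the same button/one-hot pairing can be generated from a lookup table. The helper below (make_onehot) is my own illustration and does not appear in the original code:

import torch

Tensor = torch.FloatTensor   # or torch.cuda.FloatTensor, as defined earlier
BUTTONS = [0, 3, 7, 11, 4, 8]   # no-op, walk left, walk right, jump, jump left, jump right

def make_onehot(action_index, action_num=6):
    onehot = torch.zeros(action_num).type(Tensor)
    onehot[action_index] = 1
    return onehot

action_index = 2
print(BUTTONS[action_index], make_onehot(action_index))   # 7, tensor([0., 0., 1., 0., 0., 0.])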
Network update
def update(self, samples, loss_func, optim_func, learn_rate, target_net, BATCH_SIZE, GAMMA):
    '''Update the value network by one gradient step
    Parameters
    ----------
    samples: {namedtuple}
        Transition(obs4=(o1,o2,...), act=(a1,a2,...),
                   next_obs4=(no1,no2,...), reward=(r1,r2,...), done=(d1,d2,...))
    loss_func: the loss function class, e.g. nn.MSELoss
    optim_func: the optimizer class, e.g. optim.SGD
    learn_rate: float
        the learning rate of the optimizer
    target_net: the target network used to compute the TD target
    '''
    obs4_batch = Variable(torch.cat(samples.obs4))            # ([BATCH,4,84,84])
    next_obs4_batch = Variable(torch.cat(samples.next_obs4))  # ([BATCH,4,84,84])
    action_batch = Variable(torch.cat(samples.act))           # ([BATCH,6])
    done_batch = samples.done                                 # {tuple} of bool, len=BATCH
    reward_batch = torch.cat(samples.reward)                  # ([BATCH,1])

    ### compute the target Q(s,a) value ###
    value_batch = target_net(next_obs4_batch)
    target = Variable(torch.zeros(BATCH_SIZE).type(Tensor))
    for i in range(BATCH_SIZE):
        if done_batch[i] == False:
            # non-terminal: r + gamma * max_a' Q_target(s', a')
            target[i] = reward_batch[i] + GAMMA * Tensor.max(value_batch.data[i])
        elif done_batch[i] == True:
            # terminal: the target is just the reward
            target[i] = reward_batch[i]

    ### compute the current net output value ###
    output_all = self.forward(obs4_batch) * action_batch      # mask with the one-hot action
    output = output_all.sum(dim=1)                            # Q(s,a) of the taken action

    criterion = loss_func()
    optimizer = optim_func(self.parameters(), lr=learn_rate)
    loss = criterion(output, target)
    optimizer.zero_grad()   # set gradients of the parameters to zero
    loss.backward()
    optimizer.step()
The samples used for training take the form of a namedtuple:
{namedtuple}:
Transition(obs4=(o1,o2,...), act=(a1,a2,...), next_obs4=(no1,no2,...), reward=(r1,r2,...), done=(d1,d2,...))
The training step works as follows:
1. Feed the observations obs4 from the samples into the network to get the network output.
2. Pick out the entries of that output corresponding to the actions act stored in the samples.
3. Use the reward r and the next state s' from the samples to compute the target value (a vectorized sketch of this step follows the list). Computing the target requires checking whether the next state is terminal:
if it is not terminal, the target is y = r + γ · max_a' Q_target(s', a');
if it is terminal, the target is simply y = r.
4. The Q-values from step 2 and the targets from step 3 are the two arguments of the loss function.
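For reference, the same target computation can be written without the Python loop by masking with the done flags. This is only a sketch of mine with random stand-in data, using present-day tensor calls rather than the Variable-era style of the code above:

import torch

GAMMA = 0.99
reward_batch = torch.rand(4)                 # r for a batch of 4 transitions
next_q = torch.rand(4, 6)                    # target_net(next_obs4): one row of Q-values per sample
done_mask = torch.tensor([0., 0., 1., 0.])   # 1.0 where the episode ended

# y = r + gamma * max_a' Q_target(s', a') for non-terminal samples, y = r otherwise
target = reward_batch + GAMMA * next_q.max(dim=1)[0] * (1 - done_mask)
print(target)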
Replay Memory
from collections import namedtuple
import random
import numpy as np

class replay_memory:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0
        self.Transition = namedtuple('Transition',
                                     ['obs4', 'act', 'next_obs4', 'reward', 'done'])

    def __len__(self):
        return len(self.memory)

    def add(self, *args):
        '''Add a transition to replay memory
        Parameters
        ----------
        e.g. replay_memory.add(obs4, action, next_obs4, reward, done)
        obs4: {Tensor} of shape torch.Size([4,84,84])
        act: {Tensor} of shape torch.Size([6])
        next_obs4: {Tensor} of shape torch.Size([4,84,84])
        reward: {int}
        done: {bool} whether the next state is a terminal state
        Function
        --------
        the replay_memory keeps only the latest `capacity` samples
        '''
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = self.Transition(*args)
        self.position = (self.position + 1) % self.capacity   # overwrite the oldest sample
    def sample(self, batch_size):
        '''Sample a batch from replay memory
        Parameters
        ----------
        batch_size: int
            how many transitions you want
        Returns
        -------
        batch_zip: {Transition} namedtuple whose fields are tuples of the
            stored obs4 / act / next_obs4 / reward / done entries; the caller
            concatenates them into batch tensors (see update()).
        '''
        batch = random.sample(self.memory, batch_size)
        batch_zip = self.Transition(*zip(*batch))   # tuple of Transitions -> Transition of tuples
        return batch_zip
The samples inside are stored in namedtuple form:
{Transition}
obs4:      {tuple} of {Tensor} - shape torch.Size([4,84,84])
act:       {tuple} of {Tensor} - shape torch.Size([6])
next_obs4: {tuple} of {Tensor} - shape torch.Size([4,84,84])
reward:    {tuple} of {int}
done:      {tuple} of {bool}
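A small usage sketch of mine, assuming the replay_memory class above; it stores one transition with the same unsqueeze(0) convention the training loop uses later, then samples it back and builds batch tensors:

import torch

buffer = replay_memory(capacity=100)
obs4 = torch.zeros(4, 84, 84)
act_onehot = torch.FloatTensor([0, 0, 1, 0, 0, 0])
# store with a leading batch dimension, as the training loop does
buffer.add(obs4.unsqueeze(0), act_onehot.unsqueeze(0), obs4.unsqueeze(0),
           torch.FloatTensor([1.0]).unsqueeze(0), False)

batch = buffer.sample(1)
print(torch.cat(batch.obs4).size())   # torch.Size([1, 4, 84, 84])
print(torch.cat(batch.act).size())    # torch.Size([1, 6])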
Image Preprocessing
import cv2
import torch

def ob_process(frame):
    '''
    Parameters
    ----------
    frame: {ndarray} of shape (90,90)
    Returns
    -------
    frame: {Tensor} of shape torch.Size([1,84,84])
    '''
    frame = cv2.resize(frame, (84, 84), interpolation=cv2.INTER_AREA)
    frame = frame.astype('float64')
    frame = torch.from_numpy(frame)
    frame = frame.unsqueeze(0).type(Tensor)   # add a channel dimension; move to GPU if available
    return frame
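A short usage sketch of mine, assuming the ob_process function and the Tensor alias defined earlier; a random array stands in for a real game frame. A single raw frame becomes a [1,84,84] tensor, and the initial observation stack simply repeats it four times, exactly as the training loop below does:

import numpy as np
import torch

raw = (np.random.rand(90, 90) * 255).astype('float64')   # stand-in for one raw game frame
frame = ob_process(raw)                                   # torch.Size([1, 84, 84])
obs4 = torch.cat([frame, frame, frame, frame], dim=0)     # torch.Size([4, 84, 84])
print(frame.size(), obs4.size())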
Training Process
Initialization
### initialization ###
action_space = [(0,  Tensor([1, 0, 0, 0, 0, 0])),
                (3,  Tensor([0, 1, 0, 0, 0, 0])),
                (7,  Tensor([0, 0, 1, 0, 0, 0])),
                (11, Tensor([0, 0, 0, 1, 0, 0])),
                (4,  Tensor([0, 0, 0, 0, 1, 0])),
                (8,  Tensor([0, 0, 0, 0, 0, 1]))]
# (action_button , action_onehot)
# the actions are: no-op, walk left, walk right, jump, jump left, jump right

value_net = dqn_net(ACTION_NUM)
target_net = dqn_net(ACTION_NUM)
if torch.cuda.is_available():
    value_net.cuda()
    target_net.cuda()
if os.listdir(PATH):
    value_net.load_state_dict(torch.load(PATH))   # resume from a saved checkpoint

buffer = replay_memory(REPLAY_MEMORY_CAPACITY)
env.reset()
obs, _, _, _, _, _, _ = env.step(0)
obs = ob_process(obs)
obs4 = torch.cat(([obs, obs, obs, obs]), dim=0)   # {Tensor} of shape torch.Size([4,84,84])
judge_distance = 0
episode_total_reward = 0
epi_total_reward_list = []

# counters #
time_step = 0
update_times = 0
episode_num = 0
history_distance = 200
After that, we enter the following loop and start playing:
while episode_num <= MAX_EPISODE:
Action selection
    ### choose an action with epsilon-greedy ###
    prob = random.random()
    threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1 * episode_num / EPS_DECAY)
    if prob <= threshold:
        action_index = np.random.randint(6)
        action_button = action_space[action_index][0]   # {int}
        action_onehot = action_space[action_index][1]   # {Tensor}
    else:
        action_button, action_onehot = value_net.select_action(obs4)
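The threshold follows an exponential decay from EPS_START toward EPS_END as episodes accumulate, so the agent explores a lot early on and relies on the network more and more later. A small worked example with assumed hyperparameters (the post does not state its actual EPS_START / EPS_END / EPS_DECAY values):

import math

EPS_START, EPS_END, EPS_DECAY = 1.0, 0.1, 200   # assumed values, for illustration only
for episode_num in (0, 100, 500, 2000):
    threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1 * episode_num / EPS_DECAY)
    print(episode_num, round(threshold, 3))
# prints roughly: 0 -> 1.0, 100 -> 0.646, 500 -> 0.174, 2000 -> 0.1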
Executing the action
    ### do one step ###
    obs_next, reward, done, _, max_distance, _, now_distance = env.step(action_button)
    obs_next = ob_process(obs_next)
    obs4_next = torch.cat(([obs4[1:, :, :], obs_next]), dim=0)   # drop the oldest frame, append the new one
    buffer.add(obs4.unsqueeze(0), action_onehot.unsqueeze(0), obs4_next.unsqueeze(0),
               Tensor([reward]).unsqueeze(0), done)
    episode_total_reward += reward

    if now_distance <= history_distance:   # Mario has made no forward progress
        judge_distance += 1
    else:
        judge_distance = 0
        history_distance = max_distance
There is a step here that measures how far Mario has traveled: if Mario lingers around the same spot for a certain number of steps, this is also treated as reaching a terminal state, and the next episode is started.
Transition to the next state
    ### go to the next state ###
    if done == False and judge_distance <= 50:
        obs4 = obs4_next
        time_step += 1
    elif done == True or judge_distance > 50:   # lingering too long also ends the episode
        env.reset()
        obs, _, _, _, _, _, _ = env.step(0)
        obs = ob_process(obs)
        obs4 = torch.cat(([obs, obs, obs, obs]), dim=0)
        episode_num += 1
        history_distance = 200
        epi_total_reward_list.append(episode_total_reward)
        print('episode %d total reward=%.2f' % (episode_num, episode_total_reward))
        episode_total_reward = 0
Here we check whether a terminal state has been reached; if so, everything is re-initialized to start the next episode.
Updating the network
    ### do one step update ###
    if len(buffer) == buffer.capacity and time_step % 4 == 0:
        batch_transition = buffer.sample(BATCH_SIZE)
        value_net.update(samples=batch_transition, loss_func=LOSS_FUNCTION,
                         optim_func=OPTIM_METHOD, learn_rate=LEARNING_RATE,
                         target_net=target_net, BATCH_SIZE=BATCH_SIZE,
                         GAMMA=GAMMA)
        update_times += 1

    ### copy value net parameters to target net ###
    if update_times % NET_COPY_STEP == 0:
        target_net.load_state_dict(value_net.state_dict())
That is the whole pipeline. After some training, the agent clearly improves, but it is still far from "intelligent". This implementation simply reproduces the method from the paper; to play Super Mario well, some game-specific analysis would be needed, and other methods could be layered on top.