Code walkthrough: Hierarchical Reinforcement Learning for Scarce Medical Resource Allocation
For a walkthrough of the paper itself, see: Paper notes: Hierarchical Reinforcement Learning for Scarce Medical Resource Allocation (UQI-LIUWJ的博客-CSDN博客)
Code: KYHKL-Q/Hierarchical-RL (github.com)
1 RL_train.py
This file trains the reinforcement-learning module.
The paper allocates two resources, masks and hospital beds. The two models are alike in most respects, so for simplicity I only walk through the mask part.
1.1 Imports
```python
import numpy as np
import json
import torch
import random
import copy
import os
from torch import nn, optim

from simulator import simulator
from reward_fun import reward_fun
```

1.2 The RL_train class
1.2.1 __init__
```python
class RL_train(object):
    def __init__(self,
                 action_lr=0.001,            # learning rate of the salience-ranking DQN (the "action" net)
                 Actor_lr=0.001,             # learning rate of the actor (produces the satisfaction factor)
                 Critic_lr=0.002,            # learning rate of the critic
                 decay_rate=0.5,             # discount rate for future rewards
                 batch_size=8,
                 pool_volume=128,            # size of the replay buffer
                 action_explore_factor=0.5,  # exploration ratio of the action net (epsilon-greedy)
                 Actor_explore_factor=0.05,  # exploration ratio of the actor (epsilon-greedy)
                 soft_replace_rate=0.1,      # soft update: how far the target net moves toward the
                                             # eval net each step (the DDPG-style target update)
                 train_steps=150,            # number of training rounds (similar to an epoch count)
                 interval=48,                # how often the 8 states are updated
                 mask_total=1000000,         # total number of masks
                 mask_quality=0.9,           # fraction of viruses a medical mask effectively filters
                 mask_lasting=48,            # how long a mask stays effective
                 city='city_sample'          # name of the studied area
                 ):
        print('Initializing...')
        self.action_lr = action_lr
        self.Actor_lr = Actor_lr
        self.Critic_lr = Critic_lr
        self.decay_rate = decay_rate
        self.batch_size = batch_size
        self.pool_volume = pool_volume
        self.action_explore_factor = action_explore_factor
        self.Actor_explore_factor = Actor_explore_factor
        self.soft_replace_rate = soft_replace_rate
        self.train_steps = train_steps
        self.interval = interval
        self.mask_total = mask_total
        self.mask_quality = mask_quality
        self.mask_lasting = mask_lasting
        self.city = city

        # Initialize state and replay buffer
        with open(os.path.join('../data', self.city, 'start.json'), 'r') as f:
            self.start = np.array(json.load(f))
            # This file is not shipped with the code; my guess is that it holds the
            # initial head count of every state in every region,
            # i.e. a region_num * 8 two-dimensional array.

        from Net.Anet import Anet   # actor network: computes the satisfaction factor
        from Net.Qnet import Qnet   # action network: picks which salience ranking to use
        from Net.RNN import RNN
        # The RNN takes the three states we can actually measure:
        #   It (infected and tested/confirmed)
        #   Ih (infected and hospitalized)
        #   D  (dead)
        # and estimates the three observable-environment states we cannot measure directly:
        #   E  (exposed)
        #   Iu (infected but untested)
        #   R  (recovered)
        from Net.Cnet import Cnet   # critic network

        self.region_num = 161
        # Supposed to be modified according to the number of regions in the studied city.

        self.pool_pointer = 0   # replay buffer index where the next record goes
        self.pool_count = 0     # number of records currently in the replay buffer
        self.pool_state = np.zeros([self.pool_volume, self.region_num, 8])
        # states stored in the replay buffer: buffer_size * region_num * 8
        self.pool_mask_action = np.zeros(self.pool_volume)
        # which salience ranking was chosen (replay buffer)
        self.pool_mask_perc = np.zeros(self.pool_volume)
        # mask satisfaction factor (replay buffer)
        self.pool_reward = np.zeros(self.pool_volume)
        # single-step reward (replay buffer)
        self.pool_state1 = np.zeros([self.pool_volume, self.region_num, 8])
        # next state (replay buffer)
        self.current_state = self.start        # current state
        self.next_state = self.current_state   # next state

        # Initialize the networks
        torch.manual_seed(1234)
        torch.cuda.manual_seed(1234)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.Mask_action_eval = Qnet().to(self.device)
        self.Mask_action_eval.train()
        self.Mask_action_target = copy.deepcopy(self.Mask_action_eval)
        # deep copy: the eval net and the target net do not share parameters
        self.Mask_action_target.eval()
        # the target net is not trained directly (same below)
        self.Mask_action_optimizer = optim.Adam(self.Mask_action_eval.parameters(), lr=self.action_lr)
        # "action" refers to the DQN in the paper's model diagram,
        # i.e. the part that learns the salience ranking

        self.Mask_Actor_eval = Anet().to(self.device)
        self.Mask_Actor_eval.train()
        self.Mask_Actor_target = copy.deepcopy(self.Mask_Actor_eval)
        self.Mask_Actor_target.eval()
        self.Mask_Actor_optimizer = optim.Adam(self.Mask_Actor_eval.parameters(), lr=self.Actor_lr)
        # the actor at the bottom of the paper's model diagram

        self.Mask_Critic_eval = Cnet(self.region_num).to(self.device)
        self.Mask_Critic_eval.train()
        self.Mask_Critic_target = copy.deepcopy(self.Mask_Critic_eval)
        self.Mask_Critic_target.eval()
        # note: the original code called eval() on Mask_Critic_eval here, which looks
        # like a typo; the target net is the one that should be frozen, as above
        self.Mask_Critic_optimizer = optim.Adam(self.Mask_Critic_eval.parameters(), lr=self.Critic_lr)
        # the critic at the bottom of the paper's model diagram

        # Initialize the simulator (disease-state transitions per region)
        self.simulator = simulator(city=self.city)
        self.simulator.reset(self.start)
        # writes the initial head counts from `start` into the simulator's `nodes` list;
        # every element of `nodes` is a node object with an id and the 8 states
        # (S, E, Iu, It, Ia, Ih, R, D)
        print('Initializing done!')
```
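A quick aside on the eval/target pattern used above: deepcopy produces an independent target network whose parameters only change through the soft update in train(). A minimal sketch with a stand-in linear layer (not the repo's networks):

```python
import copy
import torch
from torch import nn, optim

net_eval = nn.Linear(6, 7)              # stand-in for Qnet
net_eval.train()                        # the eval net is the one being optimized
net_target = copy.deepcopy(net_eval)    # independent copy: no parameter sharing
net_target.eval()                       # the target net is never trained directly
optimizer = optim.Adam(net_eval.parameters(), lr=1e-3)
# gradients flow only into net_eval; net_target follows via soft updates
```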
1.2.2 train

```python
    def train(self):
        print('Start training...')
        loss_record = list()
        train_count = 0  # number of rounds trained so far
        while train_count < self.train_steps:
            self.current_state = self.start
            # start: my guess is the initial head count of each state in each region
            # (a region_num * 8 array); the file itself is not provided
            self.next_state = self.current_state
            self.is_end = False  # whether the episode has ended
            end_count = 0

            # sampling
            step_count = 0
            while (not self.is_end) and step_count <= 60 / (self.interval / 48):
                mask_action_out = self.Mask_action_eval(
                    torch.FloatTensor(self.current_state[:, [1, 2, 3, 5, 6, 7]])
                    .to(self.device).unsqueeze(0))
                # take E, Iu, It, Ih, R, D and unsqueeze to [1, region_num, 6].
                # "action" is the DQN in the paper's diagram (the salience-ranking part);
                # "mask" means we are looking at masks.
                # mask_action_out -> [1, 7]: a score for each ranking option.
                # The paper uses 3 ranking criteria, which can be combined freely, so
                # there are 2^3 - 1 = 7 options (the empty combination is not allowed).

                # select an action through epsilon-greedy
                mask_action = torch.argmax(mask_action_out).cpu().item()
                # pick the highest-scoring salience ranking
                rand_temp = random.random()
                if rand_temp > (1 - self.action_explore_factor):
                    mask_action = int(7 * random.random())
                    # explore at random with some probability, otherwise exploit
                print('mask_action:{}'.format(mask_action))

                mask_perc = self.Mask_Actor_eval(
                    torch.FloatTensor(self.current_state[:, [1, 2, 3, 5, 6, 7]])
                    .to(self.device).unsqueeze(0))
                # take E, Iu, It, Ih, R, D, unsqueeze to [1, region_num, 6];
                # mask_perc is a [1, 1] number in (0, 1): the satisfaction factor f
                mask_perc = mask_perc.squeeze(0).cpu().detach().item() \
                            + self.Actor_explore_factor * np.random.randn()
                mask_perc_clip = np.clip(mask_perc, 0.1, 1)
                print('mask_perc:{}'.format(mask_perc_clip))
                # the actor's output plus exploration noise gives the final
                # satisfaction factor f, clipped into [0.1, 1]

                # get the next state through simulation
                self.next_state, _ = self.simulator.simulate(
                    sim_type='Policy_a',
                    interval=self.interval,
                    mask_on=True,
                    mask_total=self.mask_total,
                    mask_quality=self.mask_quality,
                    mask_lasting=self.mask_lasting,
                    mask_action=mask_action,
                    mask_satisfy_perc=mask_perc_clip)
                # new state of every region.
                # (The original call also passed bed_total / bed_action /
                # bed_satisfy_perc, which are undefined in this mask-only version
                # and are dropped here.)

                if self.simulator.full and end_count == 0:
                    end_count = 1
                    # `full` indicates whether the beds are full
                if (not self.simulator.full) and end_count == 1:
                    self.is_end = True
                print('Is_end:{}'.format(self.is_end))

                # get single step reward
                reward = reward_fun(self.current_state, self.next_state)
                print('Reward:{}'.format(reward))

                # put the sample into the replay buffer
                if self.simulator.full:
                    self.pool_state[self.pool_pointer] = self.current_state
                    self.pool_mask_action[self.pool_pointer] = mask_action
                    self.pool_mask_perc[self.pool_pointer] = mask_perc
                    self.pool_reward[self.pool_pointer] = reward
                    self.pool_state1[self.pool_pointer] = self.next_state
                    self.pool_pointer = (self.pool_pointer + 1) % self.pool_volume
                    if self.pool_count < self.pool_volume:
                        self.pool_count = self.pool_count + 1
                    print('Sampling:{} samples in pool now'.format(self.pool_count))
                    # the current state, the chosen salience ranking, the satisfaction
                    # factor, the single-step reward and the next state go into the
                    # replay buffer. (Bed-related buffer writes from the original
                    # are dropped, matching the mask-only setup.)

                # mini batch sampling and updating the parameters
                if self.pool_count >= self.batch_size:
                    sample_index = random.sample(range(self.pool_count), self.batch_size)
                    # pick batch_size random indices from the replay buffer
                    sample_state = torch.FloatTensor(self.pool_state[sample_index]).to(self.device)
                    # current states of the sampled batch
                    sample_mask_action = self.pool_mask_action[sample_index]
                    # salience rankings chosen in the sampled batch
                    sample_reward = self.pool_reward[sample_index]
                    # single-step rewards of the sampled batch
                    sample_mask_perc = torch.FloatTensor(
                        self.pool_mask_perc[sample_index]).to(self.device).unsqueeze(1)
                    # mask satisfaction factors of the sampled batch
                    sample_state1 = torch.FloatTensor(self.pool_state1[sample_index]).to(self.device)
                    # next states of the sampled batch

                    mask_action_eval = self.Mask_action_eval(sample_state[:, :, [1, 2, 3, 5, 6, 7]])
                    # eval net of the salience ranking;
                    # input [batch_size, region_num, 6] -> output [batch_size, 7]
                    mask_action_target = self.Mask_action_target(sample_state1[:, :, [1, 2, 3, 5, 6, 7]])
                    # target net of the salience ranking -> [batch_size, 7]
                    mask_action_max = torch.argmax(mask_action_target, dim=-1).cpu()
                    # per record, the best action (which salience ranking to pick)
                    mask_perc_eval = self.Mask_Actor_eval(sample_state[:, :, [1, 2, 3, 5, 6, 7]])
                    # eval net of the satisfaction factor -> [batch_size, 1]
                    mask_perc_target = self.Mask_Actor_target(sample_state1[:, :, [1, 2, 3, 5, 6, 7]])
                    # target net of the satisfaction factor -> [batch_size, 1]
                    mask_reward_eval = self.Mask_Critic_eval(
                        sample_state[:, :, [1, 2, 3, 5, 6, 7]], sample_mask_perc)
                    # eval net of the critic -> [batch_size, 1]
                    mask_reward_target = self.Mask_Critic_target(
                        sample_state1[:, :, [1, 2, 3, 5, 6, 7]], mask_perc_target)
                    # target net of the critic -> [batch_size, 1]

                    loss = 0
                    for i in range(self.batch_size):
                        y = sample_reward[i]
                        y = y + self.decay_rate * (mask_action_target[i][mask_action_max[i]]
                                                   + mask_reward_target[i])
                        # target value: reward + discounted (max target-Q of the
                        # ranking DQN + target critic value)
                        loss = loss + (mask_reward_eval[i]
                                       + mask_action_eval[i][int(sample_mask_action[i])]
                                       - y) ** 2
                        # squared error against the eval values
                    loss = loss / self.batch_size
                    print('Loss:{}'.format(loss.cpu().item()))

                    self.Mask_action_optimizer.zero_grad()
                    self.Mask_Critic_optimizer.zero_grad()
                    loss.backward()
                    self.Mask_action_optimizer.step()
                    self.Mask_Critic_optimizer.step()
                    # train the critic and the action DQN

                    mask_reward0 = self.Mask_Critic_eval(
                        sample_state[:, :, [1, 2, 3, 5, 6, 7]], mask_perc_eval)
                    loss_pg = -torch.mean(mask_reward0)
                    # policy gradient is gradient ascent, hence the minus sign
                    print('Loss_pg:{}'.format(loss_pg))
                    self.Mask_Actor_optimizer.zero_grad()
                    loss_pg.backward()
                    self.Mask_Actor_optimizer.step()
                    # train the actor
                    loss_record.append([loss.cpu().item(), loss_pg.cpu().item()])

                    # soft update of the target networks: action, actor and critic
                    # targets are all updated as
                    # target = (1 - soft_rate) * target + soft_rate * eval
                    for x in self.Mask_action_target.state_dict().keys():
                        eval('self.Mask_action_target.' + x + '.data.mul_(1-self.soft_replace_rate)')
                        eval('self.Mask_action_target.' + x + '.data.add_(self.soft_replace_rate*self.Mask_action_eval.' + x + '.data)')
                    for x in self.Mask_Actor_target.state_dict().keys():
                        eval('self.Mask_Actor_target.' + x + '.data.mul_((1-self.soft_replace_rate))')
                        eval('self.Mask_Actor_target.' + x + '.data.add_(self.soft_replace_rate*self.Mask_Actor_eval.' + x + '.data)')
                    for x in self.Mask_Critic_target.state_dict().keys():
                        eval('self.Mask_Critic_target.' + x + '.data.mul_((1-self.soft_replace_rate))')
                        eval('self.Mask_Critic_target.' + x + '.data.add_(self.soft_replace_rate*self.Mask_Critic_eval.' + x + '.data)')

                    train_count += 1
                    print('Training:epoch {}/{}\n'.format(train_count, self.train_steps))
                    if train_count == self.train_steps:
                        break

                # update the state
                self.current_state = self.next_state
                step_count += 1

        # save the models
        torch.save(self.Mask_action_eval.state_dict(),
                   os.path.join('../model', self.city, 'mask_action_model.pth'))
        torch.save(self.Mask_Actor_eval.state_dict(),
                   os.path.join('../model', self.city, 'mask_Actor_model.pth'))
        '''
        with open(os.path.join('../model', self.city, 'loss.json'), 'w') as f:
            json.dump(loss_record, f)
        '''
        print('Training complete!')
```
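The soft update above builds attribute paths as strings and executes them with eval(), which works but is brittle. The same update can be written with plain parameter iteration; a minimal sketch (hypothetical helper name, same target = (1 - tau) * target + tau * eval semantics):

```python
import torch

def soft_update(target_net, eval_net, tau):
    # target <- (1 - tau) * target + tau * eval, parameter by parameter
    with torch.no_grad():
        for t_param, e_param in zip(target_net.parameters(), eval_net.parameters()):
            t_param.mul_(1 - tau)
            t_param.add_(tau * e_param)
```

A call like soft_update(self.Mask_action_target, self.Mask_action_eval, self.soft_replace_rate) would then replace each of the three eval() loops.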
1.3 Main

```python
if __name__ == "__main__":
    os.chdir(os.path.split(os.path.realpath(__file__))[0])
    # run from the script's own directory so the relative ../data paths resolve
    train_platform = RL_train(mask_total=1000000)  # set the total number of masks
    train_platform.train()
```

2 Net/Qnet.py
The action part: the DQN that scores the salience-ranking options.
```python
import torch
from torch import nn, optim
import torch.nn.functional as F


class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.cov1 = nn.Conv1d(in_channels=6, out_channels=16, kernel_size=5,
                              stride=1, padding=2, bias=True)
        # the input channel count is the number of states the environment can see: 6,
        # in the input order E, Iu, It, Ih, R, D:
        #   E  (exposed)
        #   Iu (infected but untested)
        #   It (infected and tested/confirmed)
        #   Ih (infected and hospitalized)
        #   R  (recovered)
        #   D  (dead)
        self.cov2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5,
                              stride=1, padding=2, bias=True)
        self.cov3 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=5,
                              stride=2, padding=0, bias=True)
        self.cov4 = nn.Conv1d(in_channels=16, out_channels=4, kernel_size=5,
                              stride=2, padding=0, bias=True)
        # four convolution layers: 6 -> 16 -> 32 -> 16 -> 4 channels
        self.lin1 = nn.Linear(in_features=664, out_features=128, bias=True)
        self.lin2 = nn.Linear(in_features=128, out_features=7, bias=True)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.mask = torch.tensor([[0.01, 0, 0, 0, 0, 0],
                                  [0, 0.05, 0, 0, 0, 0],
                                  [0, 0, 0.01, 0, 0, 0],
                                  [0, 0, 0, 0.05, 0, 0],
                                  [0, 0, 0, 0, 0.01, 0],
                                  [0, 0, 0, 0, 0, 0.05]], dtype=torch.float).to(self.device)

    def forward(self, x):
        # x is [1, region_num, 6]: E, Iu, It, Ih, R, D
        x1 = torch.matmul(x, self.mask).permute(0, 2, 1)
        # weight each state by its own coefficient, then reshape to [1, 6, region_num]
        c1 = F.leaky_relu(self.cov1(x1), 0.2)  # [1, 16, region_num]
        c2 = F.leaky_relu(self.cov2(c1), 0.2)  # [1, 32, region_num]
        c3 = F.leaky_relu(self.cov3(c2), 0.2)  # [1, 16, L']: stride 2, no padding shrinks the length
        c4 = F.leaky_relu(self.cov4(c3), 0.2)  # [1, 4, L'']: shrinks again
        l1 = F.leaky_relu(self.lin1(c4.view(x.shape[0], -1)), 0.2)
        # flatten to [1, 4*L'']; this must equal lin1's in_features=664, so lin1 has
        # to be adjusted when region_num changes -> [1, 128]
        l2 = self.lin2(l1)
        # -> [1, 7]: a score for each ranking option.
        # The paper uses 3 ranking criteria that can be combined freely, so there
        # are 2^3 - 1 = 7 options (the empty combination is not allowed).
        return l2
```
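A quick shape check of the forward pass. The region count below is a hypothetical value chosen only so that the flattened length matches lin1's in_features=664 (4 channels times 166 positions); it is not taken from the repo:

```python
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = Qnet().to(device)
# with L regions, the two stride-2 convolutions map L -> (L - 5)//2 + 1 twice;
# for L = 673: 673 -> 335 -> 166, and 4 * 166 = 664 matches lin1's in_features
state = torch.rand(1, 673, 6, device=device)  # [batch, region_num, 6]: E, Iu, It, Ih, R, D
print(net(state).shape)                       # torch.Size([1, 7]), one score per ranking option
```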
3 Net/Anet.py

The actor part: the network that outputs the satisfaction factor f.
```python
import torch
from torch import nn, optim
import torch.nn.functional as F


class Anet(nn.Module):
    def __init__(self):
        super(Anet, self).__init__()
        self.cov1 = nn.Conv1d(in_channels=6, out_channels=16, kernel_size=5,
                              stride=1, padding=2, bias=True)
        # input channels = the 6 observable states, in order E, Iu, It, Ih, R, D
        # (same meaning as in Qnet)
        self.cov2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5,
                              stride=1, padding=2, bias=True)
        self.cov3 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=5,
                              stride=2, padding=0, bias=True)
        self.cov4 = nn.Conv1d(in_channels=16, out_channels=4, kernel_size=5,
                              stride=2, padding=0, bias=True)
        # four convolution layers: 6 -> 16 -> 32 -> 16 -> 4 channels
        self.lin1 = nn.Linear(in_features=664, out_features=128, bias=True)
        self.lin2 = nn.Linear(in_features=128, out_features=16, bias=True)
        self.lin3 = nn.Linear(in_features=16, out_features=1, bias=True)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.mask = torch.tensor([[0.01, 0, 0, 0, 0, 0],
                                  [0, 0.05, 0, 0, 0, 0],
                                  [0, 0, 0.01, 0, 0, 0],
                                  [0, 0, 0, 0.05, 0, 0],
                                  [0, 0, 0, 0, 0.01, 0],
                                  [0, 0, 0, 0, 0, 0.05]], dtype=torch.float).to(self.device)

    def forward(self, state):
        # state is [1, region_num, 6]: E, Iu, It, Ih, R, D
        x1 = torch.matmul(state, self.mask).permute(0, 2, 1)
        # weight each state by its own coefficient, then reshape to [1, 6, region_num]
        c1 = F.leaky_relu(self.cov1(x1), 0.2)  # [1, 16, region_num]
        c2 = F.leaky_relu(self.cov2(c1), 0.2)  # [1, 32, region_num]
        c3 = F.leaky_relu(self.cov3(c2), 0.2)  # [1, 16, L']
        c4 = F.leaky_relu(self.cov4(c3), 0.2)  # [1, 4, L'']
        l1 = F.leaky_relu(self.lin1(c4.view(state.shape[0], -1)), 0.2)
        # flatten to [1, 4*L''] (must match in_features=664) -> [1, 128]
        l2 = F.leaky_relu(self.lin2(l1), 0.2)  # -> [1, 16]
        l3 = torch.sigmoid(self.lin3(l2))
        # -> [1, 1]; after the sigmoid this is a number in [0, 1]
        return l3
```

4 Net/Cnet.py
The critic network: scores a (state, satisfaction factor) pair.
```python
import torch
from torch import nn, optim
import torch.nn.functional as F


class Cnet(nn.Module):
    def __init__(self, region_num):
        super(Cnet, self).__init__()
        self.lin_1 = nn.Linear(in_features=1, out_features=128, bias=True)
        self.lin_2 = nn.Linear(in_features=128, out_features=256, bias=True)
        self.lin_3 = nn.Linear(in_features=256, out_features=region_num, bias=True)
        self.cov1 = nn.Conv1d(in_channels=7, out_channels=16, kernel_size=5,
                              stride=1, padding=2, bias=True)
        self.cov2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5,
                              stride=1, padding=2, bias=True)
        self.cov3 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=5,
                              stride=2, padding=0, bias=True)
        self.cov4 = nn.Conv1d(in_channels=16, out_channels=4, kernel_size=5,
                              stride=2, padding=0, bias=True)
        self.lin1 = nn.Linear(in_features=664, out_features=128, bias=True)
        self.lin2 = nn.Linear(in_features=128, out_features=16, bias=True)
        self.lin3 = nn.Linear(in_features=16, out_features=1, bias=True)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.mask = torch.tensor([[0.01, 0, 0, 0, 0, 0],
                                  [0, 0.05, 0, 0, 0, 0],
                                  [0, 0, 0.01, 0, 0, 0],
                                  [0, 0, 0, 0.05, 0, 0],
                                  [0, 0, 0, 0, 0.01, 0],
                                  [0, 0, 0, 0, 0, 0.05]], dtype=torch.float).to(self.device)

    def forward(self, state, perc):
        # state: the current observable states, [batch_size, region_num, 6]
        # perc: the satisfaction factor, [batch_size, 1]
        a1 = F.leaky_relu(self.lin_1(perc), 0.2)  # [batch_size, 1] -> [batch_size, 128]
        a2 = F.leaky_relu(self.lin_2(a1), 0.2)    # [batch_size, 128] -> [batch_size, 256]
        a3 = F.leaky_relu(self.lin_3(a2), 0.2)    # [batch_size, 256] -> [batch_size, region_num]
        x1 = torch.cat((torch.matmul(state, self.mask), a3.unsqueeze(2)), dim=-1).permute(0, 2, 1)
        # before the permute: [batch_size, region_num, 7];
        # after: [batch_size, 7, region_num]
        c1 = F.leaky_relu(self.cov1(x1), 0.2)     # -> [batch_size, 16, region_num]
        c2 = F.leaky_relu(self.cov2(c1), 0.2)     # -> [batch_size, 32, region_num]
        c3 = F.leaky_relu(self.cov3(c2), 0.2)     # -> [batch_size, 16, L']
        c4 = F.leaky_relu(self.cov4(c3), 0.2)     # -> [batch_size, 4, L'']
        l1 = F.leaky_relu(self.lin1(c4.view(state.shape[0], -1)), 0.2)
        # c4.view(...): [batch_size, 4*L''] (must match in_features=664) -> [batch_size, 128]
        l2 = F.leaky_relu(self.lin2(l1), 0.2)     # [batch_size, 128] -> [batch_size, 16]
        l3 = self.lin3(l2)                        # [batch_size, 16] -> [batch_size, 1]
        return l3
```
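A sketch of calling the critic on random tensors, just to confirm the interface (same caveat about the hypothetical region count as in the Qnet example above):

```python
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
critic = Cnet(region_num=673).to(device)
state = torch.rand(8, 673, 6, device=device)  # [batch_size, region_num, 6]
perc = torch.rand(8, 1, device=device)        # one satisfaction factor f per sample
print(critic(state, perc).shape)              # torch.Size([8, 1]): one value per (state, f) pair
```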
5 simulator.py

```python
import numpy as np
import json
import random
import os
from node import node
```

5.1 __init__
```python
class simulator(object):
    def __init__(self, city='city_sample'):
        super(simulator, self).__init__()
        self.city = city  # name of the studied city

        # load data
        print('Loading dynamic pattern...')
        with open(os.path.join('../data', self.city, 'prob.json'), 'r') as f:
            self.prob = np.array(json.load(f))
            # file not provided; my guess is the per-region, per-time transfer
            # probabilities between regions
        print('Done!')
        print('Loading population data...')
        with open(os.path.join('../data', self.city, 'flow.json'), 'r') as f:
            self.flow = np.array(json.load(f))
            # file not provided; my guess is the population-flow intensity per region
        with open(os.path.join('../data', self.city, 'dense.json'), 'r') as f:
            self.dense = np.array(json.load(f))
            # file not provided; my guess is the population density per region
        with open(os.path.join('../data', self.city, 'pop_region.json'), 'r') as f:
            self.pop = np.array(json.load(f))
            # file not provided; my guess is each region's allocation cap
            # (the most it should receive)
        print('Done!')

        self.START = 1
        self.region_num = len(self.flow)  # number of regions
        self.nodes = list()
        self.full = False
        self.parameters = [[1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 1, 0],
                           [1, 0, 1], [0, 1, 1], [1, 1, 1]]
        # the three ranking criteria; each row is one combination
        # (1 = use this criterion, 0 = don't), and at least one must be chosen
```

5.2 reset
```python
    def reset(self, state):
        # state is a region_num * 8 array: the initial head count of each
        # state in each region
        print('\nResetting...')
        self.nodes = list()
        for i in range(self.region_num):
            self.nodes.append(node(i))
            # create node i; all 8 states start at 0
            self.nodes[i].set_susceptible(state[i][0])            # S: susceptible
            self.nodes[i].set_latent(state[i][1])                 # E: exposed / latent
            self.nodes[i].set_infected_ut(state[i][2])            # Iu: infected, untested
            self.nodes[i].set_infected_t(state[i][3])             # It: infected and tested/confirmed
            self.nodes[i].set_infected_asymptomatic(state[i][4])  # Ia: infected, tested negative (asymptomatic)
            self.nodes[i].set_in_hospital(state[i][5])            # Ih: infected and hospitalized
            self.nodes[i].set_recovered(state[i][6])              # R: recovered
            self.nodes[i].set_death(state[i][7])                  # D: dead
        print('Done!')
```

5.3 simulate
Simulates how the population moves between disease states and between regions.
```python
    def simulate(self,
                 sim_type,                  # mask-allocation scheme
                 interval,                  # number of time steps to simulate
                 mask_on=False,             # whether masks are distributed
                 mask_total=0,              # total number of masks
                 mask_quality=0,
                 mask_lasting=48,           # how long a mask stays effective
                 mask_action=0,
                 mask_distribute_perc=[0],  # per-region mask satisfaction factors
                 bed_satisfy_perc=1,
                 mask_satisfy_perc=1,
                 it_move=False,
                 is_save=False,
                 save_time=0):
        print('Simulating...')

        # temp variables: per-state movement counts between regions
        S_temp = np.zeros((self.region_num, self.region_num + 1))
        L_temp = np.zeros((self.region_num, self.region_num + 1))
        Iut_temp = np.zeros((self.region_num, self.region_num + 1))
        if it_move:
            It_temp = np.zeros((self.region_num, self.region_num + 1))
        Ia_temp = np.zeros((self.region_num, self.region_num + 1))
        R_temp = np.zeros((self.region_num, self.region_num + 1))
        Ih_new = 0

        for time in range(interval):
            # mask allocation
            if time % mask_lasting == 0:
                # the old masks have expired, so masks are redistributed
                mask_numbers = np.zeros(self.region_num)  # masks allocated per region
                if mask_on:
                    mask_num = mask_total
                    if sim_type == 'Mean':
                        mask_numbers = np.ones(self.region_num) * int(mask_num / self.region_num)
                        # uniform allocation: every region gets the same share
                    elif sim_type == 'Lottery':
                        order = np.array(range(self.region_num))
                        np.random.shuffle(order)
                        # shuffle the region ids and allocate in that order
                        # (regions early in the order are filled as far as possible)
                        for i in range(self.region_num):
                            if mask_num > 0:
                                mask_numbers[order[i]] = min(mask_num, self.pop[order[i]])
                                # self.pop[order[i]]: the amount this region should receive
                                mask_num = mask_num - mask_numbers[order[i]]
                            else:
                                break
                    else:
                        patient_num = np.zeros(self.region_num)
                        patient_num_order = np.zeros(self.region_num)
                        for i in range(self.region_num):
                            patient_num[i] = self.nodes[i].infected_t
                            # It: infected and tested/confirmed
                            patient_num_order[i] = (self.nodes[i].infected_t
                                                    + self.nodes[i].in_hospital
                                                    + self.nodes[i].recovered
                                                    + self.nodes[i].death)
                            # It + Ih + R + D, i.e. the cumulative number of infections
                        if sim_type == 'Max_first':
                            order = np.argsort(-patient_num_order)
                            # sort by cumulative infections, most-infected regions first
                            for i in range(self.region_num):
                                if mask_num > 0:
                                    mask_numbers[order[i]] = min(mask_num, self.pop[order[i]])
                                    mask_num = mask_num - mask_numbers[order[i]]
                                else:
                                    break
                            # regions early in the order are satisfied as far as possible
                        elif sim_type == 'Min_first':
                            order = np.argsort(patient_num_order)
                            # sort by cumulative infections, least-infected regions first
                            for i in range(self.region_num):
                                if mask_num > 0:
                                    mask_numbers[order[i]] = min(mask_num, self.pop[order[i]])
                                    mask_num = mask_num - mask_numbers[order[i]]
                                else:
                                    break
                            # regions early in the order are satisfied as far as possible
                        elif sim_type == 'Policy':
                            mask_parameter = self.parameters[mask_action]
                            # mask_action is a number in 0..6 choosing one ranking combination
                            infect_num = patient_num_order / np.mean(patient_num_order)
                            order = np.argsort(-(mask_parameter[0] * infect_num
                                                 + mask_parameter[1] * self.flow
                                                 + mask_parameter[2] * self.dense))
                            # the chosen combination decides whether infections, population
                            # flow and population density enter the ranking score
                            alloc_mask = np.floor(mask_num * np.array(mask_distribute_perc))
                            # masks each region may receive
                            # (mask_distribute_perc: per-region satisfaction factors)
                            for i in range(len(mask_distribute_perc)):
                                mask_numbers[order[i]] = min(self.pop[order[i]], alloc_mask[i])
                                mask_num -= mask_numbers[order[i]]
                            if mask_num > 0:
                                # if masks remain, regions are topped up in decreasing
                                # order of their satisfaction factor
                                temp_order = np.argsort(-np.array(mask_distribute_perc))
                                for i in range(len(mask_distribute_perc)):
                                    temp = min(mask_num,
                                               self.pop[order[temp_order[i]]]
                                               - mask_numbers[order[temp_order[i]]])
                                    mask_num = mask_num - temp
                                    mask_numbers[order[temp_order[i]]] += temp
                                    if mask_num == 0:
                                        break
                                while mask_num > 0:
                                    # any leftover masks are assigned at random
                                    index = np.random.randint(0, self.region_num)
                                    if mask_numbers[index] < self.pop[index]:
                                        mask_numbers[index] += 1
                                        mask_num -= 1
                                    else:
                                        break
                        elif sim_type == 'Policy_a':
                            mask_parameter = self.parameters[mask_action]
                            infect_num = patient_num_order / np.mean(patient_num_order)
                            order = np.argsort(-(mask_parameter[0] * infect_num
                                                 + mask_parameter[1] * self.flow
                                                 + mask_parameter[2] * self.dense))
                            for i in range(self.region_num):
                                if mask_num > 0:
                                    mask_numbers[order[i]] = min(
                                        mask_num, int(self.pop[order[i]] * mask_satisfy_perc))
                                    mask_num = mask_num - mask_numbers[order[i]]
                                else:
                                    break
                            # here every region shares the same satisfaction factor, so
                            # masks are handed out along the ranking up to each region's cap

            # state transition
            for i in range(self.region_num):
                self.nodes[i].step(mask_numbers[i], mask_quality)
                # update all states of every node

            # cross region traveling (spatial movement)
            for k in range(self.region_num):
                S_temp[k] = np.random.multinomial(
                    self.nodes[k].susceptible,
                    self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                # how many S people of region k travel to each other region
                L_temp[k] = np.random.multinomial(
                    self.nodes[k].latent,
                    self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                # how many E people of region k travel to each other region
                Iut_temp[k] = np.random.multinomial(
                    self.nodes[k].infected_ut,
                    self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                # how many Iu people travel: infected but untested,
                # so their movement is unrestricted
                if it_move:
                    It_temp[k] = np.random.multinomial(
                        self.nodes[k].infected_t,
                        self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                    # how many It people travel: confirmed cases are normally
                    # confined to their own region
                Ia_temp[k] = np.random.multinomial(
                    self.nodes[k].infected_asymptomatic,
                    self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                # how many Ia people travel: infected but tested negative
                R_temp[k] = np.random.multinomial(
                    self.nodes[k].recovered,
                    self.prob[((self.START - 1) * 48 + time) % (7 * 48)][k])
                # how many R people travel

            S_temp_sum0 = np.sum(S_temp, axis=0)
            L_temp_sum0 = np.sum(L_temp, axis=0)
            Iut_temp_sum0 = np.sum(Iut_temp, axis=0)
            if it_move:
                It_temp_sum0 = np.sum(It_temp, axis=0)
            Ia_temp_sum0 = np.sum(Ia_temp, axis=0)
            R_temp_sum0 = np.sum(R_temp, axis=0)
            # arrivals from other regions

            S_temp_sum1 = np.sum(S_temp, axis=1)
            L_temp_sum1 = np.sum(L_temp, axis=1)
            Iut_temp_sum1 = np.sum(Iut_temp, axis=1)
            if it_move:
                It_temp_sum1 = np.sum(It_temp, axis=1)
            Ia_temp_sum1 = np.sum(Ia_temp, axis=1)
            R_temp_sum1 = np.sum(R_temp, axis=1)
            # departures to other regions

            for k in range(self.region_num):
                self.nodes[k].set_susceptible(self.nodes[k].susceptible
                                              + S_temp_sum0[k] - S_temp_sum1[k]
                                              + S_temp[k][self.region_num])
                # change in S per region: previous + arrivals - departures.
                # The self-to-self column S_temp[k][region_num] appears once in the
                # arrivals and once in the departures and cancels, so it is added back once.
                self.nodes[k].set_latent(self.nodes[k].latent
                                         + L_temp_sum0[k] - L_temp_sum1[k]
                                         + L_temp[k][self.region_num])
                self.nodes[k].set_infected_ut(self.nodes[k].infected_ut
                                              + Iut_temp_sum0[k] - Iut_temp_sum1[k]
                                              + Iut_temp[k][self.region_num])
                if it_move:
                    self.nodes[k].set_infected_t(self.nodes[k].infected_t
                                                 + It_temp_sum0[k] - It_temp_sum1[k]
                                                 + It_temp[k][self.region_num])
                self.nodes[k].set_infected_asymptomatic(self.nodes[k].infected_asymptomatic
                                                        + Ia_temp_sum0[k] - Ia_temp_sum1[k]
                                                        + Ia_temp[k][self.region_num])
                self.nodes[k].set_recovered(self.nodes[k].recovered
                                            + R_temp_sum0[k] - R_temp_sum1[k]
                                            + R_temp[k][self.region_num])
                # E, Iu, It, Ia and R are updated the same way as S

            if is_save:
                save = list()
                for i in range(self.region_num):
                    temp1 = [self.nodes[i].susceptible, self.nodes[i].latent,
                             self.nodes[i].infected_ut, self.nodes[i].infected_t,
                             self.nodes[i].infected_asymptomatic, self.nodes[i].in_hospital,
                             self.nodes[i].recovered, self.nodes[i].death]
                    save.append(temp1)
                save = np.array(save)
                save = save.astype(float)
                # the original used np.float, which is removed in newer NumPy versions
                with open(os.path.join('../result', self.city,
                                       'result_' + str(save_time * interval + time) + '.json'),
                          'w') as f:
                    json.dump(save.tolist(), f)

        next_state = list()
        for i in range(self.region_num):
            next_state.append([self.nodes[i].susceptible,
                               self.nodes[i].latent,
                               self.nodes[i].infected_ut,
                               self.nodes[i].infected_t,
                               self.nodes[i].infected_asymptomatic,
                               self.nodes[i].in_hospital,
                               self.nodes[i].recovered,
                               self.nodes[i].death])
            # state of every region
        a = self.statistic()
        print('S:{},L:{},Iut:{},It:{},Ia:{},Ih:{},R:{},D:{}'.format(
            a[0], a[1], a[2], a[3], a[4], a[5], a[6], a[7]))
        print('Is Full:{}'.format(self.full))
        return np.array(next_state), Ih_new
```
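The cross-region travel above is one multinomial draw per origin region: the people of one state are split across region_num destinations plus a trailing "stay" column. A standalone sketch with made-up numbers (3 regions):

```python
import numpy as np

n_susceptible = 100                     # S people currently in region k
travel_prob = [0.05, 0.10, 0.15, 0.70]  # to regions 0..2, plus the trailing "stay" slot
moved = np.random.multinomial(n_susceptible, travel_prob)
# moved[j] people travel to region j; moved[-1] is the self-to-self column that
# simulate() adds back once after arrivals and departures cancel it out
print(moved, moved.sum())               # counts always sum to n_susceptible
```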
6 node.py

```python
import numpy as np

Pa = 0.018            # false negative rate
eps_1 = 250           # average length of incubation (30 min)
eps_1_d = 5.2         # average length of incubation (day)
d = 0.15              # death rate
t = 672               # average recovery time (30 min)
t_d = 14              # average recovery time (day)
d_hospital = 0.04     # death rate in hospital
t_hospital = 614      # average recovery time in hospital (30 min)
t_hospital_d = 12.8   # average recovery time in hospital (day)
R_0 = 2.68            # basic reproduction number
r_a = 0.6
r_L = 1.0
eps = 1 / eps_1
beta = R_0 / (r_L * eps_1_d + (Pa * r_a + (1 - Pa)) * t_d)
beta = np.power(1 + beta, 1 / 48) - 1
L_I = eps * (1 - Pa)
L_Ia = eps * Pa
I_D = d / t
I_R = ((1 - d) / (t_d - d)) / 48
theta = 48            # testing speed (30 min)
I_h = 1 / theta
Ia_R = 1 / t
Ih_D = d_hospital / t_hospital
Ih_R = ((1 - d_hospital) / (t_hospital_d - d_hospital)) / 48


class node:
    def __init__(self, id):
        self.id = id
        self.susceptible = 0
        self.infected_ut = 0
        self.infected_t = 0
        self.death = 0
        self.in_hospital = 0
        self.infected_asymptomatic = 0
        self.recovered = 0
        self.latent = 0

    def set_susceptible(self, susceptible):
        self.susceptible = susceptible

    def set_latent(self, latent):
        self.latent = latent

    def set_infected_ut(self, infected_ut):
        self.infected_ut = infected_ut

    def set_infected_t(self, infected_t):
        self.infected_t = infected_t

    def set_infected_asymptomatic(self, infected_asymptomatic):
        self.infected_asymptomatic = infected_asymptomatic

    def set_death(self, death):
        self.death = death

    def set_in_hospital(self, in_hospital):
        self.in_hospital = in_hospital

    def set_recovered(self, recovered):
        self.recovered = recovered

    def step(self, mask_number, mask_quality):
        if (self.susceptible + self.latent + self.infected_ut + self.infected_t
                + self.infected_asymptomatic + self.in_hospital + self.recovered > 0):
            # everyone except the dead, i.e. the node still has living people
            mask_factor = 1 - np.clip(mask_number / (self.susceptible
                                                     + self.latent
                                                     + self.infected_ut
                                                     + self.infected_t
                                                     + self.infected_asymptomatic
                                                     + self.in_hospital
                                                     + self.recovered),
                                      0, 1) * mask_quality
            # the ratio is this node's masks / this node's living people,
            # i.e. the mask coverage Pi(t) from the paper;
            # the whole expression is the paper's (1 - Pi(t) * gamma)
            lambda_j = ((self.infected_ut
                         + self.infected_t
                         + self.infected_asymptomatic * r_a
                         + self.latent * r_L)
                        / (self.susceptible
                           + self.latent
                           + self.infected_ut + self.infected_t
                           + self.infected_asymptomatic
                           + self.in_hospital
                           + self.recovered)) * beta * mask_factor
            # (infectious people / living people) * beta * (1 - Pi(t) * gamma)

            susceptible_to_latent, __ = np.random.multinomial(self.susceptible, [lambda_j, 1])
            # number moving from S to E: each of the self.susceptible people becomes
            # latent with probability lambda_j, so a multinomial draw over
            # {become E, stay S} counts how many transition
            self.susceptible -= susceptible_to_latent
            self.latent += susceptible_to_latent
            # S decreases and E increases by susceptible_to_latent

            latent_to_infected, latent_to_Ia, __ = np.random.multinomial(self.latent, [L_I, L_Ia, 1])
            self.infected_ut += latent_to_infected
            self.infected_asymptomatic += latent_to_Ia
            self.latent -= (latent_to_Ia + latent_to_infected)
            # people moving from E to Iu and Ia

            prob = I_h
            infected_ut_to_t, __ = np.random.multinomial(self.infected_ut, [prob, 1])
            self.infected_ut -= infected_ut_to_t
            self.infected_t += infected_ut_to_t
            # people moving from Iu to It (tested positive)

            infected_to_death, infected_to_recovered, __ = np.random.multinomial(
                self.infected_ut, [I_D, I_R, 1])
            self.death += infected_to_death
            self.recovered += infected_to_recovered
            self.infected_ut -= (infected_to_death + infected_to_recovered)
            # Iu (infected, untested) to dead / recovered

            infected_to_death, infected_to_recovered, __ = np.random.multinomial(
                self.infected_t, [I_D, I_R, 1])
            self.death += infected_to_death
            self.recovered += infected_to_recovered
            self.infected_t -= (infected_to_death + infected_to_recovered)
            # It (infected and confirmed) to dead / recovered

            Ia_to_recovered, __ = np.random.multinomial(self.infected_asymptomatic, [Ia_R, 1])
            self.recovered += Ia_to_recovered
            self.infected_asymptomatic -= Ia_to_recovered
            # Ia (asymptomatic, tested negative) to recovered

            in_hospital_to_death, in_hospital_to_recovered, __ = np.random.multinomial(
                self.in_hospital, [Ih_D, Ih_R, 1])
            self.death += in_hospital_to_death
            self.recovered += in_hospital_to_recovered
            self.in_hospital -= (in_hospital_to_death + in_hospital_to_recovered)
            # Ih (hospitalized) to dead / recovered
```
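One detail worth calling out: every transition in step() uses np.random.multinomial(n, [p, 1]). NumPy treats the last entry of pvals as taking whatever probability remains, so this is effectively a Binomial(n, p) draw of how many individuals transition this step. A standalone sketch with made-up numbers:

```python
import numpy as np

n_latent = 1000      # people currently in state E
p_move = 0.02        # per-step transition probability, e.g. L_I for E -> Iu
moved, stayed = np.random.multinomial(n_latent, [p_move, 1])
# the last pval is replaced by the remaining probability 1 - p_move,
# so moved ~ Binomial(1000, 0.02) and moved + stayed == 1000
print(moved, stayed)
```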