深度强化学习成名作——DQN
前言:其实很早之前就想开始写写深度强化学习(Deep reinforcement learning)了,但是一年前DQN没调出来,没好意思写哈哈,最近呢无意中把打砖块游戏Breakout训练到平均分接近40分,最高分随便上50(虽说也不算太好,但好歹也体现了DRL的优势),于是就写写吧~
提到深度强化学习的成名作,很多人可能会觉得是2016年轰动一时的AlphaGo,从大众来看是这样的,但真正让深度强化学习火起来并获得学术界蹭蹭往上涨关注度的,当属Deep Q-learning Network(DQN),最早见于2013年的论文《Playing Atari with Deep Reinforcement Learning》。
2012年,深度学习刚在ImageNet比赛大获全胜,紧接着DeepMind团队就想到把深度网络与强化学习结合起来,思想是基于强化学习领域很早就出现的值函数逼近(function approximation),但是通过深度神经网络这一神奇的工具,巧妙地解决了状态维数爆炸的问题!
怎么解决的呢?让我们走进DQN,一探究竟。
CNN实现Q(s, a)
如果我们以纯数学的角度来看动作值函数 ,不过就是建立一个从状态空间
到动作空间
的映射,而映射的具体形式是什么,完全可以自己定,只要能够接近真实的最优
就是胜利。于是用CNN完成这种映射的做法应运而生,先上一幅架构图:
DQN
通过gym模块输出Atari环境的游戏,状态空间都是(210, 160, 3),即210*160的图片大小,3个通道,在输入CNN之前需要通过图像处理二值化并缩小成84*84。由于如果将一张图片作为状态输入信息,很多隐藏信息就会忽略(比如球往哪边飞),于是论文中把连续的4帧图片作为状态输入。所以在pytorch中,CNN的输入就是 ,在原论文中与上图所示稍有出入,卷积结构如下:
- 第一层卷积核8*8,stride=4,输出通道为32,ReLU
- 第二层卷积核4*4,stride=2,输出通道为64,ReLU
- 第三层卷积核3*3,stride=1,输出通道为64,ReLU
- 第三层输出经过flat之后维度为3136,然后第四层连接一个512大小的全连接层
- 第五层为动作空间大小的输出层,Breakout游戏中为4,表示每种动作的概率

搭建CNN的py文件在q_model.py中,并手动随机生成torch.randn(32, 4, 84, 84)向量用于测试网络架构的正确性:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
import gym
import matplotlib.pyplot as plt
class QNetwork(nn.Module):
"""Actor (Policy) Model."""
def __init__(self, state_size, action_size, seed):
"""Initialize parameters and build model.
Params
======
state_size (int): Dimension of each state
action_size (int): Dimension of each action
seed (int): Random seed
"""
super(QNetwork, self).__init__()
self.seed = torch.manual_seed(seed)
"*** YOUR CODE HERE ***"
self.conv = nn.Sequential(
nn.Conv2d(state_size[1], 32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.ReLU()
)
self.fc = nn.Sequential(
nn.Linear(64*7*7, 512),
nn.ReLU(),
nn.Linear(512, action_size)
)
def forward(self, state):
"""Build a network that maps state -> action values."""
conv_out = self.conv(state).view(state.size()[0], -1)
return self.fc(conv_out)
def pre_process(observation):
"""Process (210, 160, 3) picture into (1, 84, 84)"""
x_t = cv2.cvtColor(cv2.resize(observation, (84, 84)), cv2.COLOR_BGR2GRAY)
ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
return np.reshape(x_t, (1, 84, 84)), x_t
def stack_state(processed_obs):
"""Four frames as a state"""
return np.stack((processed_obs, processed_obs, processed_obs, processed_obs), axis=0)
if __name__ == '__main__':
env = gym.make('Breakout-v0')
print('State shape: ', env.observation_space.shape)
print('Number of actions: ', env.action_space.n)
obs = env.reset()
x_t, img = pre_process(obs)
state = stack_state(img)
print(np.shape(state[0]))
# plt.imshow(img, cmap='gray')
# 用cv2模块显示
# cv2.imshow('Breakout', img)
# cv2.waitKey(0)
state = torch.randn(32, 4, 84, 84) # (batch_size, color_channel, img_height,img_width)
state_size = state.size()
cnn_model = QNetwork(state_size, action_size=4, seed=1)
outputs = cnn_model(state)
print(outputs)
完成了解决维数灾难的一步,接下来我们就应该考虑训练的稳定性和效率了,这也是深度学习领域常考虑的问题。而在DQN算法中,作者提出了两大技巧来解决,就是著名的replay buffer和target network,我们一一讨论。
Replay Buffer
replay这个词很形象,在英语中用于影视剧之类的回放;buffer这个词则是计算机里的术语;两个词合起来,形象地体会一下,就是把过去的数据从一个缓存中又拿出来用,这样一用,就比较好地解决了困扰Q-learning算法的样本效率以及相关性问题。从Q-learning的原始公式和算法流程来看,每一次更新Q值的样本都只能用一次,而且在连续获取游戏画面的情景下,状态样本存在极高的相关性。针对这两个问题,如果我们使用一个较大的buffer来储存这些样本,每次随机均匀采样,既能多次使用样本,还能打破样本之间的相关性。
在dqn_agent.py中以ReplayBuffer来实现:
class ReplayBuffer:
"""Fixed-size buffer to store experience tuples."""
def __init__(self, action_size, buffer_size, batch_size, seed):
"""Initialize a ReplayBuffer object.
Params
======
action_size (int): dimension of each action
buffer_size (int): maximum size of buffer
batch_size (int): size of each training batch
seed (int): random seed
"""
self.action_size = action_size
self.memory = deque(maxlen=buffer_size)
self.batch_size = batch_size
self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
self.seed = random.seed(seed)
def add(self, state, action, reward, next_state, done):
"""Add a new experience to memory."""
e = self.experience(state, action, reward, next_state, done)
self.memory.append(e)
def sample(self):
"""Randomly sample a batch of experiences from memory."""
experiences = random.sample(self.memory, k=self.batch_size)
states = torch.from_numpy(np.stack([e.state for e in experiences if e is not None])).float().to(device)
actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
next_states = torch.from_numpy(np.stack([e.next_state for e in experiences if e is not None])).float().to(device)
dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
return (states, actions, rewards, next_states, dones)
def __len__(self):
"""Return the current size of internal memory."""
return len(self.memory)Target Network


假设真实的动作值函数为 ,我们的训练目标是训练一个CNN使
能够逼近它,经过对梯度的求导,感觉上我们希望TD目标值
与
应该是一回事,但实际上由于
是不依赖于参数w的,而TD目标值依赖于参数w,这在数学上是不合理的。也就是说TD目标值本来就是估计值,与旧的估计值作差再去更新
,就感觉像追着一个移动的目标却一直够不着。论文作者提出设置一个target network,而之前与环境交互产生动作的网络称为behavior network,训练开始时二者使用一样的架构和参数,训练过程中每完成一定数目的迭代,behavior network的参数就同步给target network。这是原始论文中提出更新方法,而在笔者的代码中,借鉴了DDPG中soft update法。
import numpy as np
import random
from collections import namedtuple, deque
from q_model import QNetwork
import torch
import torch.nn.functional as F
import torch.optim as optim
BUFFER_SIZE = int(1e6) # replay buffer size
BATCH_SIZE = 32 # minibatch size
GAMMA = 0.99 # discount factor
TAU = 1e-3 # for soft update of target parameters
LR = 1e-5 # learning rate
UPDATE_EVERY = 4 # how often to update the network
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
class Agent():
"""Interacts with and learns from the environment."""
def __init__(self, state_size, action_size, seed):
"""Initialize an Agent object.
Params
======
state_size (int): dimension of each state
action_size (int): dimension of each action
seed (int): random seed
"""
self.state_size = state_size
self.action_size = action_size
self.seed = random.seed(seed)
# Q-Network
self.qnetwork_local = QNetwork(state_size, action_size, seed).to(device) # behavior network
self.qnetwork_target = QNetwork(state_size, action_size, seed).to(device) # target network
self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)
# Replay memory
self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)
# Initialize time step (for updating every UPDATE_EVERY steps)
self.t_step = 0
def step(self, state, action, reward, next_state, done):
# Save experience in replay memory
self.memory.add(state, action, reward, next_state, done)
# Learn every UPDATE_EVERY time steps.
self.t_step = (self.t_step + 1) % UPDATE_EVERY
if self.t_step == 0:
# If enough samples are available in memory, get random subset and learn
if len(self.memory) > BATCH_SIZE:
experiences = self.memory.sample()
self.learn(experiences, GAMMA)
def act(self, state, eps=0.):
"""Returns actions for given state as per current policy.
Params
======
state (array_like): current state
eps (float): epsilon, for epsilon-greedy action selection
"""
state = torch.from_numpy(state).float().unsqueeze(0).to(device)
self.qnetwork_local.eval()
with torch.no_grad():
action_values = self.qnetwork_local(state)
self.qnetwork_local.train()
# Epsilon-greedy action selection
if random.random() > eps:
return np.argmax(action_values.cpu().data.numpy())
else:
return random.choice(np.arange(self.action_size))
def learn(self, experiences, gamma):
"""Update value parameters using given batch of experience tuples.
Params
======
experiences (Tuple[torch.Tensor]): tuple of (s, a, r, s', done) tuples
gamma (float): discount factor
"""
states, actions, rewards, next_states, dones = experiences
## TODO: compute and minimize the loss
# Get max predicted Q values (for next states) from target model
Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
# Compute Q targets for current states
Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
Q_expected = self.qnetwork_local(states).gather(1, actions) # 固定行号,确认列号
# Compute loss
loss = F.mse_loss(Q_expected, Q_targets)
# Minimize the loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# ------------------- update target network ------------------- #
self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)
def soft_update(self, local_model, target_model, tau):
"""Soft update model parameters.
θ_target = τ*θ_local + (1 - τ)*θ_target
Params
======
local_model (PyTorch model): weights will be copied from
target_model (PyTorch model): weights will be copied to
tau (float): interpolation parameter
"""
for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)DQN算法流程

我们以打砖块游戏Breakout来测试整套算法流程,整个流程的代码在Deep_Q_network.py中,超参数设置在dqn_agent.py中,其中最为重要的超参数设置是replay buffer的大小、迭代次数、学习率和 衰减率。replay buffer大小至少设置为100万,迭代次数也是越多越好。
import gym
import random
import torch
import numpy as np
from collections import deque
from dqn_agent import Agent
import matplotlib.pyplot as plt
import cv2
import time
env = gym.make('Breakout-v0')
state_size = env.observation_space.shape
action_size = env.action_space.n
print('Original state shape: ', state_size)
print('Number of actions: ', env.action_space.n)
agent = Agent((32, 4, 84, 84), action_size, seed=1) # state size (batch_size, 4 frames, img_height, img_width)
TRAIN = False # train or test flag
def pre_process(observation):
"""Process (210, 160, 3) picture into (1, 84, 84)"""
x_t = cv2.cvtColor(cv2.resize(observation, (84, 84)), cv2.COLOR_BGR2GRAY)
ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY)
return x_t
def init_state(processed_obs):
return np.stack((processed_obs, processed_obs, processed_obs, processed_obs), axis=0)
def dqn(n_episodes=30000, max_t=40000, eps_start=1.0, eps_end=0.01, eps_decay=0.9995):
"""Deep Q-Learning.
Params
======
n_episodes (int): maximum number of training episodes
max_t (int): maximum number of timesteps per episode, maximum frames
eps_start (float): starting value of epsilon, for epsilon-greedy action selection
eps_end (float): minimum value of epsilon
eps_decay (float): multiplicative factor (per episode) for decreasing epsilon
"""
scores = [] # list containing scores from each episode
scores_window = deque(maxlen=100) # last 100 scores
eps = eps_start # initialize epsilon
for i_episode in range(1, n_episodes + 1):
obs = env.reset()
obs = pre_process(obs)
state = init_state(obs)
score = 0
for t in range(max_t):
action = agent.act(state, eps)
next_state, reward, done, _ = env.step(action)
# last three frames and current frame as the next state
next_state = np.stack((state[1], state[2], state[3], pre_process(next_state)), axis=0)
agent.step(state, action, reward, next_state, done)
state = next_state
score += reward
if done:
break
scores_window.append(score) # save most recent score
scores.append(score) # save most recent score
eps = max(eps_end, eps_decay * eps) # decrease epsilon
print('\tEpsilon now : {:.2f}'.format(eps))
print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
if i_episode % 1000 == 0:
print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
print('\rEpisode {}\tThe length of replay buffer now: {}'.format(i_episode, len(agent.memory)))
if np.mean(scores_window) >= 50.0:
print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode - 100,
np.mean(scores_window)))
torch.save(agent.qnetwork_local.state_dict(), 'checkpoint/dqn_checkpoint_solved.pth')
break
torch.save(agent.qnetwork_local.state_dict(), 'checkpoint/dqn_checkpoint_8.pth')
return scores
if __name__ == '__main__':
if TRAIN:
start_time = time.time()
scores = dqn()
print('COST: {} min'.format((time.time() - start_time)/60))
print("Max score:", np.max(scores))
# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(len(scores)), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()
else:
# load the weights from file
agent.qnetwork_local.load_state_dict(torch.load('checkpoint/dqn_checkpoint_8.pth'))
rewards = []
for i in range(10): # episodes, play ten times
total_reward = 0
obs = env.reset()
obs = pre_process(obs)
state = init_state(obs)
for j in range(10000): # frames, in case stuck in one frame
action = agent.act(state)
env.render()
next_state, reward, done, _ = env.step(action)
state = np.stack((state[1], state[2], state[3], pre_process(next_state)), axis=0)
total_reward += reward
# time.sleep(0.01)
if done:
rewards.append(total_reward)
break
print("Test rewards are:", *rewards)
print("Average reward:", np.mean(rewards))
env.close()在Titan XP显卡运行,分布经过1万次和3万次迭代,统计回报曲线如下两幅图。据图中分析可知,回报曲线波动性较大,但整体趋势是在上升;在三万次游戏中,最高的一次接近350,但这一次结果十分良好的训练并不会对后面造成过多影响,这也是DQN饱受困扰的“灾难性遗忘”问题(catastrophic forgetting)。
最后的最后,上效果视频和GitHub仓库(代码是基于Udacity课程代码改的)!




11 条评论
求问,视频是怎么从gym里面导出的呀
请问这4种动作代表什么意思呢?向左,向右还有什么?
我出来的训练效果比题主差一点 但是也差不多有30分的平均分 最高分差不多40多分 运气好的话能达到80-100分左右 但是相比于其他算法还有相当大的差距 比如A3C 在仅仅三个小时之内就能达到800分左右 楼主知道是为啥吗 为啥差距会这么大
还有一件事想问题主 我看某些博客或者paper上说他们能用DQN玩到几百分以上 但是我的30-40分左右就收敛了并且就开始剧烈的震荡 我想问下这又是啥原因呀
我想问作者两个问题,这两个问题在困扰我:
1、在NatureDQN论文中,Breakout的表现是401.2分,这似乎与您训练出来的得分相差甚远,我也没有复现出接近的得分,请问您知道是什么原因吗?
2、在原文中训练次数的标注是Epoch,这与强化学习中经常说的Episode有什么区别和联系?