A very interesting application of reinforcement learning was presented in this 2013 article, followed in 2015 by another publication in Nature, available for download on the DeepMind website.

What we want to do in this post is to solve the MsPacman-v0 environment using the classical deep Q-Learning algorithm. MsPacman-v0 is one of the Atari 2600 games, and it uses the atari_py package to connect to the Atari emulator. The observation is the RGB image of the screen, which is an array of shape $(210, 160, 3)$. There are at least two versions in Gym: MsPacman-v0, which has the screen image as observation, as we said, and MsPacman-ram-v0, which returns the 128 bytes of the RAM.
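
As a quick, self-contained check (a sketch, not needed for the rest of the post), we can create both variants and look at their observation spaces:

import gym
env_img = gym.make('MsPacman-v0')
env_ram = gym.make('MsPacman-ram-v0')
print(env_img.observation_space.shape)   # (210, 160, 3): the RGB screen
print(env_ram.observation_space.shape)   # (128,): the console RAM
env_img.close()
env_ram.close()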

The game is well known and simple to play and understand: the agent plays Ms. PacMan in a maze. The maze contains dots; the goal is to eat all of them, and a level is finished when all the dots are eaten. There are also four ghosts who try to catch Ms. PacMan; if this happens, a life is lost. Four power-up items, called power pills, are found in the corners of the maze and are worth 40 points each. When Ms. PacMan consumes a power pill all ghosts become edible: they turn blue for a short period (15 seconds), slow down and try to escape from Ms. PacMan. During this time, Ms. PacMan is able to eat them, which is worth 200, 400, 800 and 1600 points, consecutively. The ghosts move following a deterministic strategy, with some randomness added so that there is no simple optimal decision process. The observation space is discrete yet fairly large: more than a thousand positions in the maze, plus the positions of the agent, the ghosts and the power pills; besides, the ghosts can be edible or not.

The code is largely inspired by what is proposed in the book Deep Reinforcement Learning Hands-On. Let’s start with some imports:

import cv2
import gym
import gym.spaces
import matplotlib.pylab as plt
import numpy as np
import collections
import time

The approach we follow is similar to the one presented in the 2013 paper: images are resized to $84 \times 84$ pixels, and several consecutive frames are stacked together to form a state. This is done using Gym wrappers, of which we need a few. The first one takes care of frame skipping: for a given number of frames the agent repeats the same action, while the observations are stored and merged together using an element-wise max operation on the arrays.

class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip):
        super().__init__(env)
        self._obs_buffer = collections.deque(maxlen=skip)
        self._skip = skip
    
    def step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info

    def reset(self):
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs
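
The max is not just cosmetic: on the Atari 2600 some sprites are rendered only on alternate frames, so a single frame can miss an object entirely; merging consecutive frames with an element-wise max keeps every sprite visible. A tiny NumPy sketch of the idea, with made-up arrays:

frame_a = np.zeros((210, 160, 3), dtype=np.uint8)
frame_b = np.zeros((210, 160, 3), dtype=np.uint8)
frame_b[100:110, 80:90] = 255   # a sprite drawn only in the second frame
merged = np.max(np.stack([frame_a, frame_b]), axis=0)
print(merged[105, 85, 0])       # 255: the sprite survives the merge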

The ProcessFrame84 wrapper resizes the frame to a smaller image. This is done to reduce the computational complexity; the number 84 comes from the paper. The three color channels are also collapsed into a single grayscale image.

class ProcessFrame84(gym.ObservationWrapper):
    def __init__(self, env=None):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
    
    def observation(self, obs):
        return ProcessFrame84.process(obs)
    
    @staticmethod
    def process(frame):
        assert frame.size == 210 * 160 * 3
        img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
        # convert to grayscale using the standard luminance weights
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        # reduce the image resolution
        resized_screen = cv2.resize(img, (84, 84), interpolation=cv2.INTER_AREA)
        x_t = np.reshape(resized_screen, [84, 84, 1])
        return x_t.astype(np.uint8)
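
As a quick sanity check (just a sketch), we can push a random frame of the original size through process() and verify the output shape and type:

dummy_frame = np.random.randint(0, 256, size=(210, 160, 3), dtype=np.uint8)
processed = ProcessFrame84.process(dummy_frame)
print(processed.shape, processed.dtype)   # (84, 84, 1) uint8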

The BufferWrapper is used to extend the observations from single images to a short mini-video. The idea is that a single image doesn’t convey everything that is needed to play the video game: the speed and direction of movement. Stacking n_steps observations can hopefully do that. Following the 2013 paper, we use n_steps=4; a larger number might improve the performance, but would also increase the computational cost.

class BufferWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_steps, dtype=np.float32):
        super().__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            old_space.low.repeat(n_steps, axis=0),
            old_space.high.repeat(n_steps, axis=0), dtype=dtype)
    
    def reset(self):
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        return self.observation(self.env.reset())
    
    def observation(self, observation):
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer
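
The two lines of observation() implement a rolling buffer: the old observations are shifted one slot toward the past and the newest one takes the last slot. A minimal sketch with small made-up arrays shows the behaviour:

buf = np.zeros((4, 2, 2), dtype=np.float32)
for t in range(1, 6):
    buf[:-1] = buf[1:]            # shift the old observations toward the past
    buf[-1] = np.full((2, 2), t)  # the newest observation goes in the last slot
print(buf[:, 0, 0])               # [2. 3. 4. 5.]: the four most recent steps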

Finally, we need to convert the image to the PyTorch format and scale the values to be in the $[0, 1]$ range.

class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        old_shape = self.observation_space.shape
        new_shape = (old_shape[-1], old_shape[0], old_shape[1])
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=new_shape, dtype=np.float32)
    
    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)

class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, obs):
        return np.array(obs).astype(np.float32) / 255.0

We put together all the wrappers. The frameskip variable is set to one because we take care of frame skipping ourselves; we also use the reduced action space. The meanings of the actions are NOOP, UP, RIGHT, LEFT, DOWN, UPRIGHT, UPLEFT, DOWNRIGHT and DOWNLEFT, and they can be obtained using gym.make('MsPacman-v0').get_action_meanings().

def make_env(env_name):
    env = gym.make(env_name, frameskip=1, full_action_space=False)
    env = MaxAndSkipEnv(env, skip=16)
    env = ProcessFrame84(env)
    env = ImageToPyTorch(env)
    env = BufferWrapper(env, n_steps=4)
    return ScaledFloatFrame(env)
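
To double-check the effect of the whole chain we can build the environment and inspect it (a small sketch; get_action_meanings() is reached through unwrapped because the wrappers don't define it themselves):

env = make_env('MsPacman-v0')
print(env.observation_space.shape)          # (4, 84, 84): four stacked gray frames
print(env.action_space.n)                   # 9: the reduced action space
print(env.unwrapped.get_action_meanings())  # NOOP, UP, RIGHT, LEFT, DOWN, ...
env.close()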

We are ready for the interesting part: creating the Deep Q-Learning model.

import torch
import torch.nn as nn
class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super().__init__()
    
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=2),
            nn.ReLU())
    
        conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions))
    
    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))
    
    def forward(self, x):
        conv_out = self.conv(x).view(x.size()[0], -1)
        return self.fc(conv_out)
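
Before going further, a quick shape check (a sketch, using the nine actions of the reduced MsPacman action space): a dummy batch of stacked observations should produce one Q-value per action.

test_net = DQN(input_shape=(4, 84, 84), n_actions=9)
dummy_batch = torch.zeros(2, 4, 84, 84)
print(test_net(dummy_batch).shape)   # torch.Size([2, 9])
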
import torch.optim as optim
ENV_NAME = 'MsPacman-v0'

As we said above, an observation in our case is more of a short video, composed of only four frames. A single frame (that is, a static image) wouldn’t suffice to describe the movement and the speed at which the ghosts chase our agent. Using more than four frames would help, but would also slow down the training. What we do in the cell below is to use a random agent and perform one hundred steps (each of which, because of the MaxAndSkipEnv wrapper, repeats the chosen action over several emulator frames), then visualize the four frames of the final state as separate images.

env = make_env(ENV_NAME)
env.reset()
for _ in range(100):
    observation, reward, _, _ = env.step(env.action_space.sample())
    
fig, axes = plt.subplots(figsize=(12, 4), ncols=4)
for i in range(4):
    axes[i].imshow(observation[i])
    axes[i].set_axis_off()
    axes[i].set_title(f'Frame #{i}')

env.close()
del env

(Figure: the four stacked $84 \times 84$ grayscale frames of a single observation.)

As suggested in the paper, it is important to use an experience replay buffer. An entry of the buffer is implemented in Python using a namedtuple with five fields that store the current state, the action, the reward, a boolean to mark final states, and the new state.

Experience = collections.namedtuple('Experience',
                                    field_names=['state', 'action', 'reward', 'done', 'new_state'])

The ExperienceBuffer class is quite simple: we want to keep the last capacity entries, and we do this using a deque. The sample() method selects a few entries at random (to reduce the correlation among the samples) and wraps them in NumPy arrays.

class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)
    
    def __len__(self):
        return len(self.buffer)
    
    def append(self, experience):
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), np.array(actions), \
               np.array(rewards, dtype=np.float32), np.array(dones, dtype=np.uint8), \
               np.array(next_states)
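
A short usage sketch with fake transitions, just to see what sample() returns:

tiny_buffer = ExperienceBuffer(capacity=100)
dummy_obs = np.zeros((4, 84, 84), dtype=np.float32)
for i in range(10):
    tiny_buffer.append(Experience(dummy_obs, action=0, reward=float(i), done=False, new_state=dummy_obs))
states, actions, rewards, dones, next_states = tiny_buffer.sample(4)
print(states.shape, rewards)   # (4, 4, 84, 84) and four of the rewards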

The Agent class encapsulates the logic of the agent.

class Agent:
    def __init__(self, env, exp_buffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self._reset()
    
    def _reset(self):
        self.state = self.env.reset()
        self.total_reward = 0.0
        
    @torch.no_grad()
    def play_step(self, net, epsilon=0.0, device="cpu"):
        done_reward = None

        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            state_a = np.array([self.state], copy=False)
            state_v = torch.tensor(state_a).to(device)
            q_vals_v = net(state_v)
            _, act_v = torch.max(q_vals_v, dim=1)
            action = int(act_v.item())
    
        new_state, reward, is_done, _ = self.env.step(action)
        self.total_reward += reward
        
        exp = Experience(self.state, action, reward, is_done, new_state)
        self.exp_buffer.append(exp)
        self.state = new_state
        if is_done:
            done_reward = self.total_reward
            self._reset()
        return done_reward
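
The calc_loss() function below implements the standard DQN loss, that is, the mean squared Bellman error computed with a separate target network $Q_{\bar\theta}$ (the tgt_net argument), which the training loop synchronizes with the online network $Q_\theta$ only every SYNC_TARGET_FRAMES steps:

$$\mathcal{L} = \mathbb{E}_{(s, a, r, s') \sim \mathcal{B}} \left[ \left( Q_\theta(s, a) - \left( r + \gamma \max_{a'} Q_{\bar\theta}(s', a') \right) \right)^2 \right],$$

where $\mathcal{B}$ is a batch sampled from the replay buffer and the target reduces to $r$ alone for terminal transitions, which is what the done mask takes care of.
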
def calc_loss(batch, net, tgt_net, device):
    states, actions, rewards, dones, next_states = batch

    states_v = torch.tensor(np.array(states, copy=False)).to(device)
    next_states_v = torch.tensor(np.array(next_states, copy=False)).to(device)
    actions_v = torch.tensor(actions, dtype=torch.int64).to(device)
    rewards_v = torch.tensor(rewards).to(device)
    done_mask = torch.BoolTensor(dones).to(device)

    state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
    with torch.no_grad():
        next_state_values = tgt_net(next_states_v).max(1)[0]
        next_state_values[done_mask] = 0.0
        next_state_values = next_state_values.detach()

    expected_state_action_values = next_state_values * GAMMA + rewards_v
    return nn.MSELoss()(state_action_values, expected_state_action_values)

We are now ready to start the training.

GAMMA = 0.99
BATCH_SIZE = 128
REPLAY_SIZE = 10_000
REPLAY_START_SIZE = 10_000
LEARNING_RATE = 1e-4
SYNC_TARGET_FRAMES = 1_000
EPSILON_DECAY_LAST_FRAME = 250_000
EPSILON_START = 1.0
EPSILON_FINAL = 0.01
MAX_ITERATIONS = 2_000_000
device = "cuda"  # or use "cpu" if not available

env = make_env(ENV_NAME)

net = DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net = DQN(env.observation_space.shape, env.action_space.n).to(device)

buffer = ExperienceBuffer(REPLAY_SIZE)
agent = Agent(env, buffer)
epsilon = EPSILON_START
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
total_rewards = []
frame_idx = 0
ts_frame = 0
ts = time.time()
best_m_reward = None

history = []
%%time
while frame_idx < MAX_ITERATIONS:
    frame_idx += 1
    epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)
    reward = agent.play_step(net, epsilon, device=device)
    if reward is not None:
        total_rewards.append(reward)
        speed = (frame_idx - ts_frame) / (time.time() - ts)
        ts_frame = frame_idx
        ts = time.time()
        m_reward = np.mean(total_rewards[-100:])
        #print(f"{frame_idx:,}: done {len(total_rewards)} games, reward {m_reward:.3f}, "
        #      f"eps {epsilon:.2f}, speed {speed:.2f} f/s")
        history.append((frame_idx, m_reward, epsilon, speed))
        if best_m_reward is None or best_m_reward < m_reward:
            torch.save(net.state_dict(), ENV_NAME + ".dat")
            if best_m_reward is not None:
                print(f"{frame_idx}: best reward updated {best_m_reward:.3f} -> {m_reward:.3f} (eps: {epsilon:.2}, speed: {speed:.2f} f/s)")
            best_m_reward = m_reward

    if len(buffer) < REPLAY_START_SIZE:
        continue

    if frame_idx % SYNC_TARGET_FRAMES == 0:
        tgt_net.load_state_dict(net.state_dict())

    optimizer.zero_grad()
    batch = buffer.sample(BATCH_SIZE)
    loss_t = calc_loss(batch, net, tgt_net, device=device)
    loss_t.backward()
    optimizer.step()

By plotting the history we can see the progress of the agent, whose average reward goes from a few hundred to just above five thousand.

history_array = np.array(history)
plt.plot(history_array[:, 0], history_array[:, 1])

(Figure: mean reward over the last 100 episodes as a function of the frame index.)

After training it is time to see the model in action.

FPS = 5
env = make_env(ENV_NAME)
net = DQN(env.observation_space.shape, env.action_space.n)
state = torch.load(ENV_NAME + ".dat", map_location=lambda stg, _: stg)
net.load_state_dict(state)
observation = env.reset()
total_reward = 0.0
c = collections.Counter()
frames = [env.render(mode='rgb_array')]

done = False
while not done:
    start_ts = time.time()
    frames.append(env.render(mode='rgb_array'))
    observation_v = torch.tensor(np.array([observation], copy=False))
    q_vals = net(observation_v).data.numpy()[0]
    action = np.argmax(q_vals)
    c[action] += 1
    
    observation, reward, done, _ = env.step(action)
    total_reward += reward
    if done:
        break
    delta = 1 / FPS - (time.time() - start_ts)
    if delta > 0:
        time.sleep(delta)
env.close()
frames = frames[1:]
from matplotlib import pyplot as plt
from matplotlib import animation

# np array with shape (frames, height, width, channels)
video = np.array(frames[:]) 

fig = plt.figure(figsize=(8, 10))
im = plt.imshow(video[0,:,:,:])
plt.axis('off')
plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
anim.save('./MsPacman-video.mp4')
from IPython.display import Video

Video('./MsPacman-video.mp4')