conda create --name pendulum python==3.7 --no-default-packages -y
conda activate pendulum
sudo apt-get install xvfb
sudo apt-get install freeglut3-dev
pip install "gym[classic_control]" torch jupyterlab pyvirtualdisplay matplotlib tensorboard
from typing import Tuple
from collections import deque
import gym
from itertools import count
import matplotlib.pylab as plt
import numpy as np
from pathlib import Path
import os
import platform
import random
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
# we need this to run on a headless server
if platform.system() != 'Windows':
    from pyvirtualdisplay import Display
    display = Display(visible=0, size=(640, 480)).start() 
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
class DuelingDeepQNetwork(torch.nn.Module):

    def __init__(self, num_inputs, num_actions, num_hidden, name):
        super().__init__()

        self.name = name
        
        # common part
        self.H1 = torch.nn.Linear(num_inputs, num_hidden)
        self.H2 = torch.nn.Linear(num_hidden, num_hidden)

        # first head, the value
        self.V1 = torch.nn.Linear(num_hidden, num_hidden)
        self.V2 = torch.nn.Linear(num_hidden, num_hidden)
        self.value = torch.nn.Linear(num_hidden, 1)

        # second head, the advantage
        self.A1 = torch.nn.Linear(num_hidden, num_hidden)
        self.A2 = torch.nn.Linear(num_hidden, num_hidden)
        self.advantage = torch.nn.Linear(num_hidden, num_actions)

    def forward(self, state) -> torch.Tensor:
        state = F.relu(self.H1(state))
        state = F.relu(self.H2(state))

        value = F.relu(self.V1(state))
        value = F.relu(self.V2(value))
        value = self.value(value)

        adv = F.relu(self.A1(state))
        adv = F.relu(self.A2(adv))
        adv = self.advantage(adv)

        return value + adv - torch.mean(adv, dim=-1, keepdim=True)

    def pick_action(self, state):
        with torch.no_grad():
            Q = self.forward(state)
            action = torch.argmax(Q, dim=-1)
            return action.cpu().numpy()

    def save(self):
        torch.save(self.state_dict(), 'model-' + self.name + '.pt')

    def load(self):
        self.load_state_dict(torch.load('model-' + self.name + '.pt'))
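
# Quick shape check of the dueling head (illustrative only, random weights and
# made-up sizes): a batch of 4 two-dimensional states yields one Q-value per action.
_net = DuelingDeepQNetwork(num_inputs=2, num_actions=3, num_hidden=128, name='demo')
print(_net(torch.rand(4, 2)).shape)  # torch.Size([4, 3])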
class ReplayMemory:
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def add(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)
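
# Small sanity check of the circular buffer (illustrative only, toy transitions):
# once the capacity is reached, new transitions overwrite the oldest ones in place.
_mem = ReplayMemory(capacity=3)
for _i in range(5):
    _mem.add(np.array([_i, 0.0]), _i % 2, -1.0, np.array([_i + 1, 0.0]), False)
print(len(_mem))            # 3 -- never exceeds capacity
_s, _a, _r, _ns, _d = _mem.sample(batch_size=2)
print(_s.shape, _a.shape)   # (2, 2) (2,)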
def init_weights(m):
    if type(m) == torch.nn.Linear:
        torch.nn.init.xavier_uniform_(m.weight)
class Agent:
    
    def __init__(self,
                 env,
                 memory_size,
                 batch_size,
                 learning_rate: float = 0.5e-4,
                 gamma: float = 0.99,
                 epsilon: float = 1.0,
                 eps_min: float = 0.01,
                 eps_dec: float = 1e-3,
                 tau: float = 0.001,
                 training: bool = True):

        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = learning_rate

        self.action_dim = env.action_space.n
        self.input_dim = env.observation_space.shape[0]
        self.batch_size = batch_size

        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.tau = tau

        self.training = training

        self.memory = ReplayMemory(memory_size)

        self.online_network = DuelingDeepQNetwork(num_inputs=self.input_dim,
                                                  num_actions=self.action_dim,
                                                  num_hidden=128,
                                                  name='OnlinePolicy').to(device)
        self.online_network.apply(init_weights)
        self.optimizer = optim.Adam(self.online_network.parameters(), lr=learning_rate)

        self.target_network = DuelingDeepQNetwork(num_inputs=self.input_dim,
                                                  num_actions=self.action_dim,
                                                  num_hidden=128,
                                                  name='TargetPolicy').to(device)
        self.target_network.apply(init_weights)

        # hard-copy the online weights into the target network
        self.update_networks(tau=1.0)

    def choose_action(self, observation) -> int:
        # explore: with probability epsilon take a random action while training
        if self.training and np.random.rand() < self.epsilon:
            return self.env.action_space.sample()

        # exploit: take the greedy action according to the online network
        self.online_network.eval()
        state = torch.as_tensor(observation, dtype=torch.float32, device=device)
        with torch.no_grad():
            return int(self.online_network.pick_action(state))

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

    def update_networks(self, tau) -> None:
        '''Soft (Polyak) update: target <- tau * online + (1 - tau) * target'''
        for online_weights, target_weights in zip(self.online_network.parameters(), self.target_network.parameters()):
            target_weights.data.copy_(tau * online_weights.data + (1 - tau) * target_weights.data)

    def epsilon_update(self) -> None:
        '''Decrease epsilon iteratively'''
        if self.epsilon > self.eps_min:
            self.epsilon -= self.eps_dec

    def save_models(self) -> None:
        self.online_network.save()
        self.target_network.save()

    def load_models(self) -> None:
        self.online_network.load()
        self.target_network.load()

    def optimize(self):
        if len(self.memory) < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        states = torch.as_tensor(np.vstack(states), dtype=torch.float32, device=device)
        rewards = torch.as_tensor(np.vstack(rewards), dtype=torch.float32, device=device)
        dones = torch.as_tensor(np.vstack(dones), dtype=torch.float32, device=device)
        actions = torch.as_tensor(np.vstack(actions), dtype=torch.int64, device=device)
        next_states = torch.as_tensor(np.vstack(next_states), dtype=torch.float32, device=device)

        self.online_network.train()
        self.target_network.eval()

        with torch.no_grad():
            next_q_values = self.target_network(next_states)
            next_q_values, _ = next_q_values.max(dim=1)
            next_q_values = next_q_values.reshape(-1, 1)
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        current_q_values = self.online_network(states)
        current_q_values = torch.gather(current_q_values, dim=1, index=actions)

        # Compute Huber loss (less sensitive to outliers)
        loss = F.huber_loss(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.update_networks(tau=self.tau)
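
# A tiny numeric illustration (made-up values) of the bootstrapped target
# computed in optimize():  y = r + gamma * (1 - done) * max_a' Q_target(s', a').
_rewards = torch.tensor([[-1.0], [-1.0]])
_dones = torch.tensor([[0.0], [1.0]])       # second transition ends the episode
_next_q = torch.tensor([[0.5, 0.2, -0.1],   # pretend target-network outputs
                        [0.3, 0.9,  0.4]])
_y = _rewards + (1 - _dones) * 0.99 * _next_q.max(dim=1, keepdim=True)[0]
print(_y)  # tensor([[-0.5050], [-1.0000]])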
writer = SummaryWriter(log_dir=os.path.abspath('tensorboard'))
ENV_NAME = 'MountainCar-v0'
NUM_EPISODES = 5_000
MEMORY_SIZE = 1_000
BATCH_SIZE = 64
LEARNING_RATE = 1e-3
SEED = 42
env = gym.make(ENV_NAME)
env.reset()

total_rewards, averages = [], []

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

agent = Agent(env=env, memory_size=MEMORY_SIZE * env._max_episode_steps,
              batch_size=BATCH_SIZE, learning_rate=LEARNING_RATE, eps_dec=0.01, eps_min=0.01)

for i in range(NUM_EPISODES):
    total_reward = 0.0
    state, _ = env.reset()
    for t in count():
        action = agent.choose_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        agent.memory.add(state, action, reward, next_state, done or truncated)
        state = next_state
        total_reward += reward
        agent.optimize()
        if done or truncated:
            agent.epsilon_update()
            break

    total_rewards.append(total_reward)
    average = np.mean(total_rewards[-100:])
    averages.append(average)
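    # Optional: log per-episode metrics to TensorBoard through the SummaryWriter
    # created above (purely for monitoring, the training itself is unchanged).
    writer.add_scalar('reward/total', total_reward, i)
    writer.add_scalar('reward/average_100', average, i)
    writer.add_scalar('epsilon', agent.epsilon, i)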

    #print(f'episode: {i}, total reward: {total_reward:.3f}, average: {average:.3f}, {agent.epsilon:.3f}', end='\r')

agent.save_models()
writer.close()
plt.plot(total_rewards, label='total reward')
plt.plot(averages, label='100-episode average')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.legend();

*Figure: total reward per episode and its 100-episode moving average during training.*

test_env = gym.make('MountainCar-v0', render_mode='rgb_array')
agent.training = False
rewards = [0.0]

state, _ = test_env.reset()
frames = [test_env.render()]

done = False

while not done:
    action = agent.choose_action(state)

    next_state, reward, done, truncated, _ = test_env.step(action)
    frames.append(test_env.render())
    done = done or truncated

    state = next_state
    rewards.append(reward)

test_env.close()
total_rewards = np.cumsum(rewards).tolist()
# np array with shape (frames, height, width, channels)
video = np.array(frames)

fig, ax = plt.subplots(figsize=(4, 4))
im = ax.imshow(video[0,:,:,:])
ax.set_axis_off()
text = ax.text(30, 60, '', color='red')
plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    text.set_text(f'Step {i}, total reward: {total_rewards[i]:.2f}')
    return im

from matplotlib import animation
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
anim.save('mountain-car-video.gif', writer='pillow', dpi=80, fps=24)

def predict_value(agent: Agent, state: np.ndarray) -> float:
    # negate the greedy Q-value so the surface reads as a cost-to-go
    state = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    value = agent.online_network(state).detach().cpu().numpy()
    return -np.max(value)


x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=500)
y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=500)
x, y = np.meshgrid(x, y)
z = np.apply_along_axis(lambda obs: predict_value(agent, obs), 2, np.dstack([x, y]))
z = z[:-1, :-1]
z_min, z_max = z.min(), z.max()
fig, ax = plt.subplots()

mesh = ax.pcolormesh(x, y, z, cmap='RdBu', vmin=z_min, vmax=z_max)
ax.axis([x.min(), x.max(), y.min(), y.max()])
ax.set_xlabel('Position')
ax.set_ylabel('Velocity')
ax.set_title('Agent Value Estimation')
fig.colorbar(mesh);

*Figure: agent value estimation over the position-velocity state space.*