In this article we look at another classic reinforcement-learning problem: car racing.

The notebook can be run on an Ubuntu computer with the following conda environment:

conda create --name car-racing python==3.7 --no-default-packages -y
conda activate car-racing
sudo apt-get install xvfb
sudo apt-get install freeglut3-dev
pip install gym[box2d] torch jupyterlab pyvirtualdisplay matplotlib tensorboard
import logging
import platform
import time
from collections import deque
from itertools import count

import gym
import matplotlib.pylab as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from matplotlib import animation
from torch.distributions import Beta
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
# We need a virtual display to render the environment on a headless server.
# pyvirtualdisplay relies on Xvfb, so it is skipped on Windows.
if platform.system() != 'Windows':
    from pyvirtualdisplay import Display
    display = Display(visible=0, size=(600, 400)).start()

# Timestamped log lines.
# NOTE(review): only the `format=` argument of this call survived in the
# article; `level=logging.INFO` is assumed so that the 'Start' message below
# is actually emitted -- confirm against the original notebook.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(message)s',
)
logger = logging.getLogger('pytorch')
logger.info('Start')
class AnimationWrapper(gym.Wrapper):
    """Gym wrapper that records an episode so it can be saved as an animation.

    After ``reset`` and after every ``step`` the wrapper stores the frame
    returned by ``env.render()`` together with the reward of that step.
    ``generate`` then writes the recorded episode to a file, overlaying the
    step index and the running total reward on each frame.

    The wrapped environment must be created with a render mode whose
    ``render()`` returns an image array (e.g. ``render_mode='rgb_array'``).
    """

    def __init__(self, env):
        # gym.Wrapper.__init__ initialises the wrapper's own state
        # (action/observation space forwarding etc.); without it attribute
        # access such as ``wrapper.action_space`` fails.
        super().__init__(env)
        self.env = env

    def reset(self):
        """Reset the wrapped env and start a fresh recording.

        Returns:
            The ``(state, info)`` tuple returned by the wrapped env.
        """
        state, info = self.env.reset()
        self.frames = [self.env.render()]
        self.rewards = [0]
        return state, info

    def step(self, action):
        """Step the wrapped env and record the resulting frame and reward.

        Returns:
            The unchanged ``(next_state, reward, done, truncated, info)``
            tuple from the wrapped env.
        """
        next_state, reward, done, truncated, info = self.env.step(action)
        # Keep frames and rewards the same length: generate() asserts on it.
        self.frames.append(self.env.render())
        self.rewards.append(reward)
        return next_state, reward, done, truncated, info

    def generate(self, filename):
        """Write the recorded episode to ``filename`` as an animation.

        Args:
            filename: output path handed to ``Animation.save`` (the pillow
                writer is used, so e.g. a ``.gif`` file).
        """
        assert len(self.frames) == len(self.rewards)
        video = np.array(self.frames)
        # NOTE(review): the leading 0 makes the displayed total lag one step
        # behind the frame it is drawn on; kept as in the original.
        total_rewards = [0] + np.cumsum(self.rewards).tolist()

        fig, ax = plt.subplots(figsize=(4, 4))
        im = ax.imshow(video[0, :, :, :])
        text = ax.text(30, 60, '', color='red')
        plt.close()  # this is required to not display the generated image

        def init():
            im.set_data(video[0, :, :, :])

        def animate(i):
            # Swap in frame i and refresh the overlay text.
            im.set_data(video[i, :, :, :])
            text.set_text(f'Step {i}, total reward: {total_rewards[i]:.2f}')
            return im

        anim = animation.FuncAnimation(fig, animate, init_func=init,
                                       frames=video.shape[0], interval=100)
        anim.save(filename, writer='pillow', dpi=80, fps=24)
# Recording environment; 'rgb_array' makes env.render() return image arrays.
env = AnimationWrapper(gym.make("CarRacing-v2", render_mode='rgb_array'))
# reset() returns (observation, info); unpack so `state` is the observation.
state, _ = env.reset()

Let’s test the environment with a random policy, limiting the duration to 100 steps.

state, _ = env.reset()
frames = [env.render()]
rewards = [0.0]
for t in count():
    # Random policy: sample uniformly from the action space.
    action = env.action_space.sample()
    state, reward, done, truncated, info = env.step(action)
    # stop at the end of the episode or after roughly 100 steps
    if done or truncated or t > 100:
        break

class Net(nn.Module):
    """Convolutional Neural Network for PPO.

    A shared convolutional trunk feeds two heads:

    * a value head ``v`` producing one scalar per sample, and
    * a policy head producing the ``alpha`` and ``beta`` parameters of a
      Beta distribution for each of the 3 action components.

    Args:
        img_stack: number of stacked frames, i.e. input channels.
    """

    def __init__(self, img_stack):
        super(Net, self).__init__()
        # Shapes below are for a 96x96 input.
        self.cnn_base = nn.Sequential(  # input shape (img_stack, 96, 96)
            nn.Conv2d(img_stack, 8, kernel_size=4, stride=2),  # -> (8, 47, 47)
            nn.ReLU(),  # activation
            nn.Conv2d(8, 16, kernel_size=3, stride=2),  # -> (16, 23, 23)
            nn.ReLU(),  # activation
            nn.Conv2d(16, 32, kernel_size=3, stride=2),  # -> (32, 11, 11)
            nn.ReLU(),  # activation
            nn.Conv2d(32, 64, kernel_size=3, stride=2),  # -> (64, 5, 5)
            nn.ReLU(),  # activation
            nn.Conv2d(64, 128, kernel_size=3, stride=1),  # -> (128, 3, 3)
            nn.ReLU(),  # activation
            nn.Conv2d(128, 256, kernel_size=3, stride=1),  # -> (256, 1, 1)
            nn.ReLU(),  # activation
        )  # output shape (256, 1, 1)
        self.v = nn.Sequential(nn.Linear(256, 100), nn.ReLU(), nn.Linear(100, 1))
        self.fc = nn.Sequential(nn.Linear(256, 100), nn.ReLU())
        self.alpha_head = nn.Sequential(nn.Linear(100, 3), nn.Softplus())
        self.beta_head = nn.Sequential(nn.Linear(100, 3), nn.Softplus())
        # Run the custom initialiser over every sub-module; it only touches
        # the Conv2d layers.
        self.apply(self._weights_init)

    @staticmethod
    def _weights_init(m):
        """Xavier-uniform weights (ReLU gain) and bias 0.1 for Conv2d layers."""
        if isinstance(m, nn.Conv2d):
            nn.init.xavier_uniform_(m.weight, gain=nn.init.calculate_gain('relu'))
            nn.init.constant_(m.bias, 0.1)

    def forward(self, x):
        """Return ``((alpha, beta), v)`` for a batch of stacked frames.

        Args:
            x: tensor of shape ``(batch, img_stack, 96, 96)``.

        Returns:
            ``(alpha, beta)``: two ``(batch, 3)`` tensors of Beta parameters,
            and ``v``: a ``(batch, 1)`` tensor of state values.
        """
        x = self.cnn_base(x)
        x = x.view(-1, 256)
        v = self.v(x)
        x = self.fc(x)
        # Softplus output is non-negative; adding 1 keeps both Beta
        # parameters at or above 1.
        alpha = self.alpha_head(x) + 1
        beta = self.beta_head(x) + 1

        return (alpha, beta), v
# --- PPO hyper-parameters ---------------------------------------------------
GAMMA = 0.99  # discount factor used for the TD target
# Number of stacked grayscale frames per observation ("four frames for
# decision"; the network input shape is (4, 96, 96)).
IMG_STACK = 4
# NOTE(review): the value is not shown in the article; 8 matches the logged
# episode lengths (125 wrapper steps x 8 = the 1000-frame time limit) -- confirm.
ACTION_REPEAT = 8
# NOTE(review): number of optimisation passes over the buffer per update; the
# value is not shown in the article -- confirm against the original notebook.
EPOCH = 8
MAX_SIZE = 2000  # replay buffer capacity; CUDA out of memory for 10000
BATCH = 128  # mini-batch size
EPS = 0.1  # PPO clipping range
LEARNING_RATE = 0.001  # better than 0.005 or 0.002

# Structured dtype of one stored transition: state, action, log-probability
# of the action, reward and next state.
transition = np.dtype([('s', np.float64, (IMG_STACK, 96, 96)),
                       ('a', np.float64, (3,)), ('a_logp', np.float64),
                       ('r', np.float64), ('s_', np.float64, (IMG_STACK, 96, 96))])
class Agent():
    """PPO agent with a Beta policy and a fixed-size transition buffer.

    Transitions are collected with ``store``; once the buffer holds
    ``MAX_SIZE`` of them, ``store`` returns ``True`` and the caller is
    expected to call ``update``.

    Args:
        device: torch device the network and the training tensors live on.
    """

    def __init__(self, device):
        self.training_step = 0
        # The network runs in double precision to match the float64 buffer.
        self.net = Net(IMG_STACK).double().to(device)
        self.buffer = np.empty(MAX_SIZE, dtype=transition)
        self.counter = 0
        self.device = device
        self.optimizer = optim.Adam(self.net.parameters(), lr=LEARNING_RATE)  ## lr=1e-3

    def select_action(self, state):
        """Sample an action for a single observation.

        Args:
            state: numpy array holding one stacked-frame observation.

        Returns:
            ``(action, a_logp)``: the sampled action as a numpy array with
            3 components, each in (0, 1), and the log-probability of that
            action (summed over the components) as a Python float.
        """
        state = torch.from_numpy(state).double().to(self.device).unsqueeze(0)
        with torch.no_grad():
            alpha, beta = self.net(state)[0]
        dist = Beta(alpha, beta)
        action = dist.sample()
        a_logp = dist.log_prob(action).sum(dim=1)

        action = action.squeeze().cpu().numpy()
        a_logp = a_logp.item()
        return action, a_logp

    def store(self, transition):
        """Append one transition to the buffer.

        Args:
            transition: tuple ``(s, a, a_logp, r, s_)`` matching the buffer
                dtype.

        Returns:
            ``True`` when this call filled the buffer (the write position is
            reset to 0), ``False`` otherwise.
        """
        self.buffer[self.counter] = transition
        self.counter += 1
        if self.counter == MAX_SIZE:
            self.counter = 0
            return True
        return False

    def update(self):
        """Run ``EPOCH`` passes of clipped-PPO updates over the whole buffer."""
        self.training_step += 1

        s = torch.tensor(self.buffer['s'], dtype=torch.double).to(self.device)
        a = torch.tensor(self.buffer['a'], dtype=torch.double).to(self.device)
        r = torch.tensor(self.buffer['r'], dtype=torch.double).to(self.device).view(-1, 1)
        next_s = torch.tensor(self.buffer['s_'], dtype=torch.double).to(self.device)

        old_a_logp = torch.tensor(self.buffer['a_logp'], dtype=torch.double).to(self.device).view(-1, 1)

        # Targets and advantages are computed once, with the pre-update
        # network, and are treated as constants during optimisation.
        with torch.no_grad():
            target_v = r + GAMMA * self.net(next_s)[1]
            adv = target_v - self.net(s)[1]
            # adv = (adv - adv.mean()) / (adv.std() + 1e-8)

        for _ in range(EPOCH):
            for index in BatchSampler(SubsetRandomSampler(range(MAX_SIZE)), BATCH, False):

                alpha, beta = self.net(s[index])[0]
                dist = Beta(alpha, beta)
                a_logp = dist.log_prob(a[index]).sum(dim=1, keepdim=True)
                # Probability ratio between the current and the old policy.
                ratio = torch.exp(a_logp - old_a_logp[index])

                surr1 = ratio * adv[index]
                # clipped function
                surr2 = torch.clamp(ratio, 1.0 - EPS, 1.0 + EPS) * adv[index]
                action_loss = -torch.min(surr1, surr2).mean()
                value_loss = F.smooth_l1_loss(self.net(s[index])[1], target_v[index])
                loss = action_loss + 2. * value_loss

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print('device: ', device)
device:  cpu
def rgb2gray(rgb, norm=True):
    """Convert an RGB image to grayscale.

    Args:
        rgb: array whose last axis holds the colour channels (R, G, B
            first; any further channels are ignored).
        norm: when true, rescale the result with ``gray / 128 - 1`` so that
            0..255 input maps to roughly [-1, 1).

    Returns:
        Float array with the channel axis removed.
    """
    # Weighted sum of the three colour channels (weights add up to 1).
    gray = np.dot(rgb[..., :3], [0.299, 0.587, 0.114])
    if norm:
        # normalize
        gray = gray / 128. - 1.
    return gray
# Take one step to obtain a frame, then show it next to its preprocessed
# (grayscale, normalised) version.
frame, _, _, _, _ = env.step(np.array([2., 1., 1.]))
img_gray = rgb2gray(frame)

fig, (ax0, ax1) = plt.subplots(ncols=2, nrows=1, figsize=(10, 5))
ax0.imshow(frame)
ax0.set_title('original image')
ax1.imshow(img_gray, cmap='Greys')
ax1.set_title('preprocessed image')


class ObservationWrapper():
    """Environment wrapper used for training from pixels.

    It converts frames to normalised grayscale, stacks the last
    ``IMG_STACK`` frames into one observation, repeats every action
    ``ACTION_REPEAT`` times and shapes the reward.
    """

    def __init__(self, env):
        self.env = env

    def reset(self):
        """Reset the wrapped env and return ``(stacked_frames, None)``."""
        self.counter = 0
        self.av_r = self.reward_memory()

        self.die = False
        img_rgb, _ = self.env.reset()
        img_gray = rgb2gray(img_rgb)
        # The stack starts as IMG_STACK copies of the first frame.
        self.stack = [img_gray] * IMG_STACK  # four frames for decision
        return np.array(self.stack), None

    def step(self, action):
        """Apply ``action`` up to ``ACTION_REPEAT`` times.

        Returns:
            ``(stacked_frames, total_reward, done, False, die)`` where
            ``done`` means the mean of the recent rewards dropped to -0.1 or
            below, and ``die`` means the wrapped env reported the episode as
            terminated or truncated.
        """
        total_reward = 0
        for i in range(ACTION_REPEAT):
            img_rgb, reward, die, truncated, _ = self.env.step(action)
            if truncated:
                die = True
            # don't penalize "die state"
            if die:
                reward += 100
            # green penalty
            if np.mean(img_rgb[:, :, 1]) > 185.0:
                reward -= 0.05
            total_reward += reward
            # if no reward recently, end the episode
            done = True if self.av_r(reward) <= -0.1 else False
            if done or die:
                break
        img_gray = rgb2gray(img_rgb)
        # Slide the window: drop the oldest frame, append the newest one.
        self.stack.pop(0)
        self.stack.append(img_gray)
        assert len(self.stack) == IMG_STACK
        return np.array(self.stack), total_reward, done, False, die

    def close(self):
        """Close the wrapped environment."""
        return self.env.close()

    @staticmethod
    def reward_memory():
        """Return a closure that tracks the mean of the last 100 rewards.

        The returned function takes one reward, stores it in a ring buffer
        of length 100 (initially all zeros) and returns the mean of the
        buffer.
        """
        # record reward for last 100 steps
        count = 0
        length = 100
        history = np.zeros(length)

        def memory(reward):
            nonlocal count
            history[count] = reward
            count = (count + 1) % length
            return np.mean(history)

        return memory
def ppo_train(env, agent, n_episodes, save_every=100):
    """Train ``agent`` on ``env`` for ``n_episodes`` episodes.

    Every transition is handed to ``agent.store``; whenever that reports a
    full buffer, ``agent.update`` is called. One summary line is printed per
    episode.

    Args:
        env: environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(next_state, reward, done, truncated, die)``.
        agent: object providing ``select_action(state)``, ``store(transition)``
            and ``update()``.
        n_episodes: number of episodes to run.
        save_every: kept for backward compatibility; this function does not
            write any checkpoint.

    Returns:
        ``(scores_array, avg_scores_array)``: the total reward of every
        episode, and the mean over the last (up to) 100 episodes after each
        episode.
    """
    scores_deque = deque(maxlen=100)
    scores_array = []
    avg_scores_array = []

    time_start = time.time()

    running_score = 0
    for i_episode in range(n_episodes):
        timestep = 0
        total_reward = 0
        state, _ = env.reset()

        while True:
            action, a_logp = agent.select_action(state)
            # The policy samples each component in (0, 1); rescale the first
            # component to (-1, 1) and leave the other two unchanged.
            next_state, reward, done, truncated, die = env.step(
                action * np.array([2., 1., 1.]) + np.array([-1., 0., 0.]))
            if truncated:
                done = True

            # store() returns True once the buffer is full -> time to learn.
            if agent.store((state, action, a_logp, reward, next_state)):
                agent.update()

            total_reward += reward
            state = next_state
            timestep += 1
            if done or die:
                break

        running_score = running_score * 0.99 + total_reward * 0.01

        scores_deque.append(total_reward)
        scores_array.append(total_reward)
        avg_score = np.mean(scores_deque)
        avg_scores_array.append(avg_score)

        s = int(time.time() - time_start)
        msg = ('Episode: {} {}  score: {:.2f}  avg score: {:.2f}  run score {:.2f}, '
               'time: {:02}:{:02}:{:02} '
               .format(i_episode, timestep,
                       total_reward, avg_score, running_score,
                       s // 3600, s % 3600 // 60, s % 60))
        print(msg)

    return scores_array, avg_scores_array
agent = Agent(device)
# No render mode is needed for training: frames come from env.step().
env = ObservationWrapper(gym.make('CarRacing-v2'))


seed = 0

# NOTE(review): NUM_EPISODES is not defined anywhere in the visible article
# and must be set before this cell runs. The original line also carried a
# truncated trailing fragment (", '')") whose call could not be recovered.
scores, avg_scores = ppo_train(env, agent, NUM_EPISODES)
Episode: 0 111  score: -22.46  avg score: -22.46  run score -0.22, time: 00:00:07 
Episode: 1 108  score: -17.98  avg score: -20.22  run score -0.40, time: 00:00:15 
Episode: 2 123  score: -23.11  avg score: -21.18  run score -0.63, time: 00:00:23 
Episode: 3 116  score: -17.93  avg score: -20.37  run score -0.80, time: 00:00:31 
Episode: 4 111  score: -17.99  avg score: -19.90  run score -0.97, time: 00:00:39 
Episode: 5 111  score: -18.03  avg score: -19.58  run score -1.14, time: 00:00:46 
Episode: 6 110  score: -17.91  avg score: -19.35  run score -1.31, time: 00:00:54 
Episode: 7 104  score: -18.03  avg score: -19.18  run score -1.48, time: 00:01:01 
Episode: 8 91  score: 11.97  avg score: -15.72  run score -1.35, time: 00:01:07 
Episode: 9 108  score: -17.84  avg score: -15.93  run score -1.51, time: 00:01:14 
Episode: 10 125  score: 89.07  avg score: -6.39  run score -0.60, time: 00:01:23 
Episode: 11 125  score: 105.69  avg score: 2.95  run score 0.46, time: 00:01:31 
Episode: 12 117  score: -18.03  avg score: 1.34  run score 0.27, time: 00:01:39 
Episode: 13 111  score: -18.03  avg score: -0.04  run score 0.09, time: 00:01:47 
Episode: 14 125  score: 75.60  avg score: 5.00  run score 0.85, time: 00:01:56 
Episode: 15 107  score: -18.23  avg score: 3.55  run score 0.66, time: 00:02:03 
Episode: 16 117  score: -27.43  avg score: 1.72  run score 0.37, time: 00:02:11 
# agent = Agent(device)
# Roll out the trained agent in a recording environment.
env = ObservationWrapper(AnimationWrapper(gym.make('CarRacing-v2', render_mode='rgb_array')))
state, _ = env.reset()
for t in count():
    action, a_logp = agent.select_action(state)
    # Same action rescaling as during training.
    next_state, reward, done, truncated, die = env.step(
        action * np.array([2., 1., 1.]) + np.array([-1., 0., 0.]))
    # NOTE(review): `die` (wrapped env terminated/truncated) is not checked
    # here, unlike in ppo_train -- confirm whether that is intended.
    if done or truncated or t > 1_000:
        break
    state = next_state
print(f"# steps: {t}")
# steps: 1001