Here we explore tabular reinforcement learning, and Q-learning in particular, without relying on any external reinforcement learning packages or pre-built environment classes. This approach only works for simple tabular methods, but it is a good exercise for learning the basic steps and concepts of reinforcement learning. The goal of this code is to expose every single implementation detail; once that is clear, the OpenAI Gym environments are the best bet. Tabular methods only work on small problems, so in the future we will focus on more advanced methods.
The environment we want to solve is a maze, defined as a Cartesian grid of positions: the characters '+', '-' and '|' indicate a wall where the agent cannot go, while a space indicates a position the agent can occupy. The initial position is marked with 'I' (in). The agent has to move until it reaches the 'O' (out) position, at which point it has reached its goal.
The maze is defined as a string. The agent cannot exit the grid or otherwise fall out: any move off the grid or into a wall ends the episode with an infinitely negative reward, so the agent quickly learns to remain on valid positions. We assume we have one starting position and one final position, and all transitions and actions are deterministic.
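As an illustration of the format (this tiny corridor is not one of the mazes solved below, it only shows the characters involved), a maze string could look like this:
+--+--+--+
I        O
+--+--+--+
Here the agent starts at I and only needs to move east along the corridor until it reaches O.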
import copy
import numpy as np
import pandas as pd
import random
from typing import Any, Tuple, List
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt
%matplotlib inline
The first class we define is the Space class.
class Space(ABC):
"""Generic definition of space. It supports continuous and discrete spaces in arbitrary dimensions.
The elements of the space are tuples."""
@abstractmethod
def num_dimensions(self) -> int:
pass
@abstractmethod
def random_element(self) -> Tuple[Any]:
pass
class DiscreteSpace(Space):
"""A space with only one dimension and a finite number of elements. Each element of the space has an associated rank,
whic is a number between 0 and n - 1, where n is the number of elements in the space. The element order is not important."""
def num_dimensions(self) -> int:
return 1
@abstractmethod
def size(self) -> int:
"""Returns the size of the space, that is the number of elements contains into it."""
pass
@abstractmethod
def rank(self, element: Tuple[Any]) -> int:
"Returns a rank from 0 to size - 1 for the given element."
pass
@abstractmethod
def is_valid(self, element: Tuple[Any]) -> bool:
"Returns True if the element is a valid element of the space or False otherwise"
pass
@abstractmethod
def element(self, rank: int) -> Tuple[Any]:
"Returns the element corresponding to the given rank."
pass
@abstractmethod
def all_elements(self) -> List[Any]:
pass
class DoubleIdentityDiscreteSpace(DiscreteSpace):
    """A discrete space whose elements are the pairs (i, j) of a rectangular grid, with 0 <= i < nx and 0 <= j < ny.
    The rank of (i, j) is simply i + j * nx, so ranks and elements can be converted with plain arithmetic and no lookup tables."""
def __init__(self, nx: int, ny: int):
assert nx > 0 and ny > 0, "input parameters 'nx' and 'ny' must be positive"
self.nx = nx
self.ny = ny
self.n = nx * ny
def size(self) -> int:
return self.n
def random_element(self) -> Tuple[Any]:
"We select a random element with uniform probability."
rank = np.random.choice(self.n)
return self.element(rank)
def rank(self, element: Tuple[Any]) -> int:
assert self.is_valid(element), "invalid input element"
i, j = element
        return i + j * self.nx
def is_valid(self, element: Tuple[Any]) -> bool:
        i, j = element
        # accept plain Python integers as well as NumPy integers
        assert isinstance(i, (int, np.integer)) and isinstance(j, (int, np.integer)), "input elements i and j must be integers"
if i < 0 or i >= self.nx:
return False
if j < 0 or j >= self.ny:
return False
return True
def element(self, rank: int) -> Tuple[Any]:
i = rank % self.nx
        j = rank // self.nx
return (i, j)
def all_elements(self) -> List[Any]:
retval = []
for rank in range(self.size()):
retval.append(self.element(rank))
return retval
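A quick round-trip check (an illustrative snippet, not part of the original flow, using a hypothetical 3-by-2 grid) shows that rank() and element() are inverses of each other:
# sanity check: element() and rank() must be inverses
grid = DoubleIdentityDiscreteSpace(nx=3, ny=2)
assert grid.size() == 6
for r in range(grid.size()):
    assert grid.rank(grid.element(r)) == r
print(grid.all_elements())  # [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)]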
class DictDiscreteSpace(DiscreteSpace):
    """Implementation of the DiscreteSpace class that uses dictionaries to map elements to ranks and vice versa."""
def __init__(self, elements):
self.elements = elements
self.element_to_rank = {}
self.rank_to_element = {}
for rank, element in enumerate(elements):
self.element_to_rank[element] = rank
self.rank_to_element[rank] = element
self.n = len(elements)
def size(self) -> int:
return self.n
def random_element(self) -> Tuple[Any]:
"We select a random element with uniform probability."
random_rank = np.random.choice(self.n)
random_element = self.element(random_rank)
return random_element
def rank(self, element: Tuple[Any]) -> int:
e, = element
return self.element_to_rank[e]
def is_valid(self, element: Tuple[Any]) -> bool:
e, = element
return e in self.elements
def element(self, rank: int) -> Tuple[Any]:
return self.rank_to_element[rank],
def all_elements(self) -> List[Any]:
retval = []
for rank in range(self.size()):
retval.append(self.element(rank))
return retval
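As an illustrative example (not part of the original flow), the four maze actions defined further below can be wrapped in this space as follows:
# elements of this space are one-element tuples, e.g. ('E',)
actions = DictDiscreteSpace(['N', 'E', 'S', 'W'])
print(actions.size())            # 4
print(actions.rank(('E',)))      # 1
print(actions.element(1))        # ('E',)
print(actions.is_valid(('X',)))  # False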
class Environment(ABC):
@abstractmethod
def state_space(self) -> Space:
"""Returns the state space."""
pass
@abstractmethod
def action_space(self) -> Space:
"""Returns the action space."""
pass
@abstractmethod
def initialize(self) -> None:
"""Initializes the environment; to be called before each episode."""
pass
@abstractmethod
def initial_state(self) -> Tuple[Any]:
"""Returns the rank of the initial state."""
pass
@abstractmethod
def step(self, state: Tuple[Any], action: Tuple[Any]) -> (Tuple[Any], float, bool, Any):
"""For the given current state and action, it returns the new state, the reward and a boolean
that is True if the next state is final or zero otherwise, as well as a generic info object that can be used
for debugging."""
pass
class LearningRate(ABC):
"""Class that defines the learning rate, also called $\alpha$ in the literature."""
@abstractmethod
def initialize(self, state_space: DiscreteSpace, action_space: DiscreteSpace) -> None:
"""Initializes the object for the given state and action spaces."""
pass
@abstractmethod
def __call__(self, time_step: int, state_rank: int, action_rank: int) -> float:
"""Returns the learning rate for the given episode, state and action ranks."""
pass
class ConstantLearningRate(LearningRate):
"A constant learning rate."
def __init__(self, alpha):
self.alpha = alpha
def initialize(self, state_space: Space, action_space: Space) -> None:
pass
def __call__(self, time_step: int, state_rank: int, action_rank: int) -> float:
return self.alpha
class OneOverTimeStepLearningRate(LearningRate):
"Learning rate defined as one over the episode."
def __init__(self, omega: float = 1.0):
assert omega > 0.5 and omega <= 1
self.omega = omega
def initialize(self, state_space: Space, action_space: Space) -> None:
pass
def __call__(self, time_step: int, state_rank: int, action_rank: int) -> float:
return 1.0 / (time_step**self.omega)
class OneOverVisitsLearningRate(LearningRate):
"Learning rate $\alpha defined as 1 over the number of times the state/action pair has been visited, plus one."
def initialize(self, state_space: Space, action_space: Space) -> None:
num_states = state_space.size()
num_actions = action_space.size()
self.visits = np.zeros((num_states, num_actions))
def __call__(self, time_step: int, state_rank: int, action_rank: int) -> float:
self.visits[state_rank, action_rank] += 1
return 1.0 / (self.visits[state_rank, action_rank])
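As a quick illustration (not in the original flow), this is how a constant schedule and a decaying schedule compare over a few time steps:
# compare two learning-rate schedules; the state and action ranks are irrelevant here
lr_constant = ConstantLearningRate(0.25)
lr_decaying = OneOverTimeStepLearningRate(omega=0.8)
for t in [1, 10, 100, 1000]:
    print(t, lr_constant(t, 0, 0), lr_decaying(t, 0, 0))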
It is typical to use the so-called $\epsilon$-greedy strategy to add some randomness to the algorithm. Given a number $\epsilon \in (0, 1)$, with probability $1-\epsilon$ we choose the action that currently looks best, and with probability $\epsilon$ we select a random action. By selecting the current best action we exploit the policy we currently have; with the random action we instead explore different, and possibly untested, actions. It is paramount to find a good balance between exploration (of uncharted territory) and exploitation (of current knowledge). For that, it is often convenient to use $\epsilon = \epsilon(t)$. At the beginning the exploration rate is at its highest value, because we don't know much about the environment, so we need to do a lot of exploration by randomly choosing our actions. As the environment is explored, $\epsilon$ gets smaller, reflecting increased confidence in the estimated Q values.
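As a minimal sketch (the full selection logic used below lives in select_action(), which also skips invalid actions), epsilon-greedy selection over a row of the Q matrix amounts to:
# minimal epsilon-greedy sketch: explore with probability epsilon, otherwise exploit
def epsilon_greedy(q_row, epsilon):
    if np.random.random() < epsilon:
        return np.random.randint(len(q_row))   # explore: pick any action uniformly
    return int(np.argmax(q_row))               # exploit: pick the currently best action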
class ExplorationRate(ABC):
"""Class defining the exploration rate, also called $\epsilon$ in the literature.
    An exploration rate of 1 means that we explore 100% of the time; an exploration rate of 0 means we never explore."""
@abstractmethod
def __call__(self, episode: int) -> float:
"""Returns the epsilon value for the given episode"""
pass
class ConstantExplorationRate(ExplorationRate):
"A constant value."
def __init__(self, epsilon: float):
assert epsilon > 0.0 and epsilon <= 1.0, "bad epsilon value"
self.epsilon = epsilon
def __call__(self, episode: int) -> float:
return self.epsilon
class ExponentiallyDecayingExplorationRate(ExplorationRate):
"""Exponentially decaying. The initial value is decayed after each episode and floored to a minimum value."""
def __init__(self, max_epsilon: float, min_epsilon: float, decay_rate: float):
assert max_epsilon >= min_epsilon, "input parameter 'max_epsilon' cannot be less than 'min_epsilon'"
assert min_epsilon >= 0, "input parameter 'min_epsilon' cannot be negative"
assert decay_rate >= 0.0, "input parameter 'decay_rate' cannot be negative"
self.min_epsilon = min_epsilon
self.max_epsilon = max_epsilon
self.decay_rate = decay_rate
def __call__(self, episode: int) -> float:
assert episode >= 0
return self.min_epsilon + (self.max_epsilon - self.min_epsilon) * np.exp(-self.decay_rate * episode)
class PureExplorationRate(ExplorationRate):
    "Pure exploration: the action is always chosen at random (epsilon = 1)."
    def __call__(self, episode: int) -> float:
        return 1.0
time_steps = range(1, 10000)
exp_epsilon = ExponentiallyDecayingExplorationRate(0.25, 0.05, 1.0 / 1000)
plt.plot(time_steps, list(map(exp_epsilon, time_steps)))
(Figure: the exploration rate decays exponentially from its maximum of 0.25 towards its floor of 0.05 as the episodes progress.)
class ReinforcementLearning(ABC):
"""Abstract interface for all reinforcement learning methods."""
@abstractmethod
def set_environment(self, env: Environment) -> None:
"""Initializes the object with the given environment, learning rate and exploration rate."""
pass
@abstractmethod
def train(self, num_episodes: int, discount: float, params=None) -> None:
"""Trains the model using the specified number of episodes and learning rate."""
pass
@abstractmethod
def optimal_action(self, state: Tuple[Any]) -> Tuple[Any]:
"""Returns the action defined by the optimal policy for the specified state."""
pass
@staticmethod
def get_param(params, key, default_value):
if params is None or key not in params:
return default_value
return params[key]
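The get_param() helper simply falls back to the default when the key is missing; for instance (illustrative calls):
print(ReinforcementLearning.get_param({'PrintFrequency': 2000}, 'PrintFrequency', 1000))  # 2000
print(ReinforcementLearning.get_param(None, 'MaxStepsPerEpisode', 1e6))                   # 1000000.0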
class Observer(ABC):
@abstractmethod
def observe(self, episode: int, time: int, alpha: float, state: Tuple[Any], action: Tuple[Any], reward: float, is_final: bool, Q) -> None:
pass
class RewardsObserver(Observer):
def __init__(self):
self.rewards = {}
def observe(self, episode: int, time: int, alpha: float, state: Tuple[Any], action: Tuple[Any], reward: float, is_final: bool, Q) -> None:
        # record every reward observed in each episode
if episode not in self.rewards:
self.rewards[episode] = []
self.rewards[episode].append(reward)
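To make the interface concrete (an illustrative call with made-up arguments), the observer simply accumulates the rewards it sees, keyed by episode:
obs = RewardsObserver()
obs.observe(episode=0, time=0, alpha=0.25, state=(1, 0), action=('E',), reward=-1.0, is_final=False, Q=None)
obs.observe(episode=0, time=1, alpha=0.25, state=(1, 1), action=('E',), reward=0.0, is_final=True, Q=None)
print(obs.rewards)  # {0: [-1.0, 0.0]}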
We now define the base class for tabular reinforcement learning methods, which store the action-value function Q explicitly as a table.
class TabularReinforcementLearning(ReinforcementLearning):
"""Base class for all tabular-based reinforcement learning methods."""
@abstractmethod
def set_learning_rate(self, learning_rate: LearningRate) -> None:
pass
@abstractmethod
def set_exploration_rate(self, exploration_rate: ExplorationRate) -> None:
pass
@abstractmethod
def set_observer(self, observer) -> None:
pass
@abstractmethod
def optimal_action_rank(self, state_rank: int) -> int:
"""Returns the action rank defined by the optimal policy for the specified state rank."""
pass
We focus on Temporal Difference (TD) methods, of which the most important are SARSA and Q-learning. Since SARSA and Q-learning are very similar, we adopt a semi-abstract class that defines the initial Q matrix in the initialize() method and also implements the optimal_action() method.
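For reference, the Q-learning update implemented in the train() method below is
$$Q(s, a) \leftarrow (1 - \alpha)\, Q(s, a) + \alpha \left( r + \gamma \max_{a'} Q(s', a') \right),$$
where $\alpha$ is the learning rate, $\gamma$ the discount factor, $r$ the reward and $s'$ the next state; when $s'$ is terminal the $\max$ term is replaced by zero.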
class MatrixBasedReinforcementLearning(TabularReinforcementLearning):
"""Semi-abstract class that provides common constructor, initialize() method and optimal_action() to Q Learning and SARSA.
It describes the learning methods that require the matrix Q explicitly.
Utility functions are provided as well."""
def __init__(self):
pass
def set_environment(self, env: Environment) -> None:
self.env = env
assert isinstance(env.state_space(), DiscreteSpace), "a discrete state space is needed"
self.num_states = env.state_space().size()
assert isinstance(env.action_space(), DiscreteSpace), "a discrete action space is needed"
self.num_actions = env.action_space().size()
def set_learning_rate(self, learning_rate: LearningRate) -> None:
self.learning_rate = learning_rate
def set_exploration_rate(self, exploration_rate: ExplorationRate) -> None:
self.exploration_rate = exploration_rate
def set_observer(self, observer) -> None:
self.observer = observer
def initialize(self) -> None:
# define a zero Q matrix
self.Q = np.zeros((self.num_states, self.num_actions))
print("We have {} states and {} actions".format(self.num_states, self.num_actions))
# keep track of the number of visits
self.N = np.zeros((self.num_states, self.num_actions))
# for debugging
self.num_explorations = 0
self.num_exploitations = 0
self.epsilons = []
    def choose_epsilon(self, episode: int, state_rank: int) -> float:
        eps = self.exploration_rate(episode)
self.epsilons.append(eps)
return eps
def nanmax(self, values) -> float:
# FIXME -- to be removed
# max value discarding nans
return values[~np.isnan(values)].max()
    def argmax(self, values) -> int:
        # position of the maximum value, discarding nans
max_value = self.nanmax(values)
for i, item in enumerate(values):
if np.isnan(item): continue
if item == max_value:
return i
raise ValueError("Cannot find maximum")
def select_action(self, epsilon: float, state_rank: int) -> int:
do_exploration = np.random.binomial(1, epsilon) == 1
values = self.Q[state_rank]
if do_exploration:
# with probability epsilon, explore randomly among all choices (all equally probable)
valid_action_ranks = []
for i in range(self.num_actions):
if values[i] > -np.inf:
valid_action_ranks.append(i)
assert len(valid_action_ranks) > 0, "no valid actions!"
action = np.random.choice(valid_action_ranks)
self.num_explorations += 1
else:
# with probability 1 - epsilon, choose the best action. If many actions are the best ones,
# choose randomly amongst those
max_value = self.nanmax(values)
best_action_ranks = []
for i in range(self.num_actions):
if values[i] == max_value:
best_action_ranks.append(i)
action = random.choice(best_action_ranks)
self.num_exploitations += 1
return action
def optimal_action_rank(self, state_rank: int) -> int:
return self.argmax(self.Q[state_rank])
def optimal_action(self, state: Tuple[Any]) -> Tuple[Any]:
state_rank = self.env.state_space().rank(state)
action_rank = self.optimal_action_rank(state_rank)
return self.env.action_space().element(action_rank)
def print_stats(self):
total = self.num_exploitations + self.num_explorations
print("")
print("Exploitation = {:.2f}%, exploration = {:.2f}%".format(100 * self.num_exploitations / total, 100 * self.num_explorations / total))
class QLearning(MatrixBasedReinforcementLearning):
def __init__(self):
pass
def train(self, num_episodes: int, discount: float, params=None) -> None:
max_steps_per_episode = QLearning.get_param(params, 'MaxStepsPerEpisode', 1e6)
print_frequency = QLearning.get_param(params, 'PrintFrequency', 1000)
# initialize the learning rate object
self.learning_rate.initialize(self.env.state_space(), self.env.action_space())
time = 1
# loop over all the episodes
for episode in range(num_episodes):
total_reward = 0.0
self.env.initialize()
state = self.env.initial_state()
state_rank = self.env.state_space().rank(state)
episode_time = 0
# continue till when the episode has reached the end or we have exhausted the allotted time
while True:
                epsilon = self.choose_epsilon(episode, state_rank)
# select the action to perform amongst the possible one, with an epsilon-greedy strategy
action_rank = self.select_action(epsilon, state_rank)
action = self.env.action_space().element(action_rank)
# find the next state corresponding to the action selected
next_state, reward, is_final, _ = self.env.step(state, action)
if is_final:
Q_max = 0.0
else:
next_state_rank = self.env.state_space().rank(next_state)
Q_max = self.nanmax(self.Q[next_state_rank])
new_estimate = reward + discount * Q_max
alpha = self.learning_rate(time, state_rank, action_rank)
self.Q[state_rank, action_rank] = (1 - alpha) * self.Q[state_rank, action_rank] + alpha * new_estimate
self.N[state_rank, action_rank] += 1
#
self.observer.observe(episode, episode_time, alpha, state, action, reward, is_final, self.Q)
# total (undiscounted) reward for this episode
total_reward += reward
                time += 1
                episode_time += 1
# if final we stop
if is_final:
break
# go to next state unless we have been living too long
if episode_time > max_steps_per_episode:
raise ValueError("episode {} did not finish in {} steps".format(episode, time))
# reset the state for the next iteration
state_rank = next_state_rank
state = next_state
if episode > 0 and episode % print_frequency == 0 and print_frequency != -1:
print("step {} finished".format(episode))
self.print_stats()
class SARSA(MatrixBasedReinforcementLearning):
    def __init__(self):
        pass
    def train(self, num_episodes: int, discount: float, params=None) -> None:
        max_steps_per_episode = SARSA.get_param(params, 'MaxStepsPerEpisode', 1e6)
        # initialize the learning rate object
        self.learning_rate.initialize(self.env.state_space(), self.env.action_space())
        # global counter of steps
        time_step = 1
        # loop over all the episodes
        for episode in range(num_episodes):
            total_reward = 0.0
            self.env.initialize()
            state = self.env.initial_state()
            state_rank = self.env.state_space().rank(state)
            episode_time_step = 0
            # first action, selected with an epsilon-greedy strategy
            epsilon = self.choose_epsilon(episode, state_rank)
            action_rank = self.select_action(epsilon, state_rank)
            # continue until the episode has reached the end or we have exhausted the allotted time
            while True:
                # find the next state corresponding to the action selected
                action = self.env.action_space().element(action_rank)
                next_state, reward, is_final, _ = self.env.step(state, action)
                if is_final:
                    # a terminal state contributes no future value
                    expected_next_value = 0.0
                else:
                    next_state_rank = self.env.state_space().rank(next_state)
                    # select the action for the next state with the same epsilon-greedy strategy
                    next_epsilon = self.choose_epsilon(episode, next_state_rank)
                    next_action_rank = self.select_action(next_epsilon, next_state_rank)
                    # expected value of the next state under the epsilon-greedy behaviour policy
                    row_mean = self.nanmean(self.Q[next_state_rank])
                    row_max = self.nanmax(self.Q[next_state_rank])
                    expected_next_value = epsilon * row_mean + (1 - epsilon) * row_max
                # update part
                alpha = self.learning_rate(time_step, state_rank, action_rank)
                delta = reward + discount * expected_next_value - self.Q[state_rank, action_rank]
                self.Q[state_rank, action_rank] += alpha * delta
                self.N[state_rank, action_rank] += 1
                total_reward += reward
                time_step += 1
                episode_time_step += 1
                # if final we stop
                if is_final:
                    break
                if episode_time_step > max_steps_per_episode:
                    raise ValueError("episode {} did not finish in {} steps".format(episode, max_steps_per_episode))
                # reset the state and action for the next iteration
                state = next_state
                state_rank = next_state_rank
                action_rank = next_action_rank
                epsilon = next_epsilon
            if episode > 0 and episode % 500 == 0:
                print("step {} finished".format(episode))
        self.print_stats()
    def nanmean(self, values) -> float:
        # mean of the finite values, discarding nans and the -inf entries of invalid actions
        return values[np.isfinite(values)].mean()
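Note that the update above is closer to Expected SARSA than to vanilla SARSA: instead of bootstrapping from the value of the single next action, it bootstraps from an approximation of the expected value of the next state under the $\epsilon$-greedy behaviour policy,
$$Q(s, a) \leftarrow Q(s, a) + \alpha \left( r + \gamma \left[ \epsilon\, \bar{Q}(s') + (1 - \epsilon) \max_{a'} Q(s', a') \right] - Q(s, a) \right),$$
where $\bar{Q}(s')$ is the mean of the finite entries of row $s'$ of the Q matrix (and the bracketed term is zero when $s'$ is terminal).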
This final class is for visualization and debugging. It provides an abstract interface to paint an environment.
class Visualization(ABC):
@abstractmethod
def initialize(self):
"Initializes the visualization to the a blank state."
pass
@abstractmethod
def write(self, state: Tuple[Any], content: Any) -> None:
"Writes the given content at the specified state."
pass
@abstractmethod
def finalize(self):
"Finalizes the visualization and paints to the screen."
pass
Sample problems, ranging from quite small, to medium, to relatively large. The data has been generated with http://www.delorie.com/game-room/mazes/genmaze.cgi, with the initial and final positions added.
# a trivial example (3x3)
data1 = """
+--+--+--+
I |
+ +--+--+
| |
+--+--+ +
| O
+--+--+--+
"""
# a slightly bigger one (10x10)
data2 = """
+--+--+--+--+--+--+--+--+--+--+
I | | |
+ +--+--+ + +--+ + +--+--+
| | | | | |
+ + +--+--+--+ +--+--+--+ +
| | | | | | |
+--+--+ + + + + + + + +
| | | | | | |
+ +--+--+ + +--+--+--+--+ +
| | | | | |
+ + + + +--+--+--+ +--+ +
| | | | | |
+ +--+ + + + +--+--+ +--+
| | | | | |
+ +--+--+--+ +--+--+--+--+ +
| | | | | |
+--+ + + +--+--+ + + + +
| | | | | | | | |
+ +--+ + +--+ + + + + +
| | | | O
+--+--+--+--+--+--+--+--+--+--+
"""
# a challenging one (20x20)
data3 = """
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
I | | | | |
+--+ + + +--+ +--+--+ + +--+ +--+--+--+--+--+--+--+ +
| | | | | | | | | |
+ +--+ +--+ + + + +--+--+ +--+--+--+--+--+ + + + +
| | | | | | | | | |
+ + + + + + +--+--+ +--+--+ + +--+ + +--+--+--+ +
| | | | | | | | | | | | |
+--+--+--+ + +--+ +--+--+--+ +--+ + +--+--+ + +--+--+
| | | | | | | | | | |
+ + + +--+--+--+ + + +--+--+ + + + + + +--+--+ +
| | | | | | | | | | |
+ +--+--+ +--+ +--+ +--+--+ +--+--+--+--+--+--+--+--+--+
| | | | | | | | |
+ +--+--+--+ + +--+--+ +--+--+ +--+--+ + +--+--+ + +
| | | | | | | | | | | | | |
+ + +--+ + + + + +--+--+ + + + + + +--+ + + +
| | | | | | | | | | | |
+ +--+ +--+--+ + +--+--+ +--+ +--+--+--+--+--+--+ + +
| | | | | | | | | | | | |
+ + +--+--+--+--+ + + + + + + + + +--+ + + + +
| | | | | | | | | | | | | |
+ +--+--+ + + +--+ + + +--+ +--+ + + +--+ +--+--+
| | | | | | | | | | | | |
+--+--+--+--+ + + + + +--+--+--+--+ + + + + +--+ +
| | | | | | | | | | | |
+ + +--+ + +--+--+--+--+ + + + +--+ + + + + +--+
| | | | | | | | | | | | |
+--+--+ +--+ + +--+ + +--+--+ + +--+--+--+--+--+ + +
| | | | | | | | |
+ +--+--+ +--+--+ +--+ +--+--+--+--+ + + +--+ +--+ +
| | | | | | | | | | |
+ +--+ +--+ + +--+--+--+ + + +--+--+ + + +--+ + +
| | | | | | | | | | | |
+ + + + + +--+--+ +--+--+--+--+ + + +--+--+--+--+--+
| | | | | | | | | | | | |
+ + + + + + + + + +--+--+--+ + +--+--+--+ + + +
| | | | | | | | | | | |
+ +--+ +--+--+ + +--+--+--+--+ + +--+--+--+--+ +--+ +
| | | | O
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
"""
# which problem we want to solve
data = data2
class MazeEnvironment(Environment):
"""
Wrapper of the world grid.
A state is a two-element tuple with the current row and the column entry.
An action is a character giving the direction, which is one of north, east, south and west.
"""
NORTH = 'N'
SOUTH = 'S'
EAST = 'E'
WEST = 'W'
INITIAL = 'I'
FINAL = 'O'
WALLS = ['+', '|', '-']
HOLE = 'H'
EMPTY = ' '
def __init__(self, data):
self.world = self._build_world(data)
self.num_rows = self.world.shape[0]
self.num_cols = self.world.shape[1]
self.states = DoubleIdentityDiscreteSpace(nx=self.num_rows, ny=self.num_cols)
# all possible actions
all_actions = [self.NORTH, self.EAST, self.SOUTH, self.WEST]
self.actions = DictDiscreteSpace(all_actions)
self._initial_state = self._get_initial_state()
def state_space(self) -> Space:
return self.states
def action_space(self) -> Space:
return self.actions
def initialize(self) -> None:
pass
    def initial_state(self) -> Tuple[Any]:
return self._initial_state
def is_final_state(self, state_rank) -> bool:
state = self.states.element(state_rank)
cell = self.world[state]
        return cell in [self.FINAL, self.HOLE]
    def step(self, state: Tuple[Any], action_tuple: Tuple[Any]) -> (Tuple[Any], float, bool, Any):
row, col = state
cell = self.world[row, col]
action, = action_tuple
        assert cell in [self.EMPTY, self.INITIAL], "bad input state " + cell
# get the location on the grid
if action == self.WEST:
new_state = (row, col - 1)
if action == self.EAST:
new_state = (row, col + 1)
if action == self.NORTH:
new_state = (row - 1, col)
if action == self.SOUTH:
new_state = (row + 1, col)
        # moving off the grid or into a wall ends the episode with an infinitely negative reward,
        # so the agent quickly learns to avoid these actions
if new_state[0] < 0 or new_state[0] >= self.world.shape[0] or new_state[1] < 0 or new_state[1] >= self.world.shape[1]:
new_state = state
reward = -np.inf
is_final = True
else:
new_cell = self.world[new_state]
if new_cell in self.WALLS:
new_state = None
reward = -np.inf
is_final = True
elif new_cell == self.FINAL:
reward = 0
is_final = True
elif new_cell == self.HOLE:
reward = -100.0
is_final = True
else:
reward = -1
is_final = False
return new_state, reward, is_final, None
def new_world(self):
return copy.deepcopy(self.world)
def _build_world(self, data):
"""Given the input as a strings, it builds the matrix with the world description.
The world is a matrix, with each entry defined as (row, column). The top left element
is (0, 0), the top right (0, num_cols-1), and so on.
Going north means moving one row up (so decreasing the row counter), moving east
means moving one more column.
"""
assert len([k for k in data if k == self.INITIAL]) == 1, "no initial position found or more than one"
assert len([k for k in data if k == self.FINAL]) > 0, "no final position found"
world = []
for row in data.split('\n'):
if len(row) == 0: continue
world.append([x for x in row])
lengths = map(len, world)
if len(set(lengths)) != 1:
raise ValueError("world must be a rectangular matrix")
return np.array(world)
def _get_initial_state(self):
for i in range(self.num_rows):
for j in range(self.num_cols):
entry = self.world[i, j]
if entry == self.INITIAL:
return (i, j)
def visualize(self, matrix) -> None:
pass
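A quick illustrative check of the dynamics on the small maze data1, using a hypothetical tiny_env instance (the rewards follow directly from the step() method above):
tiny_env = MazeEnvironment(data1)
start = tiny_env.initial_state()
print(start)                          # (row, column) of the 'I' cell
print(tiny_env.step(start, ('E',)))   # east into a free cell: reward -1, not final
print(tiny_env.step(start, ('N',)))   # north into the wall above: reward -inf, episode ends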
class MazeVisualization(Visualization):
def __init__(self, env: MazeEnvironment, algorithm: ReinforcementLearning):
self.env = env
self.algorithm = algorithm
def initialize(self):
        self.world = self.env.new_world()
# mark the object as initialized
self.initialized = True
def write(self, state : Any, content : Any) -> None:
assert self.initialized, "object has not been initialized"
self.world[state] = content
def finalize(self):
return '\n'.join(map(lambda x: ''.join(x), self.world))
def optimal_policy(self):
self.initialize()
counter = 0
MAX_STEPS = 1000
state = self.env.initial_state()
total_steps = 1
while True:
action = self.algorithm.optimal_action(state)
new_state, _, is_final, _ = self.env.step(state, action)
total_steps += 1
self.write(state, '*')
if is_final or counter > MAX_STEPS:
break
state = new_state
counter += 1
#
print("Total number of steps = {}".format(total_steps))
return self.finalize()
env = MazeEnvironment(data)
algorithm = QLearning()
algorithm.set_environment(env)
algorithm.set_learning_rate(ConstantLearningRate(0.25))
algorithm.set_exploration_rate(ConstantExplorationRate(0.25))
algorithm.set_observer(RewardsObserver())
algorithm.initialize()
algorithm.train(10_000, discount=1, params={'PrintFrequency': 2000})
We have 651 states and 4 actions
step 2000 finished
step 4000 finished
step 6000 finished
step 8000 finished
Exploitation = 75.03%, exploration = 24.97%
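Since the RewardsObserver recorded every reward, we can, for instance, check that episodes tend to get shorter as the agent learns (an illustrative post-processing step using pandas, which was imported above but not otherwise used):
# number of steps per episode, as recorded by the observer
episode_lengths = pd.Series({episode: len(rewards) for episode, rewards in algorithm.observer.rewards.items()})
print(episode_lengths.head())
print(episode_lengths.tail())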
viz = MazeVisualization(env, algorithm)
print(viz.optimal_policy())
Total number of steps = 113
+--+--+--+--+--+--+--+--+--+--+
*********** | | |
+ +--+--+* + +--+ + +--+--+
| | ****** | | | |
+ + *+--+--+--+ +--+--+--+ +
| | *** |*****| | | |
+--+--+* +* + *+ + + + + +
| ****** |* | *| | | |
+ *+--+--+* + *+--+--+--+--+ +
| *|*****|* | ********* | |
+ *+* + *+* +--+--+--+* +--+ +
| *** | *|* | ********* | |
+ +--+ *+* + *+ +--+--+ +--+
| | *** | *| | |
+ +--+--+--+ *+--+--+--+--+ +
| | | *******|*****| |
+--+ + + +--+--+ *+* + *+ +
| | | | | *|* | *| |
+ +--+ + +--+ + *+* + *+ +
| | | *** | ****O
+--+--+--+--+--+--+--+--+--+--+