Dive into the world of artificial intelligence — build a deep reinforcement learning gym from scratch.
Table of Contents
If you already have a grasp of Reinforcement and Deep Q-Learning concepts, feel free to jump directly to the step-by-step tutorial. There you’ll have all the resources and code necessary to build a Deep Reinforcement Learning gym from the ground up, including the environment, agent, and training protocol.
Intro
Why Reinforcement Learning?
What you will gain
What is Reinforcement Learning?
Deep Q-Learning
Step-by-Step Tutorial
1. Initial Setup
2. The Big Picture
3. The Environment: Initial Foundations
4. Implement The Agent: Neural Architecture and Policy
5. Affect The Environment: Finishing Up
6. Learn From Experiences: Experience Replay
7. Define The Agent’s Learning Process: Fitting The NN
8. Execute The Training Loop: Putting It All Together
9. Wrapping It Up
10. Bonus: Optimize State Representation
Why Reinforcement Learning?
The recent widespread adoption of advanced AI systems, such as ChatGPT, Bard, Midjourney, Stable Diffusion, and many others, has sparked an interest in the field of artificial intelligence, machine learning, and neural networks that is often left unsatisfied because of the technical nature of implementing such systems.
For those looking to begin their journey into AI (or continue the one they are on), building a reinforcement learning gym using Deep Q-Learning is a great start, as it does not require advanced knowledge to implement, can be easily expanded to solve complex problems, and can give immediate, tangible insight into how artificial intelligence becomes “intelligent”.
What you will gain
Assuming you have a basic understanding of Python, by the end of this introduction to deep reinforcement learning you will have developed your own gym, without using high-level reinforcement learning frameworks, and trained an agent to solve a simple problem: moving itself from its starting point to the goal!
It’s not very glamorous, but you will have hands-on experience with topics like constructing an environment, defining reward structures and basic neural architecture, tweaking environmental parameters to observe different learning behaviors, and finding a balance between exploration and exploitation in decision-making.
You will then have all of the tools you need to implement your own, more complex environments and systems, and be well poised to dive deeper into topics like neural networks and advanced optimization strategies in reinforcement learning.
You will also gain the confidence and understanding needed to effectively utilize pre-built tools like the OpenAI Gym, as each component of the system is implemented from scratch and demystified. This allows you to seamlessly integrate these powerful resources into your own AI projects.
What is Reinforcement Learning?
Reinforcement Learning (RL) is a sub-field of Machine Learning (ML) that is specifically focused on how agents (the entities making decisions) take actions in an environment to complete a goal.
Its implementations include:
- Games
- Autonomous Vehicles
- Robotics
- Finance (algorithmic trading)
- Natural Language Processing
- and much more
The idea of RL is based on the fundamental principles of behavioral psychology where an animal or person learns from the consequences of their actions. If an action leads to a good outcome, then the agent is rewarded; if it does not, then it is punished or no reward is given.
Before moving on, it is important to understand some commonly used terms:
- Environment: This is the world — the place where the agent operates. It sets the rules, boundaries, and rewards that the agent must navigate.
- Agent: The decision-maker within the environment. The agent takes actions based on its understanding of the state it’s in.
- State: A detailed snapshot of the agent’s current situation in the environment, including relevant metrics or sensory information used for decision-making.
- Action: The specific measure the agent takes to interact with the environment, such as moving, collecting an item, or initiating an interaction.
- Reward: The feedback given from the environment as a result of the agent’s actions, which can be positive, negative, or neutral, guiding the learning process.
- State/Action-Space: The combination of all possible states the agent can encounter and all actions it can take in the environment. This defines the scope of decisions and situations the agent must learn to navigate.
Essentially, in each step (turn) of the program the agent receives a state from the environment, chooses an action, receives a reward or punishment, and the environment is updated or the episode is complete. Information received after each step is saved as an “experience” for later training.
For a more concrete example, imagine you are playing chess. The board is the environment and you are the agent. At each step (or turn) you view the state of the board and choose from the action-space, which is the set of all possible moves you could make, and pick the action with the highest possible future reward. After the move is made you evaluate whether it was a good action or not, and learn to perform better next time.
It may seem like a lot of information at first, but as you build this out yourself these terms will come to feel quite natural.
Deep Q-Learning
Q-Learning is an algorithm used in ML where the ‘Q’ stands for “Quality”, as in the value of actions an agent can take. It works by building a table of Q-values, one for each state and action pair, where each value estimates the expected future reward for taking that action in that state.
The agent is given the state of the environment, looks that state up in the table, and then chooses the action with the highest Q-value.
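To make the table lookup concrete, here is a minimal sketch of a Q-table and a greedy lookup. It is only an illustration: the state encoding, the action names, and the Q-values are made up for the example and are not part of the gym built in this tutorial.

```python
from collections import defaultdict

ACTIONS = ["up", "down", "left", "right"]

# Q-table: maps a state to one Q-value per action; unseen states start at 0.0
q_table = defaultdict(lambda: [0.0] * len(ACTIONS))

def best_action(state):
    """Return the index of the action with the highest Q-value for this state."""
    q_values = q_table[state]
    return max(range(len(ACTIONS)), key=lambda a: q_values[a])

# Hypothetical encoding: (agent_row, agent_col, goal_row, goal_col)
state = (0, 1, 3, 3)
# Pretend the agent has already learned something about this state
q_table[state] = [-0.5, 0.8, 0.1, 0.3]

chosen = best_action(state)
print(ACTIONS[chosen])  # down
```

Every state the agent can meet needs its own row in this table, which is exactly what becomes impractical as the state-space grows.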
However, Q-Learning has a few drawbacks. Each state and action pair must be explored to achieve good results. If the state and action spaces (the set of all possible states and actions) are too large, then it is not feasible to store them all in a table.
This is where Deep Q-Learning (DQL), an evolution of Q-Learning, comes in. DQL utilizes a deep Neural Network (NN) to approximate a Q-value function rather than saving them in a table. This allows for handling environments that have high-dimensional state-spaces, like image inputs from a camera, which would not be practical for traditional Q-Learning.
The neural network can then generalize over similar states and actions, choosing a desirable move even if it has not been trained on the exact situation, eliminating the need for a large table.
How the neural network does this is beyond the scope of this tutorial. Thankfully, a deep understanding is not needed to implement Deep Q-Learning effectively.
Constructing The Reinforcement Learning Gym
1. Initial Setup
Before we start coding our AI agent, it is recommended that you have a solid understanding of Object Oriented Programming (OOP) principles in Python.
If you do not have Python installed already, below is a simple tutorial by Bhargav Bachina to get you started. The version I will be using is 3.11.6.
How to Install and Getting Started With Python
The only dependency you will need is TensorFlow, an open-source machine learning library by Google that we’ll use to build and train our neural network. This can be installed through pip in the terminal. My version is 2.14.0.
pip install tensorflow
Or if that doesn’t work:
pip3 install tensorflow
You will also need the package NumPy, but this should be included with TensorFlow. If you run into issues there, pip install numpy.
It is also recommended that you create a new file for each class (e.g., environment.py). This will keep you from being overwhelmed and make it easier to troubleshoot any errors you run into.
For your reference, here is the GitHub repository with the completed code: https://github.com/HestonCV/rl-gym-from-scratch. Feel free to clone, explore, and use it as a reference point!
2. The Big Picture
To really understand the concepts rather than just copying code, it’s crucial to get a handle on the different parts we’re going to build and how they fit together. This way, each piece will have a place in the bigger picture.
Below is the code for one training loop with 5000 episodes. An episode is essentially one complete round of interaction between the agent and the environment, from start to finish.
This should not be implemented or fully understood at this point. As we build out each part, if you want to see how a specific class or method will be used, refer back to this.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load(f'models/model_{grid_size}.h5')

    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):

        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)

            # Get the action choice from the agents policy
            action = agent.get_action(state)

            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)

            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)

            # Set the state to the next_state
            state = next_state

            if done:
                break
            # time.sleep(0.5)

        agent.save(f'models/model_{grid_size}.h5')
Each iteration of the inner loop is considered one step.
In each step:
- The state is retrieved from the environment.
- The agent chooses an action based on this state.
- The environment is acted on, returning the reward, the resulting state after taking the action, and whether the episode is done.
- The initial state, action, reward, next_state, and done are then saved into experience_replay as a sort of long-term memory (experience).
- The agent is then trained on a random sample of these experiences.
At the end of each episode, or however often you would like, the model weights are saved to the models folder. These can later be preloaded to keep from training from scratch each time. The environment is then reset at the start of the next episode.
This basic structure is pretty much all it takes to create an intelligent agent to solve a large variety of problems!
As stated in the introduction, our problem for the agent is quite simple: get from its initial position in a grid to the designated goal position.
3. The Environment: Initial Foundations
The most obvious place to start in developing this system is the environment.
To have a functioning RL gym, the environment needs to do a few things:
- Maintain the current state of the world.
- Keep track of the goal and agent.
- Allow the agent to make changes to the world.
- Return the state in a form the model can understand.
- Render it in a way we can understand to observe the agent.
This will be the place the agent spends its entire life. We will define the environment as a simple square matrix/2D array, or a list of lists in Python.
This environment will have a discrete state-space, meaning that the possible states the agent can encounter are distinct and countable. Each state is a separate, specific condition or scenario in the environment, unlike a continuous state space where the states can vary in an infinite, fluid manner — think of chess versus controlling a car.
DQL is specifically designed for discrete action-spaces (a finite number of actions)— this is what we will be focusing on. Other methods are used for continuous action-spaces.
In the grid, empty space will be represented by 0s, the agent will be represented by a 1, and the goal will be represented by a -1. The size of the environment can be whatever you would like, but as the grid grows, the set of all possible states (state-space) grows much faster than the grid’s side length: every combination of agent and goal position is a distinct state, so the count scales roughly with the fourth power of the side length. This can slow training time significantly.
The grid will look something like this when rendered:
[0, 1, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, -1, 0]
[0, 0, 0, 0, 0]
Constructing the Environment class and reset method
We will begin by implementing the Environment class and a way to initialize the environment. For now, it will take an integer, grid_size, but we will expand on this shortly.
import numpy as np

class Environment:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = []

    def reset(self):
        # Initialize the empty grid as a 2d array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))
When a new instance is created, Environment saves grid_size and initializes an empty grid.
The reset method populates the grid using np.zeros((self.grid_size, self.grid_size)), which takes a tuple, shape, and outputs a 2D NumPy array of that shape consisting only of zeros.
A NumPy array is a grid-like data structure that behaves similarly to a list in Python, except that it enables us to efficiently store and manipulate numerical data. It allows for vectorized operations, meaning that operations are automatically applied to all elements in the array without the need for explicit loops.
This makes computations on large datasets much faster and more efficient compared to standard Python lists. Not only that, but it is the data structure that our agent’s neural network architecture will expect!
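As a small illustration of what "vectorized" means here, the sketch below doubles every value in a collection twice: once with a plain Python list and once with a NumPy array. The variable names are chosen only for this example.

```python
import numpy as np

values = [1, 2, 3, 4]

# Plain Python list: a comprehension visits each element explicitly
doubled_list = [v * 2 for v in values]

# NumPy array: the multiplication is applied to every element at once
array = np.array(values)
doubled_array = array * 2

print(doubled_list)            # [2, 4, 6, 8]
print(doubled_array.tolist())  # [2, 4, 6, 8]
```

Both produce the same numbers; the difference is that NumPy performs the loop internally, which is what makes it faster on large arrays.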
Why the name reset? Well, this method will be called to reset the environment and will eventually return the initial state of the grid.
Adding the agent and goal
Next, we will construct the methods for adding the agent and the goal to the grid.
import random

def add_agent(self):
    # Choose a random location
    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Agent is represented by a 1
    self.grid[location[0]][location[1]] = 1

    return location

def add_goal(self):
    # Choose a random location
    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Get a random location until it is not occupied
    while self.grid[location[0]][location[1]] == 1:
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Goal is represented by a -1
    self.grid[location[0]][location[1]] = -1

    return location
The locations for the agent and the goal will be represented by a tuple (row, column). Both methods select random values within the boundaries of the grid and return the location. The main difference is that add_goal ensures it does not select a location already occupied by the agent.
We place the agent and goal at random starting locations to introduce variability in each episode, which helps the agent learn to navigate the environment from different starting points, rather than memorizing one route.
Finally, we will add a method to render the world in the console to enable us to see the interactions between the agent and environment.
def render(self):
    # Convert to a list of ints to improve formatting
    grid = self.grid.astype(int).tolist()

    for row in grid:
        print(row)
    print('') # To add some space between renders for each step
render does three things: casts the elements of self.grid to type int, converts it into a Python list, and prints each row.
The only reason we don’t print each row of the NumPy array directly is that it doesn’t look as nice.
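To see the difference for yourself, here is a small standalone comparison. It uses a made-up 2×3 grid rather than the Environment class.

```python
import numpy as np

grid = np.zeros((2, 3))
grid[0][1] = 1
grid[1][2] = -1

# Printing NumPy rows directly shows floats; the first row prints as [0. 1. 0.]
for row in grid:
    print(row)

# Casting to int and converting to nested lists gives the cleaner layout,
# so the first row prints as [0, 1, 0]
clean_rows = grid.astype(int).tolist()
for row in clean_rows:
    print(row)
```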
Tying it all together:
import numpy as np
import random

class Environment:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = []

    def reset(self):
        # Initialize the empty grid as a 2d array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

    def add_agent(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1

        return location

    def add_goal(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()

        for row in grid:
            print(row)
        print('') # To add some space between renders for each step

# Test Environment
env = Environment(5)
env.reset()
agent_location = env.add_agent()
goal_location = env.add_goal()
env.render()

print(f'Agent Location: {agent_location}')
print(f'Goal Location: {goal_location}')
>>>
[0, 0, 0, 0, 0]
[0, 0, -1, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 1, 0]
[0, 0, 0, 0, 0]
Agent Location: (3, 3)
Goal Location: (1, 2)
When looking at the locations it may seem there was some error, but they should be read as (row, column) from the top left to the bottom right. Also, remember that the coordinates are zero indexed.
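If the (row, column) convention feels backwards, the short sketch below rebuilds the grid from the output above by hand and asks NumPy where the agent and goal are. It does not use the Environment class; the grid is filled in manually for the example.

```python
import numpy as np

grid = np.zeros((5, 5))
grid[3][3] = 1    # agent at row 3, column 3
grid[1][2] = -1   # goal at row 1, column 2

# np.argwhere returns the (row, column) index of every matching cell
agent_row, agent_col = (int(i) for i in np.argwhere(grid == 1)[0])
goal_row, goal_col = (int(i) for i in np.argwhere(grid == -1)[0])

print((agent_row, agent_col))  # (3, 3)
print((goal_row, goal_col))    # (1, 2)
```

The first index always selects the row (counting down from the top) and the second selects the column (counting from the left).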
Okay, so the environment is defined. What next?
Expanding on reset
Let’s edit the reset method to handle placing the agent and goal for us. While we are at it, let’s automate render as well.
class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.grid = []
        # Make sure to add the new attributes
        self.render_on = render_on
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2d array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        if self.render_on:
            self.render()
Now, when reset is called, the agent and goal are added to the grid, their initial locations are saved, and if render_on is set to True it will render the grid.
...
# Test Environment
env = Environment(5, render_on=True)
env.reset()
# Now to access agent and goal location you can use Environment's attributes
print(f'Agent Location: {env.agent_location}')
print(f'Goal Location: {env.goal_location}')
>>>
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, -1]
[1, 0, 0, 0, 0]
Agent Location: (4, 0)
Goal Location: (3, 4)
Defining the state of the environment
The last method we will implement for now is get_state. At first glance it seems the state might simply be the grid itself, but the problem with this approach is it is not what the neural network will expect.
The dense (fully connected) layers we will use expect each input as a one-dimensional vector, not the two-dimensional shape that grid currently has. We can fix this by flattening the grid using NumPy’s built-in flatten method. This will place the rows end to end in a single array.
def get_state(self):
    # Flatten the grid from 2d to 1d
    state = self.grid.flatten()
    return state
This will transform:
[0, 0, 0, 0, 0]
[0, 0, 0, 1, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, -1]
[0, 0, 0, 0, 0]
Into:
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
As you can see, it’s not immediately obvious which cells are which, but this will be no problem for a deep neural network.
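The mapping between grid cells and positions in the flattened array is simple arithmetic. The sketch below rebuilds the example grid above by hand (outside the Environment class) and shows where the agent ends up after flattening.

```python
import numpy as np

grid_size = 5
grid = np.zeros((grid_size, grid_size))
grid[1][3] = 1    # agent
grid[3][4] = -1   # goal

state = grid.flatten()
print(state.shape)  # (25,)

# A cell at (row, column) lands at index row * grid_size + column
agent_index = int(np.argmax(state == 1))
print(agent_index)  # 8

# divmod reverses the mapping back to (row, column)
print(divmod(agent_index, grid_size))  # (1, 3)
```

No information is lost by flattening; the network simply has to learn this layout from experience.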
Now we can update reset to return the state right after grid is populated. Nothing else will change.
def reset(self):
    ...

    # Return the initial state of the grid
    return self.get_state()
Full code up to this point:
import numpy as np
import random

class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.grid = []
        self.render_on = render_on
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2d array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        if self.render_on:
            self.render()

        # Return the initial state of the grid
        return self.get_state()

    def add_agent(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1

        return location

    def add_goal(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()

        for row in grid:
            print(row)
        print('') # To add some space between renders for each step

    def get_state(self):
        # Flatten the grid from 2d to 1d
        state = self.grid.flatten()
        return state
You have now successfully implemented the foundation for the environment! Although, if you haven’t noticed, we can’t interact with it yet. The agent is stuck in place.
We will return to this problem later after the Agent class has been coded to provide better context.
4. Implement The Agent: Neural Architecture and Policy
As stated previously, the agent is the entity that is given the state of its environment, in this case a flattened version of the world grid, and makes a decision on what action to take from the action-space.
Just to reiterate, the action-space is the set of all possible actions. In this scenario the agent can move up, down, left, and right, so the size of the action-space is 4.
The state-space is the set of all possible states. This can be a massive number depending on the environment and perspective of the agent. In our case, if the world is a 5×5 grid there are 600 possible states, but if the world is a 25×25 grid there are 390,000, wildly increasing the training time.
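The counts quoted above follow from a simple observation: the agent can sit in any cell, and the goal in any of the remaining cells. A short sketch of that arithmetic (the function name is made up for this example):

```python
def count_states(grid_size):
    """Number of distinct (agent, goal) placements on a square grid."""
    cells = grid_size ** 2
    # The agent can be in any cell; the goal in any of the remaining cells
    return cells * (cells - 1)

print(count_states(5))   # 600
print(count_states(25))  # 390000
```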
For an agent to effectively learn to complete a goal it needs a few things:
- Neural network to approximate the Q-values (estimated total amount of future reward for an action) in the case of DQL.
- Policy or a strategy that the agent follows to choose an action.
- Reward signals from the environment to tell an agent how well it is doing.
- Ability to train on past experiences.
There are two different policies one can implement:
- Greedy Policy: Choose the action with the highest Q-value in the current state.
- Epsilon-Greedy Policy: Choose the action with the highest Q-value in the current state, but there is a small chance, epsilon (commonly denoted as ϵ), to choose a random action. If epsilon = 0.02 then there is a 2% chance that the action will be random.
What we will implement is the Epsilon-Greedy Policy.
Why would random actions help the agent learn? Exploration.
When the agent begins, it may learn a suboptimal path to the goal and continue to make this choice without ever changing or learning a new route.
Beginning with a large epsilon value and slowly decreasing it allows the agent to thoroughly explore the environment as it updates its Q-values before exploiting the learned strategies. The amount we decrease epsilon by over time is called epsilon decay, which will make more sense soon.
Like we did with the environment, we will represent the agent with a class.
Now, before we implement the policy, we need a way to get Q-values. This is where our agent’s brain — or neural network — comes in.
The neural network
Without getting too off track here, a neural network is simply a massive function. The values go in, get passed to each layer and transformed, and some different values pop out at the end. Nothing more than that. The magic comes in when training begins.
The idea is to give the NN large amounts of labeled data like, “here is an input, and here is what you should output”. It slowly adjusts the values between neurons with each training step, attempting to get as close as possible to the given outputs, finding patterns within the data, and hopefully helping us predict for inputs the network has never seen.
The Agent class and defining the neural architecture
For now we will define the neural architecture using TensorFlow and focus on the “forward pass” of the data.
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

class Agent:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model
Again, if you are unfamiliar with neural networks, don’t get too caught up on this section. While we use activations like ‘relu’ and ‘linear’ in our model, a detailed exploration of activation functions is beyond the scope of this article.
All you really need to know is the model takes in state as input, the values are transformed at each layer in the model, and the four Q-values corresponding to each action are output.
In building our agent’s neural network, we start with an input layer that receives the state of the grid, represented as a one-dimensional array of size grid_size². This is because we’ve flattened the grid to simplify the input. The input layer is simply the data itself, so it does not need its own entry in the architecture; passing input_shape to the first Dense layer tells Keras what to expect.
Next, we have two hidden layers. These are values we don’t see, but as our model learns, they are important for getting a closer approximation of the Q-value function:
- The first hidden layer has 128 neurons, Dense(128, activation='relu'), and takes the flattened grid as its input.
- The second hidden layer consists of 64 neurons, Dense(64, activation='relu'), and further processes the information.
Finally, the output layer, Dense(4, activation='linear'), comprises 4 neurons, corresponding to the four possible actions (up, down, left, right). This layer outputs the Q-values, the estimates for the future reward of each action.
Typically the more complex problems you have to solve, the more hidden layers and neurons you will need. Two hidden layers should be plenty for our simple use-case.
Neurons and layers can and should be experimented with to find a balance between speed and results — each adding to the network’s ability to capture and learn from the nuances of the data. Like the state-space, the larger the neural network, the slower training will be.
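To get a feel for what the layers do to the data, here is a rough NumPy-only sketch of the same 25 → 128 → 64 → 4 shape with random weights. It is not the TensorFlow model and it does no learning; the names and the weight initialisation are assumptions made purely for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
grid_size = 5
layer_sizes = [grid_size ** 2, 128, 64, 4]

# One (weights, biases) pair per Dense layer, randomly initialised for illustration
params = [
    (rng.normal(scale=0.1, size=(n_in, n_out)), np.zeros(n_out))
    for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:])
]

def forward(state_batch):
    """Pass a batch of flattened grids through the layers and return Q-values."""
    x = state_batch
    for i, (weights, biases) in enumerate(params):
        x = x @ weights + biases
        if i < len(params) - 1:
            x = np.maximum(x, 0)  # ReLU on hidden layers; output layer stays linear
    return x

state = np.zeros((1, grid_size ** 2))  # a batch containing one flattened grid
q_values = forward(state)
print(q_values.shape)  # (1, 4)

# Every weight and bias is a value the network can adjust during training
total_params = sum(w.size + b.size for w, b in params)
print(total_params)  # 11844
```

Doubling the neurons in a layer roughly doubles the weights connected to it, which is why larger networks train more slowly.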
Greedy Policy
Using this neural network, we are now able to get a Q-value prediction, albeit not a very good one yet, and make a decision.
import numpy as np

def get_action(self, state):
    # Add an extra dimension to the state to create a batch with one instance
    state = np.expand_dims(state, axis=0)

    # Use the model to predict the Q-values (action values) for the given state
    q_values = self.model.predict(state, verbose=0)

    # Select and return the action with the highest Q-value
    action = np.argmax(q_values[0]) # Take the action from the first (and only) entry

    return action
The TensorFlow neural network architecture requires input, the state, to be in batches. This is very useful for when you have a large number of inputs and you want a full batch of outputs, but it can be a little confusing when you only have one input to predict for.
state = np.expand_dims(state, axis=0)
We can fix this by using NumPy’s expand_dims function, specifying axis=0. This simply wraps the state in an outer array, making it a batch of one input. For example, the state of a grid of size 5×5:
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
Becomes:
[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]]
When training the model you will typically use batches of size 32 or more. It will look something like this:
[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
...
[0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
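The easiest way to keep track of batches is to look at array shapes. A quick check, using an all-zero placeholder state rather than a real grid:

```python
import numpy as np

state = np.zeros(25)           # one flattened 5x5 grid
print(state.shape)             # (25,)

# Adding a leading axis turns the single state into a batch of one
batch_of_one = np.expand_dims(state, axis=0)
print(batch_of_one.shape)      # (1, 25)

# Stacking several states produces a larger batch, e.g. for training
batch = np.stack([state] * 32)
print(batch.shape)             # (32, 25)
```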
Now that we have prepared the input for the model in the correct format, we can predict the Q-values for each action and choose the highest one.
...
# Use the model to predict the Q-values (action values) for the given state
q_values = self.model.predict(state, verbose=0)
# Select and return the action with the highest Q-value
action = np.argmax(q_values[0]) # Take the action from the first (and only) entry
...
We simply give the model the state and it outputs a batch of predictions. Remember, because we are feeding the network a batch of one, it will return a batch of one. Additionally, verbose=0 suppresses the progress output that predict would otherwise print to the console on every call.
Finally, we choose and return the index of the action with the highest value using np.argmax on the first and only entry in the batch.
In our case, the indices 0, 1, 2, and 3 will be mapped to up, down, left, and right respectively.
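Here is the selection step on its own, with made-up Q-values standing in for the model’s output. The ACTION_NAMES list is only a label lookup for this example.

```python
import numpy as np

ACTION_NAMES = ["up", "down", "left", "right"]

# Made-up model output for a batch containing one state
q_values = np.array([[0.12, 0.85, -0.30, 0.41]])

# argmax returns the index of the largest value in the first (and only) entry
action = int(np.argmax(q_values[0]))
print(action)                # 1
print(ACTION_NAMES[action])  # down
```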
The Greedy Policy always picks the action with the highest estimated future reward according to the current Q-values. Because those estimates can be wrong, especially early in training, this may not always lead to the best long-term outcome.
Epsilon-Greedy Policy
We have implemented the Greedy-Policy, but what we want to have is the Epsilon-Greedy policy. This introduces randomness into the agent’s choice to allow for exploration of the state-space.
Just to recap, epsilon is the probability that a random action will be chosen. We also want some way to decrease this over time as the agent learns, allowing exploitation of its learned policy. As briefly mentioned before, this is called epsilon decay.
The epsilon decay value should be set to a decimal number less than 1, which is used to progressively reduce the epsilon value after each step the agent takes.
Typically epsilon will start at 1, and epsilon decay will be some value very close to 1, like 0.998. After each step in the training process you multiply epsilon by the epsilon decay.
To illustrate this, below is how epsilon will change over the training process.
Initialize Values:
epsilon = 1
epsilon_decay = 0.998
-----------------
Step 1:
epsilon = 1
epsilon = 1 * 0.998 = 0.998
-----------------
Step 2:
epsilon = 0.998
epsilon = 0.998 * 0.998 = 0.996
-----------------
Step 3:
epsilon = 0.996
epsilon = 0.996 * 0.998 = 0.994
-----------------
Step 4:
epsilon = 0.994
epsilon = 0.994 * 0.998 = 0.992
-----------------
...
-----------------
Step 1000:
epsilon = 1 * (0.998)^1000 = 0.135
-----------------
...and so on
As you can see, epsilon slowly approaches zero with each step. By step 1000, there is a 13.5% chance that a random action will be chosen. Epsilon decay is a value that will need to be tweaked based on the state-space. With a large state-space, more exploration may be necessary, which means choosing a higher epsilon decay (closer to 1) so that epsilon shrinks more slowly.
Even when the agent is trained well, it is beneficial to keep a small epsilon value. We should define a stopping point where epsilon does not get any lower, epsilon end. This can be 0.1, 0.01, or even 0.001 depending on the use-case and complexity of the task.
In the figure above, you’ll notice epsilon stops decreasing at 0.1, the pre-defined epsilon end.
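If you would like to reproduce the decay schedule yourself, the sketch below simulates it with a start of 1, a decay of 0.998, and an epsilon end of 0.01. The function name is made up for this example, and the loop mirrors the simple rule "multiply while above the floor".

```python
def simulate_epsilon(steps, epsilon=1.0, epsilon_decay=0.998, epsilon_end=0.01):
    """Return the value of epsilon after each of `steps` decay updates."""
    history = []
    for _ in range(steps):
        # Only decay while epsilon is still above the floor
        if epsilon > epsilon_end:
            epsilon *= epsilon_decay
        history.append(epsilon)
    return history

history = simulate_epsilon(5000)
print(round(history[999], 3))  # 0.135

# First update after which epsilon is at or below the floor
steps_to_floor = next(i + 1 for i, e in enumerate(history) if e <= 0.01)
print(steps_to_floor)  # 2301
```

With these settings the agent stops reducing its exploration after roughly 2,300 steps, which is worth keeping in mind when choosing a decay value for larger grids.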
Let’s update our Agent class to incorporate epsilon.
import numpy as np

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        ...

    ...

    def get_action(self, state):
        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)

            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)

            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0]) # Take the action from the first (and only) entry

        # Decay the epsilon value to reduce the exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay

        return action
We’ve given epsilon, epsilon_decay, and epsilon_end default values of 1, 0.998, and 0.01, respectively.
Remember epsilon, and its associated values, are hyper-parameters, parameters used to control the learning process. They can and should be experimented with to achieve the best result.
The method, get_action, has been updated to incorporate epsilon. If the random value given by np.random.rand is less than or equal to epsilon, a random action is chosen. Otherwise, the process is the same as before.
Finally, if epsilon has not reached epsilon_end, we update it by multiplying by epsilon_decay like so — self.epsilon *= self.epsilon_decay.
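To see the policy in isolation from the neural network, here is a minimal sketch of the same epsilon-greedy choice applied to a hand-written row of Q-values. The function choose_action and the sample Q-values are assumptions made for illustration; in the Agent class the Q-values come from self.model.predict.

```python
import numpy as np

def choose_action(q_values, epsilon, rng):
    """Epsilon-greedy choice over one row of Q-values (one entry per action)."""
    if rng.random() <= epsilon:
        # Exploration: pick any action index uniformly at random
        return int(rng.integers(0, len(q_values)))
    # Exploitation: pick the action with the highest estimated value
    return int(np.argmax(q_values))

rng = np.random.default_rng(seed=0)
q_values = np.array([0.2, 1.5, -0.3, 0.7])  # made-up values for up, down, left, right

greedy = [choose_action(q_values, epsilon=0.0, rng=rng) for _ in range(5)]
explore = [choose_action(q_values, epsilon=1.0, rng=rng) for _ in range(5)]
print(greedy)   # with epsilon = 0 every choice is the argmax (index 1)
print(explore)  # with epsilon = 1 every choice is a random index from 0 to 3
```

Passing the random generator in as an argument is a design choice for this sketch only; it makes the behavior repeatable when experimenting.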
Agent up to this point:
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
import numpy as np

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model

    def get_action(self, state):
        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)
            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)
            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

        # Decay the epsilon value to reduce the exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay

        return action
We have effectively implemented the Epsilon-Greedy Policy, and we are almost ready to enable the agent to learn!
5. Affect The Environment: Finishing Up
The Environment class currently has methods for resetting the grid, adding the agent and goal, providing the current state, and printing the grid to the console.
For the environment to be complete we need to be able to not only allow the agent to affect it, but also provide feedback in the form of rewards.
Defining the reward structure
Coming up with a good reward structure is the main challenge of reinforcement learning. Your problem could be perfectly within the capabilities of the model, but if the reward structure is not set up correctly it may never learn.
The goal of the rewards is to encourage specific behavior. In our case we want to guide the agent towards the goal cell, defined by -1.
Similar to the layers and neurons in the network, and epsilon and its associated values, there can be many right (and many wrong) ways to define the reward structure.
The two main types of reward structures:
- Sparse: When rewards are only given in a handful of states.
- Dense: When rewards are common throughout the state-space.
With sparse rewards the agent has very little feedback to lead it. This would be like simply giving a set penalty for each step, and if the agent reaches the goal you provide one large reward.
The agent can certainly learn to reach the goal, but depending on the size of the state-space it can take much longer and may get stuck on a suboptimal strategy.
This is in contrast with dense reward structures, which allow the agent to train quicker and behave more predictably.
Dense reward structures either
- have more than one goal.
- give hints throughout an episode.
The agent then has more opportunities to learn desired behavior.
For instance, pretend you’re training an agent to use a body to walk, and the only reward you give it is if it reaches a goal. The agent may learn to get there by simply inching or rolling along the ground, or not even learn at all.
Instead, if you reward the agent for heading towards the goal, staying on its feet, putting one foot in front of the other, and standing up straight, you will get a much more natural and interesting gait while also improving learning.
Allowing the agent to impact the environment
To even have rewards, you must allow the agent to interact with its world. Let’s revisit the Environment class to define this interaction.
...
def move_agent(self, action):
    # Map agent action to the correct movement
    moves = {
        0: (-1, 0),  # Up
        1: (1, 0),   # Down
        2: (0, -1),  # Left
        3: (0, 1)    # Right
    }

    previous_location = self.agent_location

    # Determine the new location after applying the action
    move = moves[action]
    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

def is_valid_location(self, location):
    # Check if the location is within the boundaries of the grid
    if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size):
        return True
    else:
        return False
The above code first defines the change in coordinates associated with each action value. If the action 0 is chosen, then the coordinates change by (-1, 0).
Remember, in this scenario the coordinates are interpreted as (row, column). If row lowers by one, the agent moves up one cell, and if column lowers by one, the agent moves left one cell.
It then calculates the new location based on the move. If the new location is valid, agent_location is updated. Otherwise, the agent_location is left the same.
Also, is_valid_location simply checks if the new location is within the grid boundaries.
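The same movement rule can be tried out without the grid. The sketch below is a standalone version under the (row, column) convention described above; MOVES and next_location are names introduced here for illustration only and are not part of the Environment class.

```python
# Change in (row, column) for each action index
MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}  # up, down, left, right

def next_location(location, action, grid_size):
    """Return the location after the move, or the old one if the move leaves the grid."""
    row = location[0] + MOVES[action][0]
    col = location[1] + MOVES[action][1]
    if 0 <= row < grid_size and 0 <= col < grid_size:
        return (row, col)
    return location

print(next_location((2, 2), 0, grid_size=5))  # up: the row decreases -> (1, 2)
print(next_location((0, 0), 2, grid_size=5))  # left from the corner is invalid -> (0, 0)
```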
That is fairly straightforward, but what are we missing? Feedback!
Providing feedback
The environment needs to provide an appropriate reward and whether the episode is complete or not.
Let’s incorporate the done flag first to indicate that an episode is finished.
...
def move_agent(self, action):
    ...
    done = False  # The episode is not done by default

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the reward location
        if self.agent_location == self.goal_location:
            # Episode is complete
            done = True

    return done
...
We’ve set done to false by default. If the new agent_location is the same as goal_location then done is set to true. Finally, we return this value.
We are ready for our reward structure. First, I will show the implementation for the sparse reward structure. This would be satisfactory for a grid of around 5×5, but we will update it to allow for a larger environment.
Sparse rewards
Implementing sparse rewards is quite simple. We primarily need to give a reward for landing on the goal.
Let’s also give a small negative reward for each step that doesn’t land on the goal and a larger one for hitting the boundary. This will encourage our agent to prioritize the shortest path.
...
def move_agent(self, action):
    ...
    done = False  # The episode is not done by default
    reward = 0    # Initialize reward

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the reward location
        if self.agent_location == self.goal_location:
            # Reward for getting the goal
            reward = 100

            # Episode is complete
            done = True
        else:
            # Small punishment for valid move that did not get the goal
            reward = -1
    else:
        # Slightly larger punishment for an invalid move
        reward = -3

    return reward, done
...
Make sure to initialize reward so that it can be accessed after the if blocks. Also, check carefully for each case: valid move and achieved goal, valid move and did not achieve goal, and invalid move.
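The three cases can also be checked in isolation. The following is a minimal sketch, not the Environment method itself: sparse_reward is a hypothetical standalone function that takes the attempted location directly, using the same reward values as above.

```python
def sparse_reward(new_location, goal_location, grid_size):
    """Return (reward, done) for the attempted move to new_location."""
    row, col = new_location
    inside = 0 <= row < grid_size and 0 <= col < grid_size
    if not inside:
        return -3, False   # invalid move: larger penalty, episode continues
    if new_location == goal_location:
        return 100, True   # valid move that reaches the goal
    return -1, False       # valid move that does not reach the goal

goal = (4, 4)
print(sparse_reward((4, 4), goal, grid_size=5))   # (100, True)
print(sparse_reward((2, 3), goal, grid_size=5))   # (-1, False)
print(sparse_reward((-1, 0), goal, grid_size=5))  # (-3, False)
```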
Dense rewards
Putting our dense reward system into practice is still quite simple; it just involves providing feedback more often.
What would be a good way to reward the agent to move towards the goal more incrementally?
The first way is to return the negative of the Manhattan distance. The Manhattan distance is the distance in the row direction, plus the distance in the column direction, rather than as the crow flies. Here is what that looks like in code:
reward = -(np.abs(self.goal_location[0] - new_location[0]) +
np.abs(self.goal_location[1] - new_location[1]))
So, the number of steps in the row direction plus the number of steps in the column direction, negated.
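As a quick sanity check of that definition, here is a small standalone sketch. The helper name manhattan_distance and the example coordinates are assumptions chosen for illustration.

```python
def manhattan_distance(a, b):
    """Row distance plus column distance between two (row, column) cells."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

goal = (4, 4)
for location in [(0, 0), (2, 3), (4, 3)]:
    # The reward is the negated distance, so it rises toward 0 near the goal
    print(location, -manhattan_distance(goal, location))
```

The three example cells are 8, 3, and 1 steps from the goal, so the rewards are -8, -3, and -1.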
The other way we can do this is provide a reward based on the direction the agent moves: if it moves away from the goal provide a negative reward and if it moves toward it provide a positive reward.
We can calculate this by subtracting the new Manhattan distance from the previous Manhattan distance. It will either be 1 or -1 because the agent can only move one cell per step.
In our case it would make most sense to choose the second option. This should provide better results because it gives immediate feedback based on that step rather than a more general reward.
The code for this option:
...
def move_agent(self, action):
    ...
        if self.agent_location == self.goal_location:
            ...
        else:
            # Calculate the distance before the move
            previous_distance = (np.abs(self.goal_location[0] - previous_location[0]) +
                                 np.abs(self.goal_location[1] - previous_location[1]))

            # Calculate the distance after the move
            new_distance = (np.abs(self.goal_location[0] - new_location[0]) +
                            np.abs(self.goal_location[1] - new_location[1]))

            # If new_location is closer to the goal, reward = 1, if further, reward = -1
            reward = (previous_distance - new_distance)
    ...
As you can see, if the agent did not get the goal, we calculate previous_distance, new_distance, and then define reward as the difference of these.
Depending on the performance it may be appropriate to scale it, or any reward in the system. You can do this by simply multiplying by a number (e.g., 0.01, 2, 100) to make it lower or higher. The rewards' proportions need to effectively guide the agent to the goal. For instance, a reward of 1 for moving closer to the goal and a reward of 0.1 for the goal itself would not make much sense.
Rewards are proportional. If you scale each positive and negative reward by the same factor it should not generally affect training, aside from very large or very small values.
In summary, if the agent is 10 steps away from the goal, and it moves to a space 11 steps away, then reward will be -1.
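That example can be reproduced with a short standalone sketch. The function direction_reward and its scale parameter are hypothetical names added here to illustrate the idea of scaling; they are not part of the Environment class.

```python
def manhattan_distance(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def direction_reward(previous_location, new_location, goal_location, scale=1.0):
    """Positive when the move reduces the distance to the goal, negative otherwise."""
    previous_distance = manhattan_distance(goal_location, previous_location)
    new_distance = manhattan_distance(goal_location, new_location)
    return scale * (previous_distance - new_distance)

goal = (0, 0)
start = (5, 5)  # 10 steps from the goal
print(direction_reward(start, (5, 6), goal))             # moved to 11 steps away
print(direction_reward(start, (4, 5), goal))             # moved to 9 steps away
print(direction_reward(start, (4, 5), goal, scale=2.0))  # same move, scaled reward
```

The first call returns -1.0 and the second returns 1.0, matching the summary above.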
Here is the updated move_agent.
def move_agent(self, action):
    # Map agent action to the correct movement
    moves = {
        0: (-1, 0),  # Up
        1: (1, 0),   # Down
        2: (0, -1),  # Left
        3: (0, 1)    # Right
    }

    previous_location = self.agent_location

    # Determine the new location after applying the action
    move = moves[action]
    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

    done = False  # The episode is not done by default
    reward = 0    # Initialize reward

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the reward location
        if self.agent_location == self.goal_location:
            # Reward for getting the goal
            reward = 100

            # Episode is complete
            done = True
        else:
            # Calculate the distance before the move
            previous_distance = (np.abs(self.goal_location[0] - previous_location[0]) +
                                 np.abs(self.goal_location[1] - previous_location[1]))

            # Calculate the distance after the move
            new_distance = (np.abs(self.goal_location[0] - new_location[0]) +
                            np.abs(self.goal_location[1] - new_location[1]))

            # If new_location is closer to the goal, reward = 1, if further, reward = -1
            reward = (previous_distance - new_distance)
    else:
        # Slightly larger punishment for an invalid move
        reward = -3

    return reward, done
The reward for achieving the goal and attempting an invalid move should remain the same with this structure.
Step penalty
There is just one thing we are missing.
The agent is currently not penalized for how long it takes to reach the goal. Our implemented reward structure has many net neutral loops. It could go back and forth between two locations forever, and accumulate no penalty. We can fix this by subtracting a small value each step, causing the penalty of moving away to be greater than the reward for moving closer. This illustration should make it much clearer.
Imagine the agent is starting at the leftmost node and must make a decision. Without a step penalty, it could choose to go forward, then back, as many times as it wants, and its total reward would still be 1 before finally moving to the goal.
So mathematically, looping 1000 times and then moving to the goal is just as valid as moving straight there.
Try to imagine looping in either case and see how penalty is accumulated (or not accumulated).
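The looping argument can be checked numerically. This is a rough sketch under stated assumptions: the agent starts distance cells from the goal on a straight line, each loop is one step closer followed by one step back, the final step onto the goal earns a flat goal reward, and the penalty of 0.1 is just an example value. The function episode_return is made up for this illustration.

```python
def episode_return(loops, distance, step_penalty=0.0, goal_reward=100):
    """Total reward for looping `loops` times, then walking straight to the goal."""
    closer = 1 - step_penalty     # reward for a step toward the goal
    further = -1 - step_penalty   # reward for a step away from the goal
    loop_total = loops * (closer + further)
    # The last step lands on the goal and earns goal_reward instead
    path_total = (distance - 1) * closer + goal_reward
    return loop_total + path_total

for step_penalty in (0.0, 0.1):
    direct = episode_return(loops=0, distance=2, step_penalty=step_penalty)
    looping = episode_return(loops=1000, distance=2, step_penalty=step_penalty)
    print(step_penalty, round(direct, 2), round(looping, 2))
```

Without a penalty both routes return the same total, so the agent has no reason to prefer the direct one. With the penalty, every loop costs 0.2 and the direct route clearly wins.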
Let’s implement this.
...
# If new_location is closer to the goal, reward = 0.9, if further, reward = -1.1
reward = (previous_distance - new_distance) - 0.1
...
That’s it. The agent should now be incentivized to take the shortest path, preventing looping behavior.
Okay, but what is the point?
At this point you may be thinking it is a waste of time to define a reward system and train an agent for a task that could be completed with much simpler algorithms.
And you would be correct.
The reason we are doing this is to learn how to think about guiding your agent to its goal. In this case it may seem trivial, but what if the agent’s environment included items to pick up, enemies to battle, obstacles to go through, and more?
Or a robot in the real world with dozens of sensors and motors that it needs to coordinate in sequence to navigate complex and varied environments?
Designing a system to do these things using traditional programming would be quite difficult, and it would almost certainly not behave as organically or generalize as well as an agent that learns optimal strategies through RL and a good reward structure.
Reinforcement learning is most useful in applications where defining the exact sequence of steps required to complete the task is difficult or impossible due to the complexity and variability of the environment. The only thing you need for RL to work is to be able to define what is useful behavior and what behavior should be discouraged.
The final Environment method — step.
With each component of Environment in place we can now define the heart of the interaction between the agent and the environment.
Thankfully, it is quite simple.
def step(self, action):
    # Apply the action to the environment, record the observations
    reward, done = self.move_agent(action)
    next_state = self.get_state()

    # Render the grid at each step
    if self.render_on:
        self.render()

    return reward, next_state, done
step first moves the agent in the environment and records reward and done. Then it gets the state immediately following this interaction, next_state. Then if render_on is set to true the grid is rendered.
Finally, step returns the recorded values, reward, next_state and done.
These will be essential to building the experiences our agent will learn from.
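To show how a caller might consume these three values, here is a minimal sketch of an episode loop. To keep it self-contained it uses CorridorEnv, a hypothetical one-dimensional stand-in invented for this example (it is not the Environment class), with a step method that returns values in the same order: reward, next_state, done. The run_episode helper and the fixed policy are also assumptions.

```python
class CorridorEnv:
    """Hypothetical 1-D stand-in: the goal is the right end of a corridor."""
    def __init__(self, length=5):
        self.length = length
        self.position = 0

    def reset(self):
        self.position = 0
        return self.position

    def step(self, action):
        # Action 1 moves right; any other action moves left (clamped at 0)
        self.position = max(0, self.position + (1 if action == 1 else -1))
        done = self.position == self.length - 1
        reward = 100 if done else -1
        return reward, self.position, done  # same order as step above

def run_episode(env, policy, max_steps=50):
    """Step the environment until the episode is done or max_steps is hit."""
    state = env.reset()
    total_reward, steps, done = 0, 0, False
    while not done and steps < max_steps:
        action = policy(state)
        reward, state, done = env.step(action)
        total_reward += reward
        steps += 1
    return total_reward, steps, done

# A fixed "always move right" policy reaches the goal in four steps
print(run_episode(CorridorEnv(length=5), policy=lambda state: 1))
```

The max_steps cap is a design choice for the sketch: it keeps a poor policy from running forever, which matters once random actions are involved.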
Congratulations! You have officially completed the construction of the environment for your DRL gym.
Below is the completed Environment class.
import random
import numpy as np

class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.render_on = render_on
        self.grid = []
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2d array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        # Render the initial grid
        if self.render_on:
            self.render()

        # Return the initial state
        return self.get_state()

    def add_agent(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1
        return location

    def add_goal(self):
        # Choose a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def move_agent(self, action):
        # Map agent action to the correct movement
        moves = {
            0: (-1, 0),  # Up
            1: (1, 0),   # Down
            2: (0, -1),  # Left
            3: (0, 1)    # Right
        }

        previous_location = self.agent_location

        # Determine the new location after applying the action
        move = moves[action]
        new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

        done = False  # The episode is not done by default
        reward = 0    # Initialize reward

        # Check for a valid move
        if self.is_valid_location(new_location):
            # Remove agent from old location
            self.grid[previous_location[0]][previous_location[1]] = 0

            # Add agent to new location
            self.grid[new_location[0]][new_location[1]] = 1

            # Update agent's location
            self.agent_location = new_location

            # Check if the new location is the reward location
            if self.agent_location == self.goal_location:
                # Reward for getting the goal
                reward = 100

                # Episode is complete
                done = True
            else:
                # Calculate the distance before the move
                previous_distance = (np.abs(self.goal_location[0] - previous_location[0]) +
                                     np.abs(self.goal_location[1] - previous_location[1]))

                # Calculate the distance after the move
                new_distance = (np.abs(self.goal_location[0] - new_location[0]) +
                                np.abs(self.goal_location[1] - new_location[1]))

                # If new_location is closer to the goal, reward = 0.9, if further, reward = -1.1
                reward = (previous_distance - new_distance) - 0.1
        else:
            # Slightly larger punishment for an invalid move
            reward = -3

        return reward, done

    def is_valid_location(self, location):
        # Check if the location is within the boundaries of the grid
        if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size):
            return True
        else:
            return False

    def get_state(self):
        # Flatten the grid from 2d to 1d
        state = self.grid.flatten()
        return state

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()

        for row in grid:
            print(row)
        print('')  # To add some space between renders for each step

    def step(self, action):
        # Apply the action to the environment, record the observations
        reward, done = self.move_agent(action)
        next_state = self.get_state()

        # Render the grid at each step
        if self.render_on:
            self.render()

        return reward, next_state, done
We have gone through a lot at this point. It may be beneficial to return to the big picture at the beginning and reevaluate how each part interacts using your new knowledge before moving on.
6. Learn From Experiences: Experience Replay
The agent’s model and policy, along with the environment’s reward structure and mechanism for taking steps have all been completed, but we need some way to remember the past so that the agent can learn from it.
This can be done by saving the experiences.
Each experience consists of a few things:
- State: The state before an action is taken.
- Action: What action was taken in this state.
- Reward: Positive or negative feedback the agent received from the environment based on its action.
- Next State: The state immediately following the action. It lets the agent weigh not just the immediate consequences of an action, but also what that action leads to many states in advance.
- Done: Indicates the end of an episode, letting the agent know if the task has been completed or not. It can be either true or false at each step.
These terms should not be new to you, but it never hurts to see them again!
Each experience is associated with exactly one step from the agent. This will provide all of the context needed to train it.
The ExperienceReplay class
To keep track of and serve these experiences when needed, we will define one last class, ExperienceReplay.
from collections import deque, namedtuple

class ExperienceReplay:
    def __init__(self, capacity, batch_size):
        # Memory stores the experiences in a deque, so if capacity is exceeded it removes
        # the oldest item efficiently
        self.memory = deque(maxlen=capacity)

        # Batch size specifies the number of experiences that will be sampled at once
        self.batch_size = batch_size

        # Experience is a namedtuple that stores the relevant information for training
        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
This class will take capacity, an integer value that defines the maximum number of experiences we will save at a time, and batch_size, an integer value that determines how many experiences we sample at a time for training.
Batching the experiences
If you remember, the neural network in the Agent class takes batches of input. While we only used a batch of size one to predict, this would be incredibly inefficient for training. Typically, batches of size 32 or higher are more common.
Batching the input for training does two things:
- Increases efficiency because it allows for parallel processing of multiple data points, reducing computational overhead and making better use of GPU or CPU resources.
- Helps the model learn more consistently, as it’s learning from a variety of examples at once, which can make it better at handling new, unseen data.
Memory
The memory will be a deque (short for double-ended queue). This allows us to append new experiences to one end and, once the max length defined by capacity is reached, the deque discards the oldest experiences from the other end without having to shift each element as you would with a Python list. This can greatly improve speed when capacity is set to 10,000 or more.
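A tiny sketch makes the eviction behavior visible. The capacity of 3 and the integer stand-ins for experiences are example values only.

```python
from collections import deque

memory = deque(maxlen=3)  # example capacity, far smaller than you would train with

for experience_id in range(1, 6):
    memory.append(experience_id)  # once full, the oldest entry is dropped automatically
    print(list(memory))
```

After the fifth append the deque holds only the three most recent entries, 3, 4, and 5.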
Experience
Each experience will be defined as a namedtuple. Although many other data structures would work, this will improve readability as we extract each part as needed in training.
add_experience and sample_batch implementation
Adding a new experience and sampling a batch are rather straightforward.
import random

def add_experience(self, state, action, reward, next_state, done):
    # Create a new experience and store it in memory
    experience = self.Experience(state, action, reward, next_state, done)
    self.memory.append(experience)

def sample_batch(self):
    # Batch will be a random sample of experiences from memory of size batch_size
    batch = random.sample(self.memory, self.batch_size)
    return batch
The method add_experience creates a namedtuple with each part of an experience, state, action, reward, next_state, and done, and appends it to memory.
sample_batch is just as simple. It gets and returns a random sample from memory of size batch_size.
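Here is a minimal standalone sketch of the same two operations outside the class. The placeholder states (plain integers), the capacity of 100, and the batch size of 4 are assumptions chosen to keep the example short.

```python
import random
from collections import deque, namedtuple

Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
memory = deque(maxlen=100)

# Store ten placeholder experiences; real states would be flattened grids
for step in range(10):
    memory.append(Experience(state=step, action=step % 4, reward=-1,
                             next_state=step + 1, done=False))

random.seed(0)  # fixed seed so the sketch is repeatable
batch = random.sample(memory, 4)  # sampling without replacement

for experience in batch:
    print(experience.state, experience.action, experience.reward)
```

Because random.sample draws without replacement, the same stored experience never appears twice in one batch.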
The last method needed — can_provide_sample
Finally, it would be useful to be able to check if memory contains enough experiences to provide us with a full sample before attempting to get a batch for training.
def can_provide_sample(self):
    # Determines if the length of memory has reached batch_size
    return len(self.memory) >= self.batch_size
Completed ExperienceReplay class…
import random
from collections import deque, namedtuple

class ExperienceReplay:
    def __init__(self, capacity, batch_size):
        # Memory stores the experiences in a deque, so if capacity is exceeded it removes
        # the oldest item efficiently
        self.memory = deque(maxlen=capacity)

        # Batch size specifies the number of experiences that will be sampled at once
        self.batch_size = batch_size

        # Experience is a namedtuple that stores the relevant information for training
        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

    def add_experience(self, state, action, reward, next_state, done):
        # Create a new experience and store it in memory
        experience = self.Experience(state, action, reward, next_state, done)
        self.memory.append(experience)

    def sample_batch(self):
        # Batch will be a random sample of experiences from memory of size batch_size
        batch = random.sample(self.memory, self.batch_size)
        return batch

    def can_provide_sample(self):
        # Determines if the length of memory has reached batch_size
        return len(self.memory) >= self.batch_size
With the mechanism for saving each experience and sampling from them in place, we can return to the Agent class to finally enable learning.
7. Define The Agent’s Learning Process: Fitting The NN
The goal, when training the neural network, is to get the Q-values it produces to accurately represent the future reward each choice will provide.
Essentially, we want the network to learn to predict how valuable each decision is, considering not just the immediate reward, but also the rewards it could lead to in the future.
Incorporating future rewards
To achieve this, we incorporate the Q-values of the subsequent state into the training process.
When the agent takes an action and moves to a new state, we look at the Q-values in this new state to help inform the value of the previous action. In other words, the potential future rewards influence the perceived value of the current choices.
The learn method
import numpy as np

def learn(self, experiences):
    states = np.array([experience.state for experience in experiences])
    actions = np.array([experience.action for experience in experiences])
    rewards = np.array([experience.reward for experience in experiences])
    next_states = np.array([experience.next_state for experience in experiences])
    dones = np.array([experience.done for experience in experiences])

    # Predict the Q-values (action values) for the given state batch
    current_q_values = self.model.predict(states, verbose=0)

    # Predict the Q-values for the next_state batch
    next_q_values = self.model.predict(next_states, verbose=0)
    ...
Using the provided batch, experiences, we will extract each part using list comprehension and the namedtuple values we defined earlier in ExperienceReplay. Then we convert each one into a NumPy array to improve efficiency and to align with what the model expects, as explained previously.
Finally, we use the model to predict the Q-values of the current state the action was taken in and the state immediately following it.
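To make the resulting array shapes concrete, here is a small sketch of the unpacking step on its own, without the model. The grid size of 3, the all-zero states, and the four placeholder experiences are assumptions for illustration.

```python
import numpy as np
from collections import namedtuple

Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

grid_size = 3  # example value; each state is a flattened grid of grid_size ** 2 cells
experiences = [
    Experience(np.zeros(grid_size ** 2), action, -1.0, np.zeros(grid_size ** 2), False)
    for action in (0, 3, 1, 2)
]

states = np.array([experience.state for experience in experiences])
actions = np.array([experience.action for experience in experiences])
dones = np.array([experience.done for experience in experiences])

# One row per experience: states is (batch, grid_size ** 2), actions is (batch,)
print(states.shape, actions.shape, dones.dtype)
```

The leading dimension is the batch size, which is the layout the model expects when predicting Q-values for several states at once.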
Before continuing with the learn method, I need to explain something called the discount factor.
Discounting future rewards — the role of gamma
Intuitively, we know that immediate rewards are generally prioritized when all else is equal. (Would you like your paycheck today or next week?)
Representing this mathematically can seem much less intuitive. When considering the future, we don’t want it to be equally important (weighted) as the present. By how much we discount the future, or lower its effect on each decision, is defined by gamma (commonly denoted by the greek letter γ).
Gamma can be adjusted, with higher values encouraging planning and lower values encouraging more short sighted behavior. We will use a default value of 0.99.
The discount factor will pretty much always be between 0 and 1. A discount factor greater than 1, prioritizing the future over the present, would introduce unstable behavior and has little to no practical applications.
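A short sketch shows how gamma changes the present value of a reward that arrives a few steps in the future. The function discounted_return and the reward sequence are assumptions for illustration; the agent never computes this sum explicitly, it only applies gamma one step at a time.

```python
def discounted_return(rewards, gamma):
    """Sum of rewards where the reward t steps ahead is weighted by gamma ** t."""
    return sum(reward * gamma ** t for t, reward in enumerate(rewards))

rewards = [0, 0, 0, 100]  # a reward of 100 that arrives three steps from now

for gamma in (0.5, 0.9, 0.99):
    print(gamma, round(discounted_return(rewards, gamma), 2))
```

With gamma at 0.5 the distant reward is worth only 12.5 now, while at 0.99 it keeps nearly all of its value, which is why higher values encourage planning.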
Implementing gamma and defining the target Q-values
Recall that in the context of training a neural network, the process hinges on two key elements: the input data we provide and the corresponding outputs we want the network to learn to predict.
We will need to provide the network with some target Q-values that are updated based on the reward given by the environment at this specific state and action, plus the discounted (by gamma) predicted reward of the best action at the next state.
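Written as a formula, this is the standard Q-learning target. The notation is an assumption added here: s, a, r, and s' stand for the state, action, reward, and next state of one experience, Q is the network's prediction, and y is the target for the action that was taken.

```latex
y =
\begin{cases}
r & \text{if the episode is done} \\
r + \gamma \max_{a'} Q(s', a') & \text{otherwise}
\end{cases}
```

Only the entry for the chosen action receives this target; the Q-values of the other actions in that state are left as the network currently predicts them.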
I know that is a lot to take in, but it will be best explained through implementation and example.
import numpy as np
...

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99):
        ...
        self.gamma = gamma
        ...

    ...

    def learn(self, experiences):
        ...

        # Initialize the target Q-values as the current Q-values
        target_q_values = current_q_values.copy()

        # Loop through each experience in the batch
        for i in range(len(experiences)):
            if dones[i]:
                # If the episode is done, there is no next Q-value
                # [i, actions[i]] is the numpy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # The updated Q-value is the reward plus the discounted max Q-value for the next state
                # [i, actions[i]] is the numpy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        ...
We’ve defined the instance attribute, gamma, with a default value of 0.99.
Then, after getting the prediction for state and next_state that we implemented above, we initialize target_q_values to the current Q-values. These will be updated in the following loop.
Updating target_q_values
We loop through each experience in the batch with two cases for updating the values:
- If the episode is done, the target_q_value for that action is simply the reward given because there is no relevant next_q_value.
- Otherwise, the episode is not done, and the target_q_value for that action becomes the reward given, plus the discounted Q-value of the best predicted action in the next state (the largest value in next_q_values).
Update if done is true:
target_q_values[i, actions[i]] = rewards[i]
Update if done is false:
target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
The syntax here, target_q_values[i, actions[i]], can seem confusing but it’s essentially the Q-value of the i-th experience, for the action actions[i].
       Experience in batch   Reward from environment
                v                     v
target_q_values[i, actions[i]] = rewards[i]
                       ^
          Index of the action chosen
This is NumPy’s equivalent to [i][actions[i]] in Python lists. Remember each action is an index (0 to 3).
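The same indexing also accepts whole arrays, so the loop can be replaced by a single assignment. The sketch below is an alternative formulation, not the version used in this tutorial, and it compares the two on made-up values to show they agree.

```python
import numpy as np

gamma = 0.99
actions = np.array([0, 3])
rewards = np.array([-1.1, 100.0])
dones = np.array([False, True])
current_q_values = np.array([[1.0, 2.0, 3.0, 4.0],
                             [0.5, 0.1, 0.2, 0.3]])
next_q_values = np.array([[2.0, 0.0, 1.0, -1.0],
                          [9.0, 9.0, 9.0, 9.0]])

# Loop version, following the two cases described above
loop_targets = current_q_values.copy()
for i in range(len(actions)):
    if dones[i]:
        loop_targets[i, actions[i]] = rewards[i]
    else:
        loop_targets[i, actions[i]] = rewards[i] + gamma * np.max(next_q_values[i])

# Vectorized version: (~dones) zeroes out the future term for finished episodes
vector_targets = current_q_values.copy()
rows = np.arange(len(actions))
vector_targets[rows, actions] = rewards + gamma * next_q_values.max(axis=1) * (~dones)

print(np.allclose(loop_targets, vector_targets))
print(vector_targets)
```

The loop is easier to read while learning the idea; the vectorized form mainly pays off with large batches.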
How target_q_values is updated
Just to illustrate this more clearly I will show how target_q_values more closely aligns with the actual rewards given as we train. Remember that we are working with a batch. This will be a batch of three with example values for simplicity.
Also, ensure that you understand that the entries in experiences are independent. Meaning this is not a sequence of steps, but a random sample from a collection of individual experiences.
Pretend the values of actions, rewards, dones, current_q_values, and next_q_values are as follows.
gamma = 0.99
actions = [1, 2, 2]  # (down, left, left)
rewards = [1, -1, 100]  # Rewards given by the environment for the action
dones = [False, False, True]  # Indicating whether the episode is complete
current_q_values = [
    [2, 5, -2, -3],  # In this state, action 1 (down) is best so far
    [1, 3, 4, -1],   # Here, action 2 (left) is currently favored
    [-3, 2, 6, 1]    # Action 2 (left) has the highest Q-value in this state
]
next_q_values = [
    [1, 4, -1, -2],  # Q-values predicted for the state that follows the first experience
    [2, 2, 5, 0],    # Q-values predicted for the state that follows the second experience
    [-2, 3, 7, 2]    # Q-values predicted for the state that follows the third experience
]
We then copy current_q_values into target_q_values to be updated.
target_q_values = current_q_values
Then, for every experience in the batch we can show the associated values.
This is not code, but simply an example of the values at each stage. If you get lost, be sure to refer back to the initial values to see where each is coming from.
Entry 1
i = 0 # This is the first entry in the batch (first loop)
# First entries of associated values
actions[i] = 1
rewards[i] = 1
dones[i] = False
target_q_values[i] = [2, 5, -2, -3]
next_q_values[i] = [1, 4, -1, -2]
Because dones[i] is false for this experience we need to consider the next_q_values and apply gamma (0.99).
target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])
Why get the largest of next_q_values[i]? Because that would be the next action chosen and we want the estimated reward (Q-value).
Then we update the i-th target_q_values at the index corresponding to actions[i] to the reward for this state/action pair plus the discounted reward for the next state/action pair.
Here are the target values in this experience after being updated.
# Updated target_q_values[i]
target_q_values[i] = [2, 4.96, -2, -3]
                ^         ^
              i = 0  actions[i] = 1
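As you can see, the target for choosing 1 (down) in this state is now 4.96, slightly lower than the previous estimate of 5. Down is still the best action here, but its Q-value has been nudged toward what the reward and the next state actually support.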
It may help to calculate these yourself to really make it clear.
Entry 2
i = 1 # This is the second entry in the batch
# Second entries of associated values
actions[i] = 2
rewards[i] = -1
dones[i] = False
target_q_values[i] = [1, 3, 4, -1]
next_q_values[i] = [2, 2, 5, 0]
dones[i] is also false here, so we do need to consider the next_q_values.
target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])
Again, updating the i-th experience’s target_q_values at the index actions[i].
# Updated target_q_values[i]
target_q_values[i] = [1, 3, 3.95, -1]
                ^            ^
              i = 1    actions[i] = 2
Choosing 2 (left) is now less desirable because the Q-value is lower and this behavior is discouraged.
Entry 3
Finally, the last entry in the batch.
i = 2 # This is the third and final entry in the batch
# Third entries of associated values
actions[i] = 2
rewards[i] = 100
dones[i] = True
target_q_values[i] = [-3, 2, 6, 1]
next_q_values[i] = [-2, 3, 7, 2]
dones[i] for this entry is true, indicating that the episode is complete and there will be no further actions taken. This means we do not consider next_q_values in our update.
target_q_values[i, actions[i]] = rewards[i]
Notice that we simply set target_q_values[i, actions[i]] to the value of rewards[i]. No more actions will be taken, so there is no future to consider.
# Updated target_q_values[i]
target_q_values[i] = [-3, 2, 100, 1]
                ^             ^
              i = 2     actions[i] = 2
Choosing 2 (left) in this and similar states will now be much more desirable.
This is the state where the goal was to the left of the agent, so when that action was chosen the full reward was given.
Although it can seem rather confusing, the idea is simply to make updated Q-values that accurately represent the rewards given by the environment to provide to the neural network. That is what the NN is supposed to approximate.
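If you would like to verify the three entries yourself, the walkthrough can be reproduced with a short NumPy script. This is a sketch using the example values from this section and the same update rule as the learn method; it is not part of the Agent class.

```python
import numpy as np

gamma = 0.99
actions = np.array([1, 2, 2])
rewards = np.array([1.0, -1.0, 100.0])
dones = np.array([False, False, True])
current_q_values = np.array([[2, 5, -2, -3],
                             [1, 3, 4, -1],
                             [-3, 2, 6, 1]], dtype=float)
next_q_values = np.array([[1, 4, -1, -2],
                          [2, 2, 5, 0],
                          [-2, 3, 7, 2]], dtype=float)

# Start from a copy so the current predictions stay untouched
target_q_values = current_q_values.copy()
for i in range(len(actions)):
    if dones[i]:
        # Finished episode: the target is just the reward
        target_q_values[i, actions[i]] = rewards[i]
    else:
        # Otherwise add the discounted best Q-value of the next state
        target_q_values[i, actions[i]] = rewards[i] + gamma * np.max(next_q_values[i])

print(target_q_values)
```

The changed entries should match the walkthrough: 4.96 in the first row, 3.95 in the second, and 100 in the third.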
Try to imagine it in reverse. Because the reward for reaching the goal is substantial, it will create a propagation effect throughout the states leading to the one where the agent achieves the goal. This is the power of gamma in considering the next state and its role in the rippling of reward values backward through the state-space.
Above is a simplified version of the Q-values and the effect of the discount factor, only considering the reward for the goal, not the incremental rewards or penalties.
Pick any cell in the grid and move to the highest quality adjacent cell. You will see that it always provides an optimal path to the goal.
This effect is not immediate. It requires the agent to explore the state and action-space to gradually learn and adjust its strategy, building an understanding of how different actions lead to varying rewards over time.
If the reward structure was carefully crafted, this will slowly guide our agent towards taking more advantageous actions.
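The backward rippling of the goal reward can be sketched in a few lines. The code below is a simplification under stated assumptions: it ignores the incremental rewards and penalties, treats the value of a cell as the goal reward discounted once per step away from the goal, and uses hypothetical helper names (discounted_values, greedy_path) that are not part of the gym.

```python
import numpy as np

def discounted_values(grid_size, goal, goal_reward=100.0, gamma=0.99):
    """Value of each cell if only the goal reward counted, discounted once per step."""
    rows, cols = np.indices((grid_size, grid_size))
    steps_to_goal = np.abs(rows - goal[0]) + np.abs(cols - goal[1])
    return goal_reward * gamma ** steps_to_goal

def greedy_path(values, start, goal):
    """Repeatedly move to the adjacent cell with the highest value."""
    path = [start]
    position = start
    while position != goal:
        row, col = position
        neighbors = [(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]
        # Keep only moves that stay on the grid
        neighbors = [(r, c) for r, c in neighbors
                     if 0 <= r < values.shape[0] and 0 <= c < values.shape[1]]
        position = max(neighbors, key=lambda cell: values[cell])
        path.append(position)
    return path

values = discounted_values(grid_size=5, goal=(3, 4))
path = greedy_path(values, start=(0, 0), goal=(3, 4))
print(np.round(values, 1))
print(path)
```

Because every step toward the goal multiplies the value by 1/gamma, following the highest-valued neighbor never wastes a move, which is why the path length equals the number of rows and columns separating the start from the goal.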
Fitting the neural network
For the learn method, the last thing there is to do is provide the agent’s neural network with states and their associated target_q_values. TensorFlow will then handle updating the weights to more closely predict these values on similar states.
...
def learn(self, experiences):
    states = np.array([experience.state for experience in experiences])
    actions = np.array([experience.action for experience in experiences])
    rewards = np.array([experience.reward for experience in experiences])
    next_states = np.array([experience.next_state for experience in experiences])
    dones = np.array([experience.done for experience in experiences])
    # Predict the Q-values (action values) for the given state batch
    current_q_values = self.model.predict(states, verbose=0)
    # Predict the Q-values for the next_state batch
    next_q_values = self.model.predict(next_states, verbose=0)
    # Initialize the target Q-values as the current Q-values
    target_q_values = current_q_values.copy()
    # Loop through each experience in the batch
    for i in range(len(experiences)):
        if dones[i]:
            # If the episode is done, there is no next Q-value
            target_q_values[i, actions[i]] = rewards[i]
        else:
            # The updated Q-value is the reward plus the discounted max Q-value for the next state
            # [i, actions[i]] is the numpy equivalent of [i][actions[i]]
            target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
    # Train the model
    self.model.fit(states, target_q_values, epochs=1, verbose=0)
The only new part is self.model.fit(states, target_q_values, epochs=1, verbose=0). fit takes two main arguments: the input data and the target values we want. In this case, our input is a batch of states and the target values are the updated Q-values for each state.
epochs=1 simply sets the number of times you want the network to try to fit to the data. One is enough because we want it to be able to generalize well, not to fit to this specific batch. verbose=0 simply tells TensorFlow not to print debug messages like progress bars.
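As an aside, the per-experience loop in learn can also be written as a single NumPy expression. This is an optional alternative, not the version used in this tutorial. The sketch below uses random stand-in data and hypothetical function names to check that both forms produce the same targets.

```python
import numpy as np

def targets_with_loop(current_q, next_q, actions, rewards, dones, gamma=0.99):
    # Same logic as the loop in learn
    targets = current_q.copy()
    for i in range(len(actions)):
        if dones[i]:
            targets[i, actions[i]] = rewards[i]
        else:
            targets[i, actions[i]] = rewards[i] + gamma * np.max(next_q[i])
    return targets

def targets_vectorized(current_q, next_q, actions, rewards, dones, gamma=0.99):
    targets = current_q.copy()
    batch_index = np.arange(len(actions))
    # (1 - dones) zeroes the future term for finished episodes
    future = gamma * np.max(next_q, axis=1) * (1 - dones.astype(float))
    targets[batch_index, actions] = rewards + future
    return targets

# Random stand-in data for a batch of 32 experiences and 4 actions
rng = np.random.default_rng(0)
current_q = rng.normal(size=(32, 4))
next_q = rng.normal(size=(32, 4))
actions = rng.integers(0, 4, size=32)
rewards = rng.normal(size=32)
dones = rng.random(32) < 0.2

same = np.allclose(
    targets_with_loop(current_q, next_q, actions, rewards, dones),
    targets_vectorized(current_q, next_q, actions, rewards, dones),
)
print(same)
```

The loop is easier to read while learning the idea; the vectorized form mainly matters once batches get large.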
The Agent class is now equipped with the ability to learn from experiences but it needs two more simple methods — save and load.
Saving and loading trained models
Saving and loading the model prevents us from having to completely retrain every time we need it. We can use the simple TensorFlow methods that only take one argument, file_path.
from tensorflow.keras.models import load_model
def load(self, file_path):
    self.model = load_model(file_path)
def save(self, file_path):
    self.model.save(file_path)
Make a directory called models, or whatever you like, and then you can save your trained model at set intervals. These files end in .h5. So whenever you want to save your model you simply call agent.save('models/model_name.h5'). The same goes for when you want to load one.
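If you would rather not create the directory by hand, or want to keep several checkpoints instead of overwriting one file, a small helper can take care of both. This is only a sketch: checkpoint_path, should_save, and the file naming scheme are hypothetical and not part of the Agent class.

```python
import os

def checkpoint_path(directory, grid_size, episode):
    """Create the directory if needed and return a file path for this checkpoint."""
    os.makedirs(directory, exist_ok=True)
    return os.path.join(directory, f'model_{grid_size}_episode_{episode}.h5')

def should_save(episode, save_every=100):
    """True on every save_every-th episode (episodes are counted from 0)."""
    return (episode + 1) % save_every == 0

# Hypothetical usage inside a training loop:
# if should_save(episode):
#     agent.save(checkpoint_path('models', grid_size, episode + 1))
```

Saving every 100 episodes is an arbitrary choice; pick whatever interval suits how long your training runs take.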
Full Agent class
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential, load_model
import numpy as np
class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.gamma = gamma
        # Build the neural network used to approximate the Q-values
        self.model = self.build_model()
    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model
    def get_action(self, state):
        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)
            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)
            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0])  # Take the action from the first (and only) entry
        # Decay the epsilon value to reduce the exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay
        return action
    def learn(self, experiences):
        states = np.array([experience.state for experience in experiences])
        actions = np.array([experience.action for experience in experiences])
        rewards = np.array([experience.reward for experience in experiences])
        next_states = np.array([experience.next_state for experience in experiences])
        dones = np.array([experience.done for experience in experiences])
        # Predict the Q-values (action values) for the given state batch
        current_q_values = self.model.predict(states, verbose=0)
        # Predict the Q-values for the next_state batch
        next_q_values = self.model.predict(next_states, verbose=0)
        # Initialize the target Q-values as the current Q-values
        target_q_values = current_q_values.copy()
        # Loop through each experience in the batch
        for i in range(len(experiences)):
            if dones[i]:
                # If the episode is done, there is no next Q-value
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # The updated Q-value is the reward plus the discounted max Q-value for the next state
                # [i, actions[i]] is the numpy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        # Train the model
        self.model.fit(states, target_q_values, epochs=1, verbose=0)
    def load(self, file_path):
        self.model = load_model(file_path)
    def save(self, file_path):
        self.model.save(file_path)
Each class of your deep reinforcement learning gym is now complete! You have successfully coded Agent, Environment, and ExperienceReplay. The only thing left is the main training loop.
8. Execute The Training Loop: Putting It All Together
We are at the final stretch of the project! Every piece we have coded, Agent, Environment, and ExperienceReplay, needs some way to interact.
This will be the main program where each episode is run and where we define our hyper-parameters like epsilon.
Although it is fairly simple, I will break up each part as we code it to make it more clear.
Initialize each part
First, we set grid_size and use the classes we have made to initialize each instance.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    ...
Now we have each part we need for the main training loop.
Episode and step cap
Next, we will define the number of episodes we want the training to run, and the max number of steps allowed in each episode.
Capping the number of steps helps ensure our agent doesn’t get stuck in a loop and encourages shorter paths. We will be fairly generous and for a 5×5 we will set the max to 200. This will need to be increased for larger environments.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    ...
Episode loop
In each episode we will reset environment and save the initial state. Then we perform each step until either done is true or max_steps is reached. Finally, we save the model. The logic for each step has not been implemented quite yet.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    for episode in range(episodes):
        # Reset the environment and get the initial state
        state = environment.reset()
        # Loop until the episode finishes or max_steps is reached
        for step in range(max_steps):
            # Logic for each step (this is where done will be set)
            ...
            if done:
                break
        # Save the model at the end of every episode
        agent.save(f'models/model_{grid_size}.h5')
Notice we name the model using grid_size because the NN architecture will be different for each input size. A model trained on a 5×5 grid expects 25 input values, so feeding it the 100-value state of a 10×10 grid will throw an error.
Step logic
Finally, inside of the step loop we will lay out the interaction between each piece as discussed before.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    for episode in range(episodes):
        # Reset the environment and get the initial state
        state = environment.reset()
        # Loop until the episode finishes or max_steps is reached
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)
            # Get the action choice from the agent's policy
            action = agent.get_action(state)
            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)
            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)
            # Set the state to the next_state
            state = next_state
            if done:
                break
        # Save the model at the end of every episode
        agent.save(f'models/model_{grid_size}.h5')
For every step of the episode, we start by printing the episode and step number to give us some information about where we are in training. Additionally, you can print epsilon to see what percentage of the agent’s actions are random. It also helps because if you want to stop for any reason you can restart the agent at the same epsilon value.
After printing the information, we use the agent policy to get action from this state to take a step in environment, recording the returned values.
Then we save state, action, reward, next_state, and done as an experience. If experience_replay has enough memory we train agent on a random batch of experiences.
Finally, we set state to next_state and check if the episode is done.
Once you’ve run at least one episode you’ll have a model saved you can load and either continue where you left off or evaluate the performance.
After you initialize agent, simply use its load method, similar to how we saved: agent.load(f'models/model_{grid_size}.h5')
You can also add a slight delay at each step when you are evaluating the model using time — time.sleep(0.5). This causes each step to pause for half a second. Make sure you include import time.
Completed training loop
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load(f'models/model_{grid_size}.h5')
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    for episode in range(episodes):
        # Reset the environment and get the initial state
        state = environment.reset()
        # Loop until the episode finishes or max_steps is reached
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)
            # Get the action choice from the agent's policy
            action = agent.get_action(state)
            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)
            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)
            # Set the state to the next_state
            state = next_state
            if done:
                break
            # Optionally, pause for half a second to evaluate the model
            # time.sleep(0.5)
        # Save the model at the end of every episode
        agent.save(f'models/model_{grid_size}.h5')
When you need time.sleep or agent.load you can simply uncomment them.
Running the program
Give it a run! You should be able to successfully train the agent to complete the goal up to an 8×8 or so grid environment. Any grid size much larger than this and the training begins to struggle.
Try to see how large you can get the environment. You can do a few things such as adding layers and neurons to the neural network, changing epsilon_decay, or giving more time to train. Doing this can solidify your understanding of each part.
For instance, you may notice epsilon reaches epsilon_end rather fast. Don’t be afraid to change the epsilon_decay to values of 0.9998 or 0.99998 if you would like.
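To get a feel for how fast that is, you can count the decay steps directly. Because get_action multiplies epsilon by epsilon_decay on every call, the decay happens once per step, not once per episode. The helper below, steps_until_floor, is a hypothetical name used only for this sketch.

```python
def steps_until_floor(epsilon=1.0, epsilon_decay=0.998, epsilon_end=0.01):
    """Count how many decay steps it takes for epsilon to reach epsilon_end."""
    steps = 0
    while epsilon > epsilon_end:
        epsilon *= epsilon_decay
        steps += 1
    return steps

for decay in (0.998, 0.9998, 0.99998):
    steps = steps_until_floor(epsilon_decay=decay)
    print(f'epsilon_decay={decay}: {steps} steps, '
          f'about {steps / 200:.0f} full-length episodes at max_steps=200')
```

With the default of 0.998, epsilon bottoms out after roughly 2,300 steps, which is only about a dozen full-length episodes out of 5,000. Each extra 9 in the decay value stretches the exploration phase by about a factor of ten.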
As the grid size grows, the state the network is fed grows quadratically: a 5×5 grid is 25 input values, while a 10×10 grid is already 100.
I’ve included a short bonus section at the end to fix this and to demonstrate that there are many ways you can represent the environment for the agent.
9. Wrapping It Up
Congratulations on completing this comprehensive journey through the world of Reinforcement and Deep Q-Learning!
Although there is always more to cover, you could walk away having acquired important insights and skills.
In this guide you:
- Were introduced to the core concepts of reinforcement learning and why it’s a crucial area in AI.
- Built a simple environment, laying the groundwork for agent interaction and learning.
- Defined the agent’s Neural Network architecture for use with Deep Q-Learning, enabling your agent to make decisions in more complex environments than traditional Q-Learning.
- Understood why exploration is important before exploiting the learned strategy and implemented the Epsilon-Greedy policy.
- Implemented the reward system to guide the agent to the goal and learned the differences between sparse and dense rewards.
- Designed the experience replay mechanism, allowing the agent to learn from past experiences.
- Gained hands-on experience in fitting the neural network, a critical process where the agent improves its performance based on feedback from the environment.
- Put all these pieces together in a training loop, witnessing the agent’s learning process in action and tweaking it for optimal performance.
By now, you should feel confident in your understanding of Reinforcement Learning and Deep Q-Learning. You’ve built a solid foundation, not just in theory but also in practical application, by constructing a DRL gym from scratch.
This knowledge equips you to tackle more complex RL problems and paves the way for further exploration in this exciting field of AI.
Above is a grid game inspired by Agar.io where agents are encouraged to grow in size, often from eating one another. At each step the environment was plotted on a graph using the Python library, Matplotlib. The boxes around the agents are their field of view. This is fed to them as their state from the environment as a flattened grid, similar to what we’ve done in our system.
Games like this, and a myriad of other uses, can be crafted with simple modifications to what you have made here.
Remember though, Deep Q-Learning is only suitable for a discrete action-space — one that has a finite number of distinct actions. For a continuous action-space, like in a physics based environment, you will need to explore other methods in the world of DRL.
10. Bonus: Optimize State Representation
Believe it or not, the way we have currently been representing state is not the most optimal for this use.
It is actually incredibly inefficient.
For a grid of 100×100 there are 99,990,000 possible states. Not only would the model need to be quite large considering the size of the input — 10,000 values, it would require a significant volume of training data. Depending on the computational resources one has available this could take days or weeks.
Another downfall is flexibility. The model currently is stuck at one grid size. If you want to use a different sized grid, you need to train another model completely from scratch.
We need a way to represent the state that significantly reduces the state-space and translates well to any grid size.
The better way
While there are several ways to do this, the simplest, and probably most effective, is to use the relative distance from the goal.
Rather than the state for a 5×5 grid looking like this:
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
It can be represented with only two values:
[-2, -1]
Using this method would lower the state-space of a 100×100 grid from 99,990,000 to 39,601!
Not only that, but it can generalize much better. It simply has to learn that moving down is the right choice when the first value is negative, and moving right is appropriate when the second value is negative, with the opposite actions applying for positive values.
This enables the model to only explore a fraction of the state-space.
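The state counts quoted here can be reproduced with a little arithmetic. The sketch below assumes the agent and goal always occupy two different cells, and that each relative offset can range from -(grid_size - 1) to grid_size - 1. The function names are made up for illustration.

```python
def full_grid_states(grid_size):
    """Agent and goal occupy two different cells of the grid."""
    cells = grid_size ** 2
    return cells * (cells - 1)

def relative_states(grid_size):
    """Each of the two offsets can take 2 * grid_size - 1 distinct values."""
    offsets = 2 * grid_size - 1
    return offsets ** 2

print(full_grid_states(100))  # full-grid representation, 100x100
print(relative_states(100))   # relative-distance representation, 100x100
print(full_grid_states(201))  # full-grid representation, 201x201
```

The full-grid count grows with the fourth power of grid_size, while the relative-distance count grows only with its square.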
Above is the progression of a model’s learning, trained on a 25×25 grid. It shows the agent’s choice color coded at each cell with the goal in the center.
At first, during the exploration stage, the agent’s strategy is completely off. You can see that it chooses to go up when it is above the target, down when it is below, and so on.
But in under 10 episodes it learns a strategy that allows it to reach the goal in the shortest number of steps from any cell.
This also applies with the goal at any location.
And finally it generalizes its learning incredibly well.
This model has only ever seen a 25×25 grid, but it could use its strategy on a far larger environment — 201×201. With an environment this size there are 1,632,200,400 agent-goal permutations!
Let’s update our code with this radical improvement.
Implementation
There really isn’t much we need to do to get this working, thankfully.
The first thing is update get_state in Environment.
def get_state(self):
    # Calculate row distance and column distance
    relative_distance = (self.agent_location[0] - self.goal_location[0],
                         self.agent_location[1] - self.goal_location[1])
    # Unpack tuple into numpy array
    state = np.array([*relative_distance])
    return state
Rather than a flattened version of the grid, we calculate the distance from the target and return it as a NumPy array. The * operator simply unpacks the tuple into individual components. It has the same effect as state = np.array([relative_distance[0], relative_distance[1]]).
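To see the two representations side by side, here is a standalone sketch that builds both for the 5×5 example above. It assumes, based on that example, that 1 marks the agent and -1 marks the goal in the flattened grid. The two functions are illustrative stand-ins, not methods of the Environment class.

```python
import numpy as np

def flattened_state(grid_size, agent_location, goal_location):
    """Full-grid state: 1 marks the agent and -1 marks the goal (assumed encoding)."""
    grid = np.zeros((grid_size, grid_size), dtype=int)
    grid[agent_location] = 1
    grid[goal_location] = -1
    return grid.flatten()

def relative_state(agent_location, goal_location):
    """Row and column distance from the goal, as in the updated get_state."""
    relative_distance = (agent_location[0] - goal_location[0],
                         agent_location[1] - goal_location[1])
    return np.array([*relative_distance])

agent_location, goal_location = (1, 3), (3, 4)
print(flattened_state(5, agent_location, goal_location))
print(relative_state(agent_location, goal_location))
```

The agent sits two rows above and one column to the left of the goal, which is exactly what the two-value state [-2, -1] encodes.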
Also, in move_agent we can update the penalty for hitting the boundary to be the same as moving away from the target. This is so that when you change the grid size, the agent is not discouraged from moving outside where it was originally trained.
def move_agent(self, action):
...
else:
# Same punishment for an invalid move
reward = -1.1
return reward, done
Updating the neural architecture
Currently our TensorFlow model looks like this. I’ve excluded everything else for simplicity.
class Agent:
    def __init__(self, grid_size, ...):
        self.grid_size = grid_size
        ...
        self.model = self.build_model()
    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model
...
If you remember, our model architecture needs to have a consistent input. In this case, the input size relied on grid_size.
With our updated state representation, each state will only have two values no matter what grid_size is. We can update the model to expect this. Also, we can remove self.grid_size altogether because the Agent class no longer relies on it.
class Agent:
    def __init__(self, ...):
        ...
        self.model = self.build_model()
    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects the two relative-distance values, hence the input shape is (2,)
            Dense(64, activation='relu', input_shape=(2,)),
            Dense(32, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])
        model.compile(optimizer='adam', loss='mse')
        return model
...
The input_shape parameter expects a tuple representing the shape of the input.
(2,) specifies a one-dimensional array with two values. Looking something like this:
[-2, 0]
While (2,1), a two-dimensional array for example, specifies two rows and one column. Looking something like this:
[[-2],
[0]]
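You can confirm the difference between these shapes in NumPy. The sketch below also shows the extra batch dimension that get_action adds with np.expand_dims before calling predict. The variable names are for illustration only.

```python
import numpy as np

state = np.array([-2, 0])              # one-dimensional, two values
column = state.reshape(2, 1)           # two rows, one column
batch = np.expand_dims(state, axis=0)  # a batch containing one state

print(state.shape, column.shape, batch.shape)
```

The model's input_shape describes a single state, so it stays (2,) even though predict and fit always receive a batch of states.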
Finally, we’ve lowered the number of neurons in our hidden layers to 64 and 32 respectively. With this simple state representation it’s still probably overkill, but should run plenty fast enough.
When you start training, try to see how few neurons you need for the model to effectively learn. You can even try removing the second layer if you like.
Fixing the main training loop
The training loop requires very few adjustments. Let’s update it to match our changes.
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time
if __name__ == '__main__':
    grid_size = 5
    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load('models/model.h5')
    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    for episode in range(episodes):
        # Reset the environment and get the initial state
        state = environment.reset()
        # Loop until the episode finishes or max_steps is reached
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)
            # Get the action choice from the agent's policy
            action = agent.get_action(state)
            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)
            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)
            # Set the state to the next_state
            state = next_state
            if done:
                break
            # Optionally, pause for half a second to evaluate the model
            # time.sleep(0.5)
        # Save the model at the end of every episode
        agent.save('models/model.h5')
Because agent no longer needs the grid_size, we can remove it to prevent any errors.
We also no longer have to give the model different names for each grid_size, since one model now works on any size.
If you’re curious about ExperienceReplay, it will remain the same.
Please note that there is no one-size-fits-all state representation. In some cases it may make sense to provide the full grid like we did, or a subsection of it like I’ve done with the multi-agent system in section 9. The goal is to find a balance between simplifying the state-space and providing adequate information for the agent to learn.
Hyper-parameters
Even a simple environment like ours requires adjustments of the hyper-parameters. Remember that these are the values we can change that affect training.
The ones we have discussed so far are:
- epsilon, epsilon_decay, epsilon_end (exploration/exploitation)
- gamma (discount factor)
- number of neurons and layers
- batch_size, capacity (experience replay)
- max_steps
There are plenty of others, but there is just one more we will discuss that will be critical for learning.
Learning rate
The Learning Rate (LR) is a hyper-parameter of the neural network model.
It basically tells the neural network how much to adjust its weights — values used for transformation of the input — each time it is fit to the data.
The values of LR typically range from 1 down to 0.0000001, with the most common being values like 0.01, 0.001, and 0.0001.
If the learning rate is too low, the network may not update its Q-values quickly enough to settle on an optimal strategy (settling on a strategy is known as convergence). If learning seems to stagnate, or never starts at all, this could be a sign that the learning rate is not high enough.
While these diagrams on learning rate are greatly simplified, they should get the basic idea across.
On the other side, a learning rate that is too high can cause your values to “explode”, or become increasingly large. The adjustments the model makes are too great, causing it to diverge, meaning it gets worse over time.
What is the perfect learning rate?
How long is a piece of string?
In many cases you just have to use simple trial and error. A good way to determine if your learning rate is the issue is to check the output of the model.
This is exactly the issue I was facing when training this model. After switching to the simplified state representation, it refused to learn. Even after I had extensively tested each hyper-parameter, the agent would keep heading to the bottom right of the grid.
It did not make sense to me, so I decided to take a look at the Q-values output by the model in the Agent get_action method.
Step 10
[[ 0.29763165 0.28393078 -0.01633328 -0.45749056]]
Step 50
[[ 7.173178 6.3558702 -0.48632553 -3.1968129 ]]
Step 100
[[ 33.015953 32.89661 33.11674 -14.883122]]
Step 200
[[573.52844 590.95685 592.3647 531.27576]]
...
Step 5000
[[37862352. 34156752. 35527612. 37821140.]]
This is an example of exploding values.
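A rough way to spot this automatically is to track the largest absolute Q-value each time you log the model's output. The sketch below uses the values printed above. The is_exploding helper and its limit of 1e4 are arbitrary choices made for illustration, not a standard test.

```python
import numpy as np

# Q-values logged at steps 10, 50, 100, 200, and 5000 (copied from the output above)
q_value_log = [
    np.array([[0.29763165, 0.28393078, -0.01633328, -0.45749056]]),
    np.array([[7.173178, 6.3558702, -0.48632553, -3.1968129]]),
    np.array([[33.015953, 32.89661, 33.11674, -14.883122]]),
    np.array([[573.52844, 590.95685, 592.3647, 531.27576]]),
    np.array([[37862352.0, 34156752.0, 35527612.0, 37821140.0]]),
]

def max_magnitudes(log):
    """Largest absolute Q-value in each logged prediction."""
    return [float(np.max(np.abs(q_values))) for q_values in log]

def is_exploding(log, limit=1e4):
    """Heuristic: magnitudes grow at every check and end far above any plausible reward."""
    magnitudes = max_magnitudes(log)
    always_growing = all(later > earlier
                         for earlier, later in zip(magnitudes, magnitudes[1:]))
    return always_growing and magnitudes[-1] > limit

print(max_magnitudes(q_value_log))
print(is_exploding(q_value_log))
```

Healthy Q-values should settle somewhere near the scale of the rewards the environment hands out, so magnitudes in the millions are a clear warning sign.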
In TensorFlow the optimizer we are using to adjust the weights, Adam, has a default learning rate of 0.001. For this specific case it happened to be much too high.
After testing various values, a sweet spot seems to be at 0.00001.
Let’s implement this.
from tensorflow.keras.optimizers import Adam
def build_model(self):
    # Create a sequential model with 3 layers
    model = Sequential([
        # Input layer expects the two relative-distance values, hence the input shape is (2,)
        Dense(64, activation='relu', input_shape=(2,)),
        Dense(32, activation='relu'),
        # Output layer with 4 units for the possible actions (up, down, left, right)
        Dense(4, activation='linear')
    ])
    # Update learning rate
    optimizer = Adam(learning_rate=0.00001)
    # Compile the model with the custom optimizer
    model.compile(optimizer=optimizer, loss='mse')
    return model
Feel free to adjust this and observe how the Q-values are affected. Also, make sure to import Adam.
Finally, you can once again begin training!
Heat-map code
Below is the code for plotting your own heat-map as shown previously if you are interested.
import os
import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.models import load_model

def generate_heatmap(episode, grid_size, model_path):
    # Load the model
    model = load_model(model_path)
    goal_location = (grid_size // 2, grid_size // 2)  # Center of the grid
    # Initialize an array to store the color intensities
    heatmap_data = np.zeros((grid_size, grid_size, 3))
    # Define colors for each action
    colors = {
        0: np.array([0, 0, 1]),  # Blue for up
        1: np.array([1, 0, 0]),  # Red for down
        2: np.array([0, 1, 0]),  # Green for left
        3: np.array([1, 1, 0])   # Yellow for right
    }
    # Calculate Q-values for each state and determine the color intensity
    for x in range(grid_size):
        for y in range(grid_size):
            relative_distance = (x - goal_location[0], y - goal_location[1])
            state = np.array([*relative_distance]).reshape(1, -1)
            # verbose=0 silences the per-call progress bar
            q_values = model.predict(state, verbose=0)
            best_action = np.argmax(q_values)
            if (x, y) == goal_location:
                # Mark the goal in white
                heatmap_data[x, y] = np.array([1, 1, 1])
            else:
                heatmap_data[x, y] = colors[best_action]
    # Plotting the heatmap
    plt.imshow(heatmap_data, interpolation='nearest')
    plt.xlabel(f'Episode: {episode}')
    plt.axis('off')
    plt.tight_layout(pad=0)
    # Make sure the output directory exists before saving
    os.makedirs('./figures', exist_ok=True)
    plt.savefig(f'./figures/heatmap_{grid_size}_{episode}', bbox_inches='tight')
    # Close the figure so repeated calls do not accumulate
    plt.close()
Simply import it into your training loop and run it however often you would like.
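One way to wire that up is sketched below. The helper `train_with_snapshots` and its parameters are hypothetical stand-ins, not part of the tutorial's code; the lambdas take the place of your real episode logic and of a call to generate_heatmap.

```python
def train_with_snapshots(episodes, snapshot_every, run_episode, snapshot):
    """Run a training loop and call `snapshot` every `snapshot_every` episodes."""
    for episode in range(episodes):
        run_episode(episode)
        if episode % snapshot_every == 0:
            snapshot(episode)

# Stand-ins so the sketch runs on its own: no real training or plotting here
saved = []
train_with_snapshots(
    episodes=10,
    snapshot_every=5,
    run_episode=lambda episode: None,
    snapshot=saved.append,
)
print(saved)  # [0, 5]
```

In your own loop, the snapshot step would be something like generate_heatmap(episode, grid_size, model_path). Because generate_heatmap loads the model from disk, save the current model to model_path first.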
Next steps
Once you have effectively trained your model and experimented with the hyper-parameters, I encourage you to truly make it your own.
Some ideas for expanding the system:
- Add obstacles between the agent and goal
- Create a more varied environment, possibly with randomly generated rooms and pathways
- Implement a multi-agent cooperation/competition system — hide and seek
- Create a Pong-inspired game
- Implement resource management such as a hunger or energy system where the agent needs to collect food on the way to the goal
Here is an example that goes beyond our simple grid system:
Using Pygame, a popular Python library for making 2D games, I constructed a Flappy Bird clone. Then I defined the interactions, constraints, and reward structure in our prebuilt Environment class.
I represented the state as the current velocity and location of the agent, the distance to the closest pipe, and the location of the opening.
For the Agent class I simply updated the input size to (4,), added more layers to the NN, and updated the network to only output two values — jump or not jump.
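To make that concrete, the four-value state might be assembled along these lines. This is a sketch under assumptions, not the code from the repo: the function `build_state`, its parameter names, the sample numbers, and the choice of vertical positions for the two "location" values are all hypothetical.

```python
import numpy as np

def build_state(velocity, bird_y, distance_to_pipe, gap_y):
    """Pack the four described quantities into a (1, 4) array for the network.

    Assumes "location" means a vertical position for both the agent and
    the pipe opening; the real environment may define these differently.
    """
    return np.array([[velocity, bird_y, distance_to_pipe, gap_y]], dtype=np.float32)

# Made-up sample values for illustration
state = build_state(velocity=-3.5, bird_y=220, distance_to_pipe=140, gap_y=260)
print(state.shape)  # (1, 4)
```

The leading dimension of 1 is the batch size, which matches an input size of (4,) on the network's first layer.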
You can find and run this in the flappy_bird directory on the GitHub repo. Make sure to pip install pygame.
This shows that what you’ve built is applicable to a variety of environments. You can even have the agent explore a 3D environment or perform more abstract tasks like stock trading.
While expanding your system, don’t be afraid to get creative with your environment, state representation, and reward system. Like the agent, we learn best by exploration!
I hope building a DRL gym from scratch has opened your eyes to the beauty of AI and has inspired you to dive deeper.
This article was inspired by the Neural Networks from Scratch in Python book and YouTube series by Harrison Kinsley (sentdex) and Daniel Kukieł. The conversational style and from-scratch code implementations really solidified my understanding of neural networks.
Develop Your First AI Agent: Deep Q-Learning was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.