Building a DQN AI Agent with Python, PyTorch, and Pygame
1. Introduction
Deep Q-Networks (DQN) have become a popular method for solving Reinforcement Learning (RL) tasks, especially when dealing with high-dimensional state spaces. This article will guide you through building a DQN AI agent that learns to navigate a simple 5×5 grid, avoiding obstacles and reaching a goal. Using PyTorch for neural network implementation and Pygame for visualizing the environment, we’ll cover all essential steps, from setting up the environment to optimizing the DQN.
2. Prerequisites
Before we begin, ensure you have Python installed and set up a virtual environment. Install the necessary packages: `torch`, `pygame`, and `numpy`.
# Setup virtual environment
python -m venv venv
venv\Scripts\activate          # Windows (on macOS/Linux: source venv/bin/activate)
python -m pip install --upgrade pip
pip install torch pygame numpy
3. Overview of DQN and Reinforcement Learning
A DQN is a type of Q-learning algorithm that uses a neural network to approximate the Q-value function. The Q-value function helps the agent choose actions that maximize rewards over time, allowing it to learn an optimal policy in an unknown environment.
Key parameters and terms in DQN:
- State: The current position or situation of the agent.
- Action: Choices available to the agent, such as moving up, down, left, or right.
- Reward: The feedback the agent receives after taking an action, guiding it toward the goal.
- Gamma (γ): The discount factor, which controls how much future rewards count relative to immediate ones; a value close to 1 makes the agent plan for long-term reward, while a value near 0 makes it focus on immediate reward (see the short sketch below).
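To make the role of gamma concrete, here is a minimal sketch of the one-step target that Q-learning tries to match: the immediate reward plus the discounted value of the best action in the next state. The numbers are made up for illustration only.

```python
# One-step Q-learning target (illustrative values only)
gamma = 0.99                      # discount factor
reward = -1                       # reward received for this step
q_next = [0.5, 2.0, -1.0, 0.3]    # assumed Q-values of the 4 actions in the next state

td_target = reward + gamma * max(q_next)   # -1 + 0.99 * 2.0 = 0.98
print(td_target)
```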
3.1 Step 1: Set Up the Environment with Pygame
Our environment is a 5×5 grid where the agent (AI) starts at (0, 0), the goal is at (4, 4), and there are obstacles at fixed locations. We use Pygame for grid rendering, showing the agent, goal, and obstacles on the screen.
# Game parameters
width, height = 500, 500
grid_size = 5
cell_size = width // grid_size
goal_position = (4, 4)
obstacles = [(1, 1), (2, 2), (3, 3)]
colors = {
    "agent": (0, 128, 255),
    "goal": (0, 255, 0),
    "obstacle": (255, 0, 0),
    "grid": (200, 200, 200)
}
In this Python reinforcement learning program, the game parameters set up the visual and logical aspects of a grid-based environment. Let’s break down each part of this section:
3.1.1. Grid and Display Dimensions
- `width, height = 500, 500`: The grid is displayed in a 500×500 pixel window, so the entire environment is visible to both the user and the agent.
- `grid_size = 5`: The grid is divided into 5 rows and 5 columns, creating a 5×5 environment. This setup is simple enough for an agent to learn basic navigation while still posing a challenge with obstacles.
3.1.2. Cell Size Calculation
- `cell_size = width // grid_size`: Determines the size of each cell by dividing the window width by the grid size. Here, each cell becomes 100×100 pixels (500 // 5 = 100), allowing the agent and objects to be positioned neatly within each cell on the screen.
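As a quick illustration of this mapping (a small sketch; the row/column order mirrors how the rendering code later in the article positions cells), a grid coordinate converts to a 100×100 pixel rectangle like this:

```python
import pygame

cell_size = 500 // 5      # 100 pixels per cell

row, col = 2, 3           # example grid cell
rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)
print(rect)               # <rect(300, 200, 100, 100)>
```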
3.1.3. Goal and Obstacle Positions
- `goal_position = (4, 4)`: Sets the goal at the bottom-right corner of the grid. The agent's objective is to reach this cell to complete its task.
- `obstacles = [(1, 1), (2, 2), (3, 3)]`: Lists three obstacles placed at specific coordinates. The agent learns to navigate around these obstacles to avoid penalties.
3.1.4. Colors Dictionary
- `colors`: A dictionary specifying RGB color values for the elements in the grid:
  - `"agent": (0, 128, 255)`: Blue represents the agent.
  - `"goal": (0, 255, 0)`: Green marks the goal cell.
  - `"obstacle": (255, 0, 0)`: Red designates obstacles.
  - `"grid": (200, 200, 200)`: Light gray is used for the grid lines that visually divide the cells.
These parameters define the core layout and visual style of the environment, helping to create a simple, grid-based world where an agent can learn to navigate toward a goal while avoiding obstacles.
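For reference, the complete code at the end of the article draws this layout with a small draw_grid helper that uses exactly these parameters:

```python
def draw_grid():
    for x in range(grid_size):
        for y in range(grid_size):
            rect = pygame.Rect(y * cell_size, x * cell_size, cell_size, cell_size)
            pygame.draw.rect(screen, colors["grid"], rect, 1)       # grid lines
            if (x, y) == goal_position:
                pygame.draw.rect(screen, colors["goal"], rect)      # goal cell
            elif (x, y) in obstacles:
                pygame.draw.rect(screen, colors["obstacle"], rect)  # obstacle cells
```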
3.2 Step 2: Define the DQN Model in PyTorch
The DQN model consists of three fully connected layers. It accepts a 2D input (agent’s x and y position) and outputs Q-values for the four possible actions.
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
The `DQN` class defines the Deep Q-Network model used in this reinforcement learning setup. The architecture is a simple feedforward neural network with fully connected (linear) layers and ReLU activation functions. Here's a breakdown:
3.2.1. Class Definition and Initialization
- The `DQN` class inherits from `torch.nn.Module`, the base class in PyTorch for creating neural network models.
- `__init__(self, input_dim, output_dim)`: The constructor initializes the network layers based on the dimensions of the input and output:
  - `input_dim`: The number of features in the input (2, representing the (x, y) coordinates).
  - `output_dim`: The number of possible actions (4, representing up, down, left, and right).
3.2.2. Layers of the Neural Network
- `self.fc1 = nn.Linear(input_dim, 64)`: The first fully connected layer maps the input to 64 neurons.
- `self.fc2 = nn.Linear(64, 64)`: The second layer, also with 64 neurons, processes the output of the first layer. This hidden layer allows the model to learn more complex patterns in the data.
- `self.fc3 = nn.Linear(64, output_dim)`: The final layer maps the 64 neurons down to `output_dim`, the Q-values for each action. Each value is associated with a potential action the agent can take.
3.2.3. Forward Pass (Inference)
- `def forward(self, x)`: The forward method defines how the input data moves through each layer.
- `x = torch.relu(self.fc1(x))`: The input is passed through `fc1` and activated with ReLU (Rectified Linear Unit), adding non-linearity so the network can learn complex representations.
- `x = torch.relu(self.fc2(x))`: The output of the first layer is processed through `fc2` and activated again.
- `return self.fc3(x)`: The final layer (`fc3`) outputs the Q-values without an activation function, since the Q-values represent expected rewards for each action.
In summary, this DQN model maps a state input (agent’s position) to a set of Q-values, each representing the expected reward for one of the possible actions. The agent will choose the action with the highest Q-value to maximize its expected reward.
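A quick sanity check (a standalone sketch, assuming the DQN class above has been defined) shows what the network produces for a single state:

```python
import torch

net = DQN(2, 4)                                   # 2 input features, 4 actions

state = torch.FloatTensor([0, 0]).unsqueeze(0)    # shape (1, 2): one (x, y) state
q_values = net(state)                             # shape (1, 4): one Q-value per action

best_action = torch.argmax(q_values).item()       # index of the highest Q-value (0-3)
print(q_values.shape, best_action)
```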
3.3 Step 3: Initialize DQN Components
Initialize two networks: `policy_net` for learning and `target_net` for stable target values. Set up the optimizer, loss function, and experience replay memory.
state_dim = 2 # (x, y) coordinates
action_dim = 4 # up, down, left, right
policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
memory = deque(maxlen=2000)
This section initializes important components for training the DQN agent. Let’s go over each part:
3.3.1. State and Action Dimensions
- `state_dim = 2`: The state dimension is 2 because the agent's state is represented by its (x, y) coordinates on the grid.
- `action_dim = 4`: The action dimension is 4, representing the four possible actions the agent can take (up, down, left, right).
3.3.2. Policy and Target Networks
- `policy_net = DQN(state_dim, action_dim)`: The policy network is the main model the agent uses to decide actions based on the current state. It is a DQN model with input and output dimensions set to `state_dim` and `action_dim`, respectively.
- `target_net = DQN(state_dim, action_dim)`: The target network is a separate copy of the policy network. It provides stable target Q-values during training, which helps prevent the model from becoming unstable.
- `target_net.load_state_dict(policy_net.state_dict())`: Copies the weights from `policy_net` to `target_net` so they initially match. During training, the target network is updated less frequently than the policy network (here, every 10 episodes) to stabilize learning.
3.3.3. Optimizer and Loss Function
- `optimizer = optim.Adam(policy_net.parameters(), lr=0.001)`: The Adam optimizer updates the weights of the policy network based on the gradients calculated during backpropagation. The learning rate (`lr=0.001`) controls the step size of each update.
- `loss_fn = nn.MSELoss()`: The mean squared error (MSE) loss measures the difference between predicted Q-values and target Q-values; training minimizes this error.
3.3.4. Replay Memory (Experience Replay)
- `memory = deque(maxlen=2000)`: A deque (double-ended queue) with a maximum length of 2000, used to store past experiences (state, action, reward, next_state, done). The agent samples batches from this memory during training, which improves learning stability by reusing past experiences and breaking the temporal correlation in the data.
These components together provide the basis for the DQN agent to learn by storing experiences, using the policy network to make decisions, and periodically updating the target network to stabilize training.
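To make the replay memory concrete, here is a small sketch (with made-up transitions) of what gets stored and how a batch is drawn from it:

```python
import random
from collections import deque

memory = deque(maxlen=2000)

# Each entry is a 5-tuple: (state, action, reward, next_state, done)
memory.append(((0, 0), 3, -1, (0, 1), False))    # moved right, small step penalty
memory.append(((3, 4), 1, 100, (4, 4), True))    # moved down onto the goal

batch = random.sample(memory, 2)                 # random sampling breaks temporal correlation
states, actions, rewards, next_states, dones = zip(*batch)
print(rewards)
```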
3.4 Step 4: Define Helper Functions
The helper functions include movement and reward calculation, action selection based on an epsilon-greedy policy, and model optimization.
Movement and Reward Functions
def get_new_position(position, action):
    actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
    new_position = (position[0] + actions[action][0],
                    position[1] + actions[action][1])
    if 0 <= new_position[0] < grid_size and 0 <= new_position[1] < grid_size:
        return new_position
    return position

def get_reward(position):
    if position == goal_position:
        return 100
    elif position in obstacles:
        return -100
    return -1
These two functions, `get_new_position` and `get_reward`, define the movement and rewards for the agent as it navigates the grid environment.
3.4.1. get_new_position(position, action)
- This function takes the agent’s current position and an action as inputs and returns the new position after applying that action.
- `actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]`: Defines the possible moves: (-1, 0) for up, (1, 0) for down, (0, -1) for left, and (0, 1) for right.
- `new_position = (position[0] + actions[action][0], position[1] + actions[action][1])`: Calculates the agent's new position by adding the offsets in `actions[action]` to the current coordinates.
- `if 0 <= new_position[0] < grid_size and 0 <= new_position[1] < grid_size`: Checks whether the new position is within the grid boundaries.
  - If the new position is within bounds, the function returns `new_position`.
  - If it is out of bounds, the function returns the current `position`, so the agent does not move outside the grid.
3.4.2. get_reward(position)
- This function assigns rewards based on the agent’s position, guiding its learning by encouraging reaching the goal and avoiding obstacles.
- `if position == goal_position`: If the agent reaches the goal, it receives a reward of 100, a large positive reward marking a successful episode.
- `elif position in obstacles`: If the agent lands on an obstacle, it receives a reward of -100, a large penalty that discourages this move in future episodes.
- `return -1`: For all other positions, the agent receives a small negative reward (-1), encouraging it to reach the goal quickly and avoid unnecessary moves.
Together, these functions manage the agent’s movement within the grid and help it learn effective navigation by assigning rewards based on its actions and outcomes.
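A few concrete calls (assuming `grid_size = 5` and the goal and obstacle positions defined earlier) show how these two functions behave:

```python
# Action indices: 0 = up, 1 = down, 2 = left, 3 = right
print(get_new_position((0, 0), 0))   # (0, 0) -- moving up would leave the grid, so the agent stays
print(get_new_position((0, 0), 3))   # (0, 1) -- moves right
print(get_new_position((2, 2), 1))   # (3, 2) -- moves down

print(get_reward((4, 4)))            # 100  -- goal reached
print(get_reward((1, 1)))            # -100 -- landed on an obstacle
print(get_reward((0, 3)))            # -1   -- ordinary step penalty
```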
Action Selection and Model Optimization
The epsilon-greedy policy is used for exploration. The `optimize_model` function retrieves a random batch from memory and trains the policy network.
def choose_action(state):
    if random.uniform(0, 1) < epsilon:
        return random.choice(range(4))
    else:
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return torch.argmax(q_values).item()

def optimize_model():
    if len(memory) < batch_size:
        return
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    states_tensor = torch.FloatTensor(states)
    actions_tensor = torch.LongTensor(actions)
    rewards_tensor = torch.FloatTensor(rewards)
    next_states_tensor = torch.FloatTensor(next_states)
    dones_tensor = torch.FloatTensor(dones)
    current_q_values = policy_net(states_tensor).gather(1, actions_tensor.unsqueeze(1)).squeeze()
    max_next_q_values = target_net(next_states_tensor).max(1)[0]
    target_q_values = rewards_tensor + gamma * max_next_q_values * (1 - dones_tensor)
    loss = loss_fn(current_q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
These functions, `choose_action` and `optimize_model`, are key parts of the agent's learning process. They handle action selection based on the exploration-exploitation tradeoff and the training of the policy network using experiences stored in memory.
3.4.3. choose_action(state)
- This function decides which action the agent should take based on an epsilon-greedy strategy, balancing exploration (random actions) and exploitation (using the learned policy).
- `if random.uniform(0, 1) < epsilon`: With probability `epsilon`, the agent takes a random action. This promotes exploration, especially in early episodes, to help the agent discover effective moves.
- `random.choice(range(4))`: If exploring, a random action (0 to 3, corresponding to up, down, left, right) is chosen.
- `else`: If not exploring, the agent uses its policy network to exploit learned values.
  - `state_tensor = torch.FloatTensor(state).unsqueeze(0)`: Converts the state into a tensor and adds an extra dimension for batch processing.
  - `q_values = policy_net(state_tensor)`: Feeds the state through the policy network to get Q-values for each action.
  - `torch.argmax(q_values).item()`: Chooses the action with the highest Q-value, the agent's best estimated move.
3.4.4. optimize_model()
- This function trains the policy network by sampling from the replay memory, calculating loss, and performing backpropagation.
- `if len(memory) < batch_size`: Returns immediately if there aren't enough experiences in memory to form a full batch.
- `batch = random.sample(memory, batch_size)`: Randomly samples a batch of experiences from memory to reduce temporal correlation and improve training stability.
- `states, actions, rewards, next_states, dones = zip(*batch)`: Unpacks the batch into separate lists of states, actions, rewards, next states, and done flags.
- Tensor conversion: Each list is converted into a tensor (`states_tensor`, `actions_tensor`, etc.) for PyTorch compatibility.
- Current and Target Q-Values Calculation:
  - `current_q_values = policy_net(states_tensor).gather(1, actions_tensor.unsqueeze(1)).squeeze()`: Computes the Q-values for the actions taken in each state; `gather` retrieves the Q-values specifically for the actions stored in memory.
  - `max_next_q_values = target_net(next_states_tensor).max(1)[0]`: Computes the maximum Q-values for the next states using the target network.
  - `target_q_values = rewards_tensor + gamma * max_next_q_values * (1 - dones_tensor)`: Calculates the target Q-values from the Bellman equation: the reward plus the discounted maximum future Q-value, zeroed out if the episode is done (i.e., `dones_tensor` is 1).
- Backpropagation and Optimization:
  - `loss = loss_fn(current_q_values, target_q_values)`: Computes the mean squared error loss between current and target Q-values.
  - `optimizer.zero_grad()`: Resets the gradients to prevent accumulation from previous updates.
  - `loss.backward()`: Backpropagates the loss to compute gradients.
  - `optimizer.step()`: Updates the policy network weights using the computed gradients.
These functions allow the agent to improve its policy by choosing actions that maximize Q-values and using experience replay to generalize learning.
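As a small numeric example of the target calculation (made-up Q-values, with gamma = 0.99 as configured here):

```python
gamma = 0.99

# Non-terminal transition: step penalty -1, best next-state Q-value assumed to be 5.0
target = -1 + gamma * 5.0 * (1 - 0)              # 3.95

# Terminal transition (done = 1): the future term is masked out, target is just the reward
target_terminal = 100 + gamma * 5.0 * (1 - 1)    # 100.0
print(target, target_terminal)
```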
3.5 Step 5: Training Loop
The training loop iterates over episodes. The agent takes an action, receives a reward, and updates the network to improve future actions.
for episode in range(num_episodes):
    state = (0, 0)
    for step in range(max_steps_per_episode):
        action = choose_action(state)
        new_state = get_new_position(state, action)
        reward = get_reward(new_state)
        done = new_state == goal_position
        memory.append((state, action, reward, new_state, done))
        optimize_model()
        state = new_state
        if done:
            break
    if episode % 10 == 0:
        target_net.load_state_dict(policy_net.state_dict())
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
This segment defines the main training loop for the DQN agent. In each episode, the agent starts at an initial position and explores the environment while updating its policy and target networks. Let’s break down each part:
3.5.1. Outer Loop – Episode Iteration
- `for episode in range(num_episodes)`: Loops over a set number of episodes. Each episode is a full attempt by the agent to reach the goal from the starting position (0, 0).
3.5.2. Initialize State
- `state = (0, 0)`: At the start of each episode, the agent begins at the initial position (0, 0) on the grid.
3.5.3. Inner Loop – Step Iteration within an Episode
- `for step in range(max_steps_per_episode)`: Limits the number of steps per episode. This prevents the agent from wandering indefinitely and encourages efficient solutions.
- Action Selection and Transition:
  - `action = choose_action(state)`: Chooses an action for the current state using the epsilon-greedy strategy (explore vs. exploit).
  - `new_state = get_new_position(state, action)`: Calculates the agent's next position based on the chosen action.
  - `reward = get_reward(new_state)`: Retrieves the reward for the new position, which guides the agent's learning.
  - `done = new_state == goal_position`: Checks whether the agent has reached the goal, which ends the episode.
- Store Experience and Train:
  - `memory.append((state, action, reward, new_state, done))`: Adds the current experience (state, action, reward, new_state, done) to the replay memory.
  - `optimize_model()`: Trains the policy network on a batch of experiences sampled from memory.
- Update State:
  - `state = new_state`: Moves the agent to the new position for the next step.
- Episode Completion:
  - `if done: break`: If the agent reaches the goal (`done` is `True`), the inner loop breaks and the episode ends.
3.5.4. Target Network Update
- `if episode % 10 == 0`: Every 10 episodes, the weights of `policy_net` are copied to `target_net`. Updating the target network infrequently keeps the training targets stable.
3.5.5. Epsilon Decay
- `epsilon = max(epsilon_min, epsilon * epsilon_decay)`: After each episode, epsilon is multiplied by `epsilon_decay`, gradually shifting the agent from exploration toward exploitation. The `max` function keeps epsilon from falling below `epsilon_min`, so some exploration is maintained throughout training.
This loop enables the agent to iteratively improve its Q-values through experience, balancing exploration with exploitation, and refining its policy to reach the goal efficiently.
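To get a feel for this schedule with the values used here (epsilon starting at 1.0, epsilon_decay = 0.995, epsilon_min = 0.01), a short calculation shows how quickly exploration fades:

```python
epsilon, epsilon_min, epsilon_decay = 1.0, 0.01, 0.995

for episode in range(1, 1001):
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    if episode in (100, 500, 1000):
        print(episode, round(epsilon, 3))
# 100 -> ~0.606, 500 -> ~0.082, 1000 -> 0.01 (the floor is reached around episode 919)
```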
3.6 Complete Code
# python -m venv venv
# venv/Scripts/activate
# python.exe -m pip install --upgrade pip
# pip install torch pygame numpy
import random
import numpy as np
import pygame
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
# Pygame setup
pygame.init()
width, height = 500, 500
screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("DQN AI Agent")
# Game parameters
grid_size = 5
cell_size = width // grid_size
goal_position = (4, 4)
obstacles = [(1, 1), (2, 2), (3, 3)]
colors = {
    "agent": (0, 128, 255),
    "goal": (0, 255, 0),
    "obstacle": (255, 0, 0),
    "grid": (200, 200, 200)
}
# DQN Parameters
num_episodes = 1000
max_steps_per_episode = 100
gamma = 0.99 # Discount factor
epsilon = 1.0 # Exploration rate
epsilon_min = 0.01
epsilon_decay = 0.995
batch_size = 32
memory = deque(maxlen=2000)
# Define the DQN model
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
# Initialize DQN
state_dim = 2 # (x, y) coordinates
action_dim = 4 # up, down, left, right
policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
# Helper functions
def get_new_position(position, action):
    actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
    new_position = (position[0] + actions[action][0],
                    position[1] + actions[action][1])
    if 0 <= new_position[0] < grid_size and 0 <= new_position[1] < grid_size:
        return new_position
    return position  # Stay in place if out of bounds

def get_reward(position):
    if position == goal_position:
        return 100
    elif position in obstacles:
        return -100
    return -1  # Small penalty per step to encourage faster solutions

def draw_grid():
    for x in range(grid_size):
        for y in range(grid_size):
            rect = pygame.Rect(y * cell_size, x * cell_size, cell_size, cell_size)
            pygame.draw.rect(screen, colors["grid"], rect, 1)
            if (x, y) == goal_position:
                pygame.draw.rect(screen, colors["goal"], rect)
            elif (x, y) in obstacles:
                pygame.draw.rect(screen, colors["obstacle"], rect)

def choose_action(state):
    if random.uniform(0, 1) < epsilon:
        return random.choice(range(4))  # Explore action space
    else:
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return torch.argmax(q_values).item()  # Exploit learned values

def optimize_model():
    if len(memory) < batch_size:
        return
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    states_tensor = torch.FloatTensor(states)
    actions_tensor = torch.LongTensor(actions)
    rewards_tensor = torch.FloatTensor(rewards)
    next_states_tensor = torch.FloatTensor(next_states)
    dones_tensor = torch.FloatTensor(dones)
    current_q_values = policy_net(states_tensor).gather(1, actions_tensor.unsqueeze(1)).squeeze()
    max_next_q_values = target_net(next_states_tensor).max(1)[0]
    target_q_values = rewards_tensor + gamma * max_next_q_values * (1 - dones_tensor)
    loss = loss_fn(current_q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
# Training loop with event handling
for episode in range(num_episodes):
    state = (0, 0)
    for step in range(max_steps_per_episode):
        # Check for pygame events during training
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                exit()  # Ensure the program fully exits if quit is requested

        # Render the environment
        screen.fill((255, 255, 255))
        draw_grid()
        agent_rect = pygame.Rect(state[1] * cell_size, state[0] * cell_size, cell_size, cell_size)
        pygame.draw.rect(screen, colors["agent"], agent_rect)
        pygame.display.flip()
        pygame.time.delay(50)

        # Choose and take an action
        action = choose_action(state)
        new_state = get_new_position(state, action)
        reward = get_reward(new_state)
        done = new_state == goal_position

        # Store experience in replay memory
        memory.append((state, action, reward, new_state, done))

        # Train the DQN
        optimize_model()
        state = new_state
        if done:
            break

    # Update target network every 10 episodes
    if episode % 10 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    # Decay epsilon for exploration
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
# Main event loop for final display
running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
    # Display final state or any message after training is complete if desired
pygame.quit()
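Once training finishes, one way to inspect what the agent has learned is to run a purely greedy episode (no exploration) and print the visited cells. This is a small sketch meant to run in the same session as the code above, so that policy_net and the helper functions are still in scope:

```python
# Greedy rollout: always pick the action with the highest predicted Q-value
state = (0, 0)
path = [state]
for _ in range(max_steps_per_episode):
    with torch.no_grad():
        q_values = policy_net(torch.FloatTensor(state).unsqueeze(0))
    state = get_new_position(state, torch.argmax(q_values).item())
    path.append(state)
    if state == goal_position:
        break
print(path)   # ideally a short route from (0, 0) to (4, 4) that avoids the obstacles
```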
4. Conclusion
This code creates a foundational DQN agent that can navigate a simple grid environment. As the agent learns, it becomes better at avoiding obstacles and reaching the goal efficiently. This project demonstrates the integration of PyTorch for DQN and Pygame for visualization, making it an excellent starting point for exploring reinforcement learning in more complex environments.