Q* Illustrated ¶

Minimal Implementation for Lighteval/MATH benchmark¶

¶

Ime 07/2024 ¶

👋🏻Welcom to this tutorial on implementing the Q* framework, a cutting-edge approach designed to enhance multi-step reasoning capabilities in large language models (LLMs) without the need for extensive fine-tuning. This tutorial will guide you through the minimal yet effective adaptation of Q*, demonstrating its significant impact on performance with a focus on the MATH benchmark.¶

Here, we will explore the integration of Q-Learning and A* search to form Q, a methodology that strategically guides the decision-making process of LLMs. Our case study on the MATH dataset reveals an impressive 20% improvement in performance, showcasing Q's potential in real-world applications. This introduction will pave the way for a deeper understanding of each component—Q-Learning, A, and their synergistic combination in Q—to provide a structured path towards mastering this innovative technique¶

image.png

(*) illustration from code_your_own_AI (https://www.youtube.com/watch?v=LCDofAE5gYw).¶

Section 1: Introduction to Q-Learning and Reinforcement Learning ¶

We begin by exploring the fundamentals of Q-Learning and reinforcement learning. This section will cover the basic concepts, algorithms, and their applications, supplemented by sample code snippets and illustrative diagrams to clarify complex ideas. This foundational knowledge will set the stage for understanding how these techniques can be adapted to improve LLMs.¶

Section 2: A* and Monte Carlo Tree Search (MCTS)* ¶

Next, we will introduce the A* search algorithm and extend our discussion to Monte Carlo Tree Search (MCTS), highlighting their roles in planning and decision-making in complex environments. Through examples and visual aids, we'll examine how these methods contribute to enhancing the logical reasoning capabilities of models.¶

Section 3: Implementing Q* with PyTorch ¶

The core of this tutorial focuses on the Q* framework, where we merge the principles of Q-Learning with A* search. We'll walk through a detailed implementation using PyTorch, providing practical insights into integrating this framework with LLMs. This section will include step-by-step coding tutorials, aimed at both beginners and experienced practitione¶

image.png

Figure from official paper (https://arxiv.org/pdf/2406.14283)¶

.

Section 4: Case Study - Testing on the 🤗 lighteval/MATH Dataset ¶

To validathis simplest implementationch, we'll apply tproposed he Q* framework to the MATH dataset and analyze the performance improvements. This case study will not only demonstrate the practical effectiveness of Q* but also provide benchmarks and comparisons to show the advancement over traditional methods.¶

By the end of this tutorial, I hope that you will have a thorough understanding of how to implement and apply the Q* framework to complex reasoning tasks, enhancing the capabilities of LLMs with strategic planning and decision-making tools. Join us as we explore these advanced techniques that are set to redefine the boundaries of what LLMs can achieve.¶

Q Learning ¶

Q-learning is a model-free reinforcement learning algorithm used to find the optimal action-selection policy for any given finite Markov decision process. It functions by learning an action-value function that ultimately gives the expected utility of taking a given action in a given state and following the optimal policy thereafter. Here’s how Q-learning works in a simplified form:¶

1. Initialize the Q-values arbitrarily for all state-action pairs.¶

2. Observe the current state.¶

3. Select an action based on the current Q-values (typically using an epsilon-greedy strategy to balance exploration and exploitation).¶

4. Take the action, observe the reward and the new state.¶

5. Update the Q-value for the state-action pair based on the reward received and the maximum Q-value of the next state.¶

6. Repeat the process until the Q-values converge.¶

Pseudo code¶

In [ ]:
Initialize Q(s, a) arbitrarily for all s, a
Repeat (for each episode):
    Initialize state s
    Repeat (for each step of episode):
        Choose action a from s using policy derived from Q (e.g., ε-greedy)
        Take action a, observe reward r, and new state s'
        Q(s, a) <- Q(s, a) + α * (r + γ * max_a' Q(s', a') - Q(s, a))
        s <- s'
    until s is terminal

Let's code a simple implementation to illustrate it¶

Our Environement will be a grill with a givent start cell, goal cell aand some obstacles celles¶

We must find the best path from start to goal¶

In [178]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from collections import deque
import random
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
In [179]:
import numpy as np
import matplotlib.pyplot as plt
import random
import copy
from tqdm import tqdm

class QLearningAgent:
    def __init__(self, states, actions, alpha=0.1, gamma=0.99, epsilon=0.1):
        self.states = states          # Liste ou ensemble des états possibles
        self.actions = actions        # Liste ou ensemble des actions possibles
        self.alpha = alpha            # Taux d'apprentissage
        self.gamma = gamma            # Facteur de décote
        self.epsilon = epsilon        # Probabilité d'exploration (politique epsilon-greedy)
        self.Q = {}                   # Table des valeurs Q initialisée à 0
        
        # Initialisation de la table Q
        for state in states:
            self.Q[state] = {}
            for action in actions:
                self.Q[state][action] = 0.0

    def act(self, state):
        """ Choix d'une action selon une politique epsilon-greedy """
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(self.actions)  # Exploration: choix aléatoire
        else:
            # Exploitation: choisir l'action avec la valeur Q maximale pour l'état actuel
            q_values = self.Q[state]
            max_value = max(q_values.values())
            # En cas de plusieurs actions ayant la même valeur maximale, choisir aléatoirement parmi elles
            return random.choice([k for k, v in q_values.items() if v == max_value])

    def update_q_value(self, state, action, reward, next_state):
        """ Met à jour la table Q en utilisant l'équation de Bellman """
        current_q = self.Q[state][action]
        # Calcul de la valeur Q max pour le nouvel état
        max_next_q = max(self.Q[next_state].values())
        # Mise à jour de la valeur Q
        self.Q[state][action] = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q)

    def train(self, env, episodes):
        """ Entraînement de l'agent sur un environnement spécifié """
        for episode in tqdm(range(episodes)):
            state = env.reset()  # Réinitialisation de l'environnement
            while True:
                action = self.act(state)
                next_state, reward, done, _ = env.step(action)  # Exécute une action
                self.update_q_value(state, action, reward, next_state)
                state = next_state
                if done:
                    break

Explanations¶

Initialization: The agent initializes the Q table with zero values ​​for each state-action pair.¶
Action choice: An action is chosen according to an epsilon-greedy policy which allows both exploration (random choice) and exploitation (choice of the action with the maximum Q value).¶
Q update: After each action, the Q table is updated according to the Bellman equation, taking into account the reward obtained and the maximum Q value of the new state.¶
Training: The agent trains by repeating episodes, where each episode corresponds to a sequence of states, actions and rewards, ending when a terminal state is reached.¶

NB: For this agent to work, you must define env, which is the environment in which the agent acts. The environment should have reset() methods to reset the initial state, and step(action) which should return the new state, the reward, a boolean indicating whether the state is terminal, and additional information. ¶

Test env¶

Grid NxP cells, start on position (0,0) and goal on (N-1,P-1).¶

Actions: "up", "down", "left", "right".¶

Reward: +100 if goral is reached, -100 if obstacle, -1 for all other cases.¶

In [182]:
class GridEnvironment:
    def __init__(self, N, P , diff=2):
        self.grid_lines = N
        self.grid_cols = P
        self.obstacles = self.generate_couples(N, P,diff)
        self.goal = (N-1, P-1)
        self.state = (0, 0)
        self.actions = [0,1,2,3] # ["up", "down", "left", "right"]

    def generate_couples(self, N, P,diff):
        num_couples = N // diff
        couples = [(random.randint(1, N-1), random.randint(1, P-1)) for _ in range(num_couples)]
        return couples


    def reset(self):
        self.state = (0, 0)
        return self.state

    def step(self, action):
        ori=copy.copy(self.state)
        x, y = self.state
        if action == 0: # "up":
            x = max(0, x - 1)
        elif action == 1: # "down":
            x = min(self.grid_lines - 1, x + 1)
        elif action == 2: # "left":
            y = max(0, y - 1)
        elif action == 3: # "right":
            y = min(self.grid_cols - 1, y + 1)

        new_state = (x, y)
        obs=False
        if new_state in self.obstacles:
            reward = -100
            done = False
            obs=True
        elif new_state == self.goal:
            reward = 100
            done = True
        else:
            reward = -1
            done = False
        if not obs:
            self.state = new_state
        return new_state, reward, done, {}
In [183]:
# Création de l'environnement
N=10
P=10
env = GridEnvironment(N,P)

# Exemple d'utilisation avec un agent Q-learning
states = [(x, y) for x in range(env.grid_lines) for y in range(env.grid_cols)]
actions = env.actions

agent = QLearningAgent(states, actions)
#agent.train(env, 1000)

Grid Actions initial states¶

In [184]:
def draw_text(ax, start, action):
    directions_name = {
        0: "up",    # Up
        1: "down",  # Down
        2: "left",  # Left
        3: "right"  # Right
    }
    directions = {
        0: (0, -0.4),   # Up
        1: (0, 0.4),  # Down
        2: (-0.4, 0),  # Left
        3: (0.4, 0)    # Right
    }
    dx, dy = directions[action]
    action_text = str(start[0]) + "," + str(start[1]) #action
    ax.text(start[1]+0.5, start[0]+0.5, action_text, ha='center', va='center', color='black', fontsize=8)
    ax.text(start[1]+0.5, start[0]+0.8, directions_name[action], ha='center', va='center', color='red', fontsize=8)
    ax.arrow(start[1]+0.5, start[0]+0.5, dx, dy, head_width=0.2, head_length=0.2, color='b')

def draw_grid_with_texts(agent, grid_lines, grid_cols):
    fig, ax = plt.subplots()
    ax.set_xlim(-0.5, grid_cols + 0.5)
    ax.set_ylim(grid_lines + 0.5, -0.5)  # Inverser l'axe des y
    ax.set_xticks(range(grid_cols+1))
    ax.set_yticks(range(grid_lines+1))
    ax.grid(which='both')

    cmap = colors.ListedColormap(['white', 'blue', 'red', 'green'])
    grid = np.zeros((grid_lines, grid_cols))
    plot = ax.imshow(grid, cmap=cmap, interpolation='none')
    
    for x in range(grid_lines):
        for y in range(grid_cols):
            action = agent.act((x, y))
            draw_text(ax, (x, y), action)
    
    plt.show()
In [185]:
draw_grid_with_texts(agent, N, P)
No description has been provided for this image

Animation¶

In [186]:
def update(frame, env, agent, plot, text):
    if frame == 0 :
        env.reset()  # Reset environment at the start of the animation
    state = env.state
    action = agent.act(state)
    next_state, reward, done, _ = env.step(action)
    
    grid = np.zeros((env.grid_lines, env.grid_cols))
    for obs in env.obstacles:
        grid[obs] = 2
    grid[env.goal] = 3
    grid[next_state] = 1  # Mark agent's position
    
    cmap = colors.ListedColormap(['white', 'blue', 'red', 'green'])  # Normal, agent, obstacles, goal
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1} - State: {next_state} - Action: {action} - Reward: {reward}')
    
    if done or frame == 49:  # Stop after 50 frames or if done
        ani.event_source.stop()
    return [plot, text]
In [187]:
# Test and animate the agent
env.reset()
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none',extent=(0, P , N, 0 ))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=50, fargs=(env, agent, plot, text), blit=True, repeat=False)

# Afficher l'animation dans Jupyter Lab
HTML(ani.to_jshtml())
Out[187]:
No description has been provided for this image
No description has been provided for this image
In [188]:
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

grid = np.zeros((env.grid_lines, env.grid_cols))
for obs in env.obstacles:
    grid[obs] = 2
grid[env.goal] = 3
grid[(0,0)] = 1  # Mark agent's position

cmap = colors.ListedColormap(['white', 'blue', 'red', 'green'])  # Normal, agent, obstacles, goal
norm = colors.BoundaryNorm([0, 1, 2, 3, 4], cmap.N)

plot = ax.imshow(grid, cmap=cmap, norm=norm, interpolation='none',extent=(0, P , N, 0 ))

plt.show()
No description has been provided for this image
In [192]:
agent.train(env, 1000)
100%|██████████| 1000/1000 [00:00<00:00, 17496.83it/s]
In [193]:
# Test and animate the agent
env.reset()
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none',extent=(0, P , N, 0 ))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=50, fargs=(env, agent, plot, text), blit=True, repeat=False)

# Afficher l'animation dans Jupyter Lab
HTML(ani.to_jshtml())
Out[193]:
No description has been provided for this image
No description has been provided for this image
In [194]:
draw_grid_with_texts(agent, N, P)
No description has been provided for this image

Deep reinforcement learning ¶

Using neural networks to estimate the Q table¶

Allows you to manage large or continuous state and action spaces, where the traditional Q table would become too large or inefficient¶

Neural network architectures ¶

Fully Connected Networks: Simple and widely used for medium-sized state spaces.¶

Convolutional Neural Networks (CNN): Used primarily in environments where the input data is images, such as in video games.¶

Recurrent Neural Networks (RNN): Suitable for environments where past states are important for predicting the future, such as in games with time sequence elements.¶

Dueling Networks: Architecture where the network is split into two paths to separately learn the value of states and the benefit of actions, then combining to form an estimate of Q.¶

Deep Q-Network (DQN): Introduces concepts like experiment replay and target network to improve learning stability.¶

Adapt the code to use a neural network

To use a neural network to estimate the Q table, here are the key adaptations to make:¶

Replace the Q table with a neural network: The network must take the state as input and produce a Q value for each action.¶

Manage transitions: Store experiences (state, action, reward, new state) in a replay memory for network training.¶

Batch Training: Use mini batches of experiments to train the network to minimize the loss function, often calculated as the squared error between the predicted Q value and the updated Q target.¶

Here is a basic example for a DQN:¶

In [195]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from matplotlib.colors import ListedColormap
import copy
from matplotlib import colors
In [196]:
class GridEnvironment:
    def __init__(self, N, P , diff=2):
        self.grid_lines = N
        self.grid_cols = P
        self.obstacles = self.generate_couples(N, P,diff)
        self.goal = (N-1, P-1)
        self.state = (0, 0)
        self.actions = [0,1,2,3] # ["up", "down", "left", "right"]

    def generate_couples(self, N, P,diff):
        num_couples = N // diff
        couples = [(random.randint(1, N-1), random.randint(1, P-1)) for _ in range(num_couples)]
        return couples


    def reset(self):
        self.state = (0, 0)
        return self.state

    def step(self, action):
        x, y = self.state
        if action == 0 and x > 0:  # Up
            x -= 1
        elif action == 1 and x < self.grid_lines - 1:  # Down
            x += 1
        elif action == 2 and y > 0:  # Left
            y -= 1
        elif action == 3 and y < self.grid_cols - 1:  # Right
            y += 1
        else:
            #print(f"Invalid action: {action}")
            pass
        self.state = (x, y)
        reward = -1  # Placeholder reward
        if (x, y) in self.obstacles:
            reward = -10  # Penalty for hitting an obstacle
        elif (x, y) == self.goal:
            reward = 10  # Reward for reaching the goal
        done = (x, y) == self.goal  # Episode ends when goal is reached
        #print(f"Action: {action}, New State: {self.state}, Reward: {reward}, Done: {done}")
        return self.state, reward, done, {}
In [197]:
class DQNNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQNNetwork, self).__init__()
        self.dense1 = nn.Linear(state_size, 32)
        self.relu1 = nn.ReLU()
        self.dense2 = nn.Linear(32, 32)
        self.relu2 = nn.ReLU()
        self.output = nn.Linear(32, action_size)

    def forward(self, x):
        x = self.relu1(self.dense1(x))
        x = self.relu2(self.dense2(x))
        x = self.output(x)
        return x
In [198]:
class DQNAgent:
    def __init__(self, grid_lines, grid_cols, action_size):
        self.state_size = 2  # State is now (x, y)
        self.grid_lines = grid_lines
        self.grid_cols = grid_cols
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.2
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = DQNNetwork(self.state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state = torch.FloatTensor(state).unsqueeze(0)
        act_values = self.model(state)
        return act_values.max(1)[1].item()

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            state = torch.FloatTensor(state).unsqueeze(0)
            next_state = torch.FloatTensor(next_state).unsqueeze(0)

            current_q_values = self.model(state)
            next_q_values = self.model(next_state).detach()

            max_next_q = torch.max(next_q_values, 1)[0]
            target_q_value = reward + (self.gamma * max_next_q * (1 - int(done)))

            target_q_values = current_q_values.clone()
            target_q_values[0, action] = target_q_value

            loss = torch.nn.functional.mse_loss(current_q_values, target_q_values)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, env, episodes, batch_size):
        for episode in tqdm(range(episodes)):
            state = env.reset()
            done = False
            total_reward = 0
            trys=0
            while not done and trys < 100 :
                trys+=1
                action = self.act(state)
                next_state, reward, done, _ = env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward
                if len(self.memory) > batch_size:
                    self.replay(batch_size)
                #print(f"State: {state}, Action: {action}, Reward: {reward}, Next state: {next_state}, Done: {done}")
            #print(f"Episode {episode + 1}/{episodes}, Reward: {total_reward}, Epsilon: {self.epsilon:.2f}")
In [199]:
grid_lines = 10
grid_cols = 10
action_size = 4
agent = DQNAgent(grid_lines, grid_cols, action_size)

# Entrainement avec l'environnement dummy
env = GridEnvironment(grid_lines, grid_cols)
N=10
P=10

Display initial state¶

In [200]:
draw_grid_with_texts(agent, N, P)
No description has been provided for this image
In [201]:
# Test and animate the agent
env.reset()
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none',extent=(0, P , N, 0 ))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=50, fargs=(env, agent, plot, text), blit=True, repeat=False)

# Afficher l'animation dans Jupyter Lab
HTML(ani.to_jshtml())
Out[201]:
No description has been provided for this image
No description has been provided for this image
In [202]:
# Entraînement de l'agent
episodes = 100
batch_size = 32
agent.train(env, episodes, batch_size)
100%|██████████| 100/100 [02:18<00:00,  1.39s/it]
In [203]:
agent.epsilon=0
draw_grid_with_texts(agent, 10, 10)
No description has been provided for this image
In [204]:
# Test and animate the agent
env.reset()
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none',extent=(0, P , N, 0 ))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=50, fargs=(env, agent, plot, text), blit=True, repeat=False)

# Afficher l'animation dans Jupyter Lab
HTML(ani.to_jshtml())
Out[204]:
No description has been provided for this image
No description has been provided for this image

A* ¶

image.png

image.png

image.png

Breadth First Search ¶

In [210]:
import random
import copy
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from matplotlib import animation
from IPython.display import HTML
from queue import Queue
In [211]:
import random

class GridEnvironment:
    def __init__(self, N, P, diff=2):
        self.grid_lines = N
        self.grid_cols = P
        self.obstacles = self.generate_couples(N, P, diff)
        self.goal = (N-1, P-1)
        self.start = (0, 0)
        self.actions = [0, 1, 2, 3, 4]  # ["unvisited", "start", "frontier", "visited", "goal"]
        self.cells = self.initialize_cells()
    
    def generate_couples(self, N, P, diff):
        num_couples = diff
        couples = []
        while len(couples) < num_couples:
            couple = (random.randint(0, N-1), random.randint(0, P-1))
            if couple not in couples and couple != (0, 0) and couple != (N-1, P-1):
                couples.append(couple)
        return couples
    
    def initialize_cells(self):
        cells = {}
        for i in range(self.grid_lines):
            for j in range(self.grid_cols):
                if (i, j) in self.obstacles:
                    cells[(i, j)] = {'state': 'obstacle', 'cost': float('inf')}
                elif (i, j) == self.start:
                    cells[(i, j)] = {'state': 'start', 'cost': 0}
                elif (i, j) == self.goal:
                    cells[(i, j)] = {'state': 'goal', 'cost': 0}
                else:
                    cells[(i, j)] = {'state': 'unvisited', 'cost': 1}
        return cells
    
    def reset(self):
        self.cells = self.initialize_cells()
        self.start = (0, 0)
        return self.cells
    
    def print_grid(self):
        for i in range(self.grid_lines):
            for j in range(self.grid_cols):
                cell = self.cells[(i, j)]
                state = cell['state'][0].upper()  # First letter of state
                print(state, end=" ")
            print()  # Newline after each row
        print("_____________________________________")
        print("")
        for i in range(self.grid_lines):
            for j in range(self.grid_cols):
                cell = self.cells[(i, j)]
                state = cell['cost']  # First letter of state
                print(str(state)+"\t", end=" ")
            print()  # Newline after each row
In [212]:
# Example usage:
grid_env = GridEnvironment(5, 5, 3)
grid_env.print_grid()
S U U U U 
U U O U U 
U U U U U 
U U O U U 
U U U O G 
_____________________________________

0	 1	 1	 1	 1	 
1	 1	 inf	 1	 1	 
1	 1	 1	 1	 1	 
1	 1	 inf	 1	 1	 
1	 1	 1	 inf	 0	 
In [213]:
# Création de l'environnement
N = 10
P = 10
env = GridEnvironment(N, P)
In [214]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from queue import Queue

# Breadth First Search (BFS) Implementation
def bfs(env):
    start = env.start
    frontier = Queue()
    frontier.put(start)
    reached = set()
    reached.add(start)
    steps = []

    while not frontier.empty():
        current_step = {}
        current = frontier.get()
        current_step[current] = 'visited'
        
        if current == env.goal:
            steps.append(current_step)
            break

        # Get possible next states
        neighbors = get_neighbors(current, env)
        
        for next in neighbors:
            if next not in reached and next not in env.obstacles:
                frontier.put(next)
                reached.add(next)
                current_step[next] = 'frontier'
        
        steps.append(current_step)
    
    return steps

# Function to get neighbors of a cell
def get_neighbors(cell, env):
    x, y = cell
    neighbors = []
    if x > 0:
        neighbors.append((x-1, y))
    if x < env.grid_lines - 1:
        neighbors.append((x+1, y))
    if y > 0:
        neighbors.append((x, y-1))
    if y < env.grid_cols - 1:
        neighbors.append((x, y+1))
    return neighbors

# Animation update function
def update(frame, env, steps, plot, text):
    step = steps[frame]
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            if (i, j) in step:
                env.cells[(i, j)]['state'] = step[(i, j)]

    grid = np.zeros((env.grid_lines, env.grid_cols))
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            cell_state = env.cells[(i, j)]['state']
            if cell_state == 'unvisited':
                grid[i, j] = 0
            elif cell_state == 'start':
                grid[i, j] = 1
            elif cell_state == 'frontier':
                grid[i, j] = 2
            elif cell_state == 'visited':
                grid[i, j] = 3
            elif cell_state == 'goal':
                grid[i, j] = 4
            elif cell_state == 'obstacle':
                grid[i, j] = 5

    cmap = colors.ListedColormap(['white', 'blue', 'orange', 'red', 'green', 'black'])  # Normal, start, frontier, visited, goal, obstacles
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4, 5, 6], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1}')
    
    return [plot, text]

# Example usage:
env = GridEnvironment(10, 10, 3)
steps = bfs(env)

# Setup for animation
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

'''
plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none')
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text), blit=True, repeat=False)
'''

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none',extent=(0, P , N, 0 ))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text), blit=True, repeat=False)

# Afficher l'animation dans Jupyter Lab
HTML(ani.to_jshtml())
Out[214]:
No description has been provided for this image
No description has been provided for this image

Early srop¶

image.png

In [215]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from queue import Queue

# Define GridEnvironment class
class GridEnvironment:
    def __init__(self, grid_lines, grid_cols, num_obstacles):
        self.grid_lines = grid_lines
        self.grid_cols = grid_cols
        self.start = (0, 0)
        self.goal = (grid_lines - 2, grid_cols - 5)
        self.obstacles = self.generate_obstacles(num_obstacles)
        self.cells = {(i, j): {'state': 'unvisited'} for i in range(grid_lines) for j in range(grid_cols)}
        self.cells[self.start]['state'] = 'start'
        self.cells[self.goal]['state'] = 'goal'
        for obs in self.obstacles:
            self.cells[obs]['state'] = 'obstacle'
    
    def generate_obstacles(self, num_obstacles):
        obstacles = set()
        while len(obstacles) < num_obstacles:
            x = np.random.randint(0, self.grid_lines)
            y = np.random.randint(0, self.grid_cols)
            if (x, y) != self.start and (x, y) != self.goal:
                obstacles.add((x, y))
        return obstacles

# Breadth First Search (BFS) Implementation
def bfs(env):
    start = env.start
    frontier = Queue()
    frontier.put(start)
    reached = set()
    reached.add(start)
    steps = []

    while not frontier.empty():
        current_step = {}
        current = frontier.get()
        current_step[current] = 'visited'
        
        if current == env.goal:
            steps.append(current_step)
            break

        # Get possible next states
        neighbors = get_neighbors(current, env)
        
        for next in neighbors:
            if next not in reached and next not in env.obstacles:
                frontier.put(next)
                reached.add(next)
                current_step[next] = 'frontier'
        
        steps.append(current_step)
    
    return steps

# Function to get neighbors of a cell
def get_neighbors(cell, env):
    x, y = cell
    neighbors = []
    if x > 0:
        neighbors.append((x-1, y))
    if x < env.grid_lines - 1:
        neighbors.append((x+1, y))
    if y > 0:
        neighbors.append((x, y-1))
    if y < env.grid_cols - 1:
        neighbors.append((x, y+1))
    return neighbors

# Animation update function
def update(frame, env, steps, plot, text):
    step = steps[frame]
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            if (i, j) in step:
                env.cells[(i, j)]['state'] = step[(i, j)]

    grid = np.zeros((env.grid_lines, env.grid_cols))
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            cell_state = env.cells[(i, j)]['state']
            if cell_state == 'unvisited':
                grid[i, j] = 0
            elif cell_state == 'start':
                grid[i, j] = 1
            elif cell_state == 'frontier':
                grid[i, j] = 2
            elif cell_state == 'visited':
                grid[i, j] = 3
            elif cell_state == 'goal':
                grid[i, j] = 4
            elif cell_state == 'obstacle':
                grid[i, j] = 5

    cmap = colors.ListedColormap(['white', 'blue', 'orange', 'red', 'green', 'black'])  # Normal, start, frontier, visited, goal, obstacles
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4, 5, 6], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1}')
    
    return [plot, text]

# Example usage:
env = GridEnvironment(10, 10, 15)
steps = bfs(env)


# Setup for animation
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none', extent=(0, env.grid_cols, env.grid_lines, 0))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text), blit=True, repeat=False)

# Display the animation in Jupyter Lab
HTML(ani.to_jshtml())
Out[215]:
No description has been provided for this image
No description has been provided for this image

Dijkstra’s Algorithm (or Uniform Cost Search) ¶

In [216]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import heapq

# Define GridEnvironment class
class GridEnvironment:
    def __init__(self, grid_lines, grid_cols, num_obstacles):
        self.grid_lines = grid_lines
        self.grid_cols = grid_cols
        self.start = (0, 0)
        self.goal = (grid_lines - 2, grid_cols - 5)
        self.obstacles = self.generate_obstacles(num_obstacles)
        self.cells = {(i, j): {'state': 'unvisited', 'cost': np.random.randint(1, 10)} for i in range(grid_lines) for j in range(grid_cols)}
        self.cells[self.start]['state'] = 'start'
        self.cells[self.goal]['state'] = 'goal'
        for obs in self.obstacles:
            self.cells[obs]['state'] = 'obstacle'
            self.cells[obs]['cost'] = float('inf')
    
    def generate_obstacles(self, num_obstacles):
        obstacles = set()
        while len(obstacles) < num_obstacles:
            x = np.random.randint(0, self.grid_lines)
            y = np.random.randint(0, self.grid_cols)
            if (x, y) != self.start and (x, y) != self.goal:
                obstacles.add((x, y))
        return obstacles

# Dijkstra's Algorithm (Uniform Cost Search) Implementation
def dijkstra(env):
    start = env.start
    frontier = [(0, start)]
    reached = {start: 0}
    steps = []

    while frontier:
        current_cost, current = heapq.heappop(frontier)
        current_step = {current: 'visited'}
        
        if current == env.goal:
            steps.append(current_step)
            break

        neighbors = get_neighbors(current, env)
        
        for next in neighbors:
            if next not in env.obstacles:
                new_cost = current_cost + env.cells[next]['cost']
                if next not in reached or new_cost < reached[next]:
                    reached[next] = new_cost
                    heapq.heappush(frontier, (new_cost, next))
                    current_step[next] = 'frontier'
        
        steps.append(current_step)
    
    return steps

# Function to get neighbors of a cell
def get_neighbors(cell, env):
    x, y = cell
    neighbors = []
    if x > 0:
        neighbors.append((x-1, y))
    if x < env.grid_lines - 1:
        neighbors.append((x+1, y))
    if y > 0:
        neighbors.append((x, y-1))
    if y < env.grid_cols - 1:
        neighbors.append((x, y+1))
    return neighbors

# Animation update function
def update(frame, env, steps, plot, text):
    step = steps[frame]
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            if (i, j) in step:
                env.cells[(i, j)]['state'] = step[(i, j)]

    grid = np.zeros((env.grid_lines, env.grid_cols))
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            cell_state = env.cells[(i, j)]['state']
            if cell_state == 'unvisited':
                grid[i, j] = 0
            elif cell_state == 'start':
                grid[i, j] = 1
            elif cell_state == 'frontier':
                grid[i, j] = 2
            elif cell_state == 'visited':
                grid[i, j] = 3
            elif cell_state == 'goal':
                grid[i, j] = 4
            elif cell_state == 'obstacle':
                grid[i, j] = 5

    cmap = colors.ListedColormap(['white', 'blue', 'orange', 'red', 'green', 'black'])  # Normal, start, frontier, visited, goal, obstacles
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4, 5, 6], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1}')
    
    return [plot, text]

# Example usage:
env = GridEnvironment(10, 10, 15)
steps = dijkstra(env)

# Setup for animation

fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')


plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none', extent=(0, env.grid_cols, env.grid_lines, 0))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text), blit=True, repeat=False)

# Display the animation in Jupyter Lab
HTML(ani.to_jshtml())
Out[216]:
No description has been provided for this image
No description has been provided for this image
In [217]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import heapq

# Define GridEnvironment class
class GridEnvironment:
    def __init__(self, grid_lines, grid_cols, num_obstacles):
        self.grid_lines = grid_lines
        self.grid_cols = grid_cols
        self.start = (0, 0)
        self.goal = (grid_lines - 2, grid_cols - 5)
        self.obstacles = self.generate_obstacles(num_obstacles)
        self.cells = {(i, j): {'state': 'unvisited', 'cost': np.random.randint(1, 10)} for i in range(grid_lines) for j in range(grid_cols)}
        self.cells[self.start]['state'] = 'start'
        self.cells[self.goal]['state'] = 'goal'
        for obs in self.obstacles:
            self.cells[obs]['state'] = 'obstacle'
            self.cells[obs]['cost'] = float('inf')
    
    def generate_obstacles(self, num_obstacles):
        obstacles = set()
        while len(obstacles) < num_obstacles:
            x = np.random.randint(0, self.grid_lines)
            y = np.random.randint(0, self.grid_cols)
            if (x, y) != self.start and (x, y) != self.goal:
                obstacles.add((x, y))
        return obstacles

# Dijkstra's Algorithm (Uniform Cost Search) Implementation
def dijkstra(env):
    start = env.start
    frontier = [(0, start)]
    reached = {start: 0}
    came_from = {start: None}
    steps = []

    while frontier:
        current_cost, current = heapq.heappop(frontier)
        current_step = {current: 'visited'}
        
        if current == env.goal:
            steps.append(current_step)
            break

        neighbors = get_neighbors(current, env)
        
        for next in neighbors:
            if next not in env.obstacles:
                new_cost = current_cost + env.cells[next]['cost']
                if next not in reached or new_cost < reached[next]:
                    reached[next] = new_cost
                    heapq.heappush(frontier, (new_cost, next))
                    current_step[next] = 'frontier'
                    came_from[next] = current
        
        steps.append(current_step)
    
    # Reconstruct path
    path = []
    current = env.goal
    while current is not None:
        path.append(current)
        current = came_from[current]
    path.reverse()

    return steps, path

# Function to get neighbors of a cell
def get_neighbors(cell, env):
    x, y = cell
    neighbors = []
    if x > 0:
        neighbors.append((x-1, y))
    if x < env.grid_lines - 1:
        neighbors.append((x+1, y))
    if y > 0:
        neighbors.append((x, y-1))
    if y < env.grid_cols - 1:
        neighbors.append((x, y+1))
    return neighbors

# Animation update function
def update(frame, env, steps, plot, text, costs, path_line):
    step = steps[frame]
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            if (i, j) in step:
                env.cells[(i, j)]['state'] = step[(i, j)]

    grid = np.zeros((env.grid_lines, env.grid_cols))
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            cell_state = env.cells[(i, j)]['state']
            if cell_state == 'unvisited':
                grid[i, j] = 0
            elif cell_state == 'start':
                grid[i, j] = 1
            elif cell_state == 'frontier':
                grid[i, j] = 2
            elif cell_state == 'visited':
                grid[i, j] = 3
            elif cell_state == 'goal':
                grid[i, j] = 4
            elif cell_state == 'obstacle':
                grid[i, j] = 5

    cmap = colors.ListedColormap(['white', 'blue', 'orange', 'red', 'green', 'black'])  # Normal, start, frontier, visited, goal, obstacles
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4, 5, 6], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1}')

    for (i, j), cost_text in costs.items():
        cost_text.set_text(env.cells[(i, j)]['cost'])

    # Draw the path if we reached the goal
    if frame == len(steps) - 1:
        path_coords = np.array(path)
        path_line.set_data(path_coords[:, 1] + 0.5, path_coords[:, 0] + 0.5)
    
    return [plot, text] + list(costs.values()) + [path_line]

# Example usage:
env = GridEnvironment(10, 10, 15)
steps, path = dijkstra(env)

fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none', extent=(0, env.grid_cols, env.grid_lines, 0))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

# Display costs
costs = {}
for i in range(env.grid_lines):
    for j in range(env.grid_cols):
        costs[(i, j)] = ax.text(j + 0.5, i + 0.5, env.cells[(i, j)]['cost'], ha='center', va='center', color='black')

# Add path line
path_line, = ax.plot([], [], 'yellow', linewidth=2)

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text, costs, path_line), blit=True, repeat=False)

# Display the animation in Jupyter Lab
HTML(ani.to_jshtml())
Out[217]:
No description has been provided for this image
No description has been provided for this image

Changes Made:¶

Implemented A Algorithm: Modified the BFS/Dijkstra's to A using a priority queue with a heuristic function.¶

Heuristic Function: Added a Manhattan distance heuristic function.¶

Adjusted Animation: Ensured the animation uses the A* steps and reconstructs the path correctly.¶

In [218]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import heapq

# Define GridEnvironment class
class GridEnvironment:
    def __init__(self, grid_lines, grid_cols, num_obstacles):
        self.grid_lines = grid_lines
        self.grid_cols = grid_cols
        self.start = (0, 0)
        self.goal = (grid_lines - 2, grid_cols - 5)
        self.obstacles = self.generate_obstacles(num_obstacles)
        self.cells = {(i, j): {'state': 'unvisited', 'cost': np.random.randint(1, 10)} for i in range(grid_lines) for j in range(grid_cols)}
        self.cells[self.start]['state'] = 'start'
        self.cells[self.goal]['state'] = 'goal'
        for obs in self.obstacles:
            self.cells[obs]['state'] = 'obstacle'
            self.cells[obs]['cost'] = float('inf')
    
    def generate_obstacles(self, num_obstacles):
        obstacles = set()
        while len(obstacles) < num_obstacles:
            x = np.random.randint(0, self.grid_lines)
            y = np.random.randint(0, self.grid_cols)
            if (x, y) != self.start and (x, y) != self.goal:
                obstacles.add((x, y))
        return obstacles

# A* Algorithm Implementation
def a_star(env):
    start = env.start
    goal = env.goal
    frontier = []
    heapq.heappush(frontier, (0, start))
    came_from = {start: None}
    cost_so_far = {start: 0}
    steps = []

    while frontier:
        _, current = heapq.heappop(frontier)
        current_step = {current: 'visited'}

        if current == goal:
            steps.append(current_step)
            break

        neighbors = get_neighbors(current, env)

        for next in neighbors:
            if next not in env.obstacles:
                new_cost = cost_so_far[current] + env.cells[next]['cost']
                if next not in cost_so_far or new_cost < cost_so_far[next]:
                    cost_so_far[next] = new_cost
                    priority = new_cost + heuristic(goal, next)
                    heapq.heappush(frontier, (priority, next))
                    came_from[next] = current
                    current_step[next] = 'frontier'
        
        steps.append(current_step)
    
    # Reconstruct path
    path = []
    current = goal
    while current is not None:
        path.append(current)
        current = came_from[current]
    path.reverse()

    return steps, path

# Heuristic function (Manhattan distance)
def heuristic(a, b):
    (x1, y1) = a
    (x2, y2) = b
    return abs(x1 - x2) + abs(y1 - y2)

# Function to get neighbors of a cell
def get_neighbors(cell, env):
    x, y = cell
    neighbors = []
    if x > 0:
        neighbors.append((x-1, y))
    if x < env.grid_lines - 1:
        neighbors.append((x+1, y))
    if y > 0:
        neighbors.append((x, y-1))
    if y < env.grid_cols - 1:
        neighbors.append((x, y+1))
    return neighbors

# Animation update function
def update(frame, env, steps, plot, text, costs, path_line):
    step = steps[frame]
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            if (i, j) in step:
                env.cells[(i, j)]['state'] = step[(i, j)]

    grid = np.zeros((env.grid_lines, env.grid_cols))
    
    for i in range(env.grid_lines):
        for j in range(env.grid_cols):
            cell_state = env.cells[(i, j)]['state']
            if cell_state == 'unvisited':
                grid[i, j] = 0
            elif cell_state == 'start':
                grid[i, j] = 1
            elif cell_state == 'frontier':
                grid[i, j] = 2
            elif cell_state == 'visited':
                grid[i, j] = 3
            elif cell_state == 'goal':
                grid[i, j] = 4
            elif cell_state == 'obstacle':
                grid[i, j] = 5

    cmap = colors.ListedColormap(['white', 'blue', 'orange', 'red', 'green', 'black'])  # Normal, start, frontier, visited, goal, obstacles
    norm = colors.BoundaryNorm([0, 1, 2, 3, 4, 5, 6], cmap.N)

    plot.set_data(grid)
    plot.set_cmap(cmap)
    plot.set_norm(norm)
    text.set_text(f'Frame: {frame+1}')

    for (i, j), cost_text in costs.items():
        cost_text.set_text(env.cells[(i, j)]['cost'])

    # Draw the path if we reached the goal
    if frame == len(steps) - 1:
        path_coords = np.array(path)
        path_line.set_data(path_coords[:, 1] + 0.5, path_coords[:, 0] + 0.5)
    
    return [plot, text] + list(costs.values()) + [path_line]

# Example usage:
env = GridEnvironment(10, 10, 15)
steps, path = a_star(env)

# Setup for animation
fig, ax = plt.subplots()
ax.set_xlim(-0.5, P + 0.5)
ax.set_ylim(N + 0.5, -0.5)  # Inverser l'axe des y
ax.set_xticks(range(N+1))
ax.set_yticks(range(P+1))
ax.grid(which='both')

plot = ax.imshow(np.zeros((env.grid_lines, env.grid_cols)), cmap='viridis', interpolation='none', extent=(0, env.grid_cols, env.grid_lines, 0))
text = ax.text(0.5, -0.1, '', transform=ax.transAxes, ha='center')

# Display costs
costs = {}
for i in range(env.grid_lines):
    for j in range(env.grid_cols):
        costs[(i, j)] = ax.text(j + 0.5, i + 0.5, env.cells[(i, j)]['cost'], ha='center', va='center', color='black')

# Add path line
path_line, = ax.plot([], [], 'yellow', linewidth=2)

ani = FuncAnimation(fig, update, frames=len(steps), fargs=(env, steps, plot, text, costs, path_line), blit=True, repeat=False)

# Display the animation in Jupyter Lab
HTML(ani.to_jshtml())
Out[218]:
No description has been provided for this image
No description has been provided for this image

Here we are : Implementing Q* ¶

image.png

Dataset Pipe Line¶

In [219]:
import pandas as pd
from datasets import load_dataset, load_metric
import re
from IPython.display import display, Markdown
import json
import requests
from tqdm import tqdm
import evaluate
from datetime import datetime
In [220]:
# pip install evaluate
In [221]:
accuracy = evaluate.load("accuracy")
In [222]:
print(accuracy.description)
Accuracy is the proportion of correct predictions among the total number of cases processed. It can be computed with:
Accuracy = (TP + TN) / (TP + TN + FP + FN)
 Where:
TP: True positive
TN: True negative
FP: False positive
FN: False negative

In [223]:
# math = evaluate.load("competition_math") pip install git+https://github.com/hendrycks/math.git

Dataset¶

In [224]:
dataset = load_dataset("lighteval/MATH", 'all', split='test[:100]')
df = pd.DataFrame(dataset)
In [225]:
def extract_boxed_answer(answer):
    '''
    Extracts the content within the last \boxed{} in the answer, handling nested braces
    '''
    pattern = re.compile(r'\\boxed{((?:[^{}]|\{(?:[^{}]|\{[^{}]*\})*\})*)}')
    matches = pattern.findall(answer)

    if matches:
        return matches[-1]
    return None
In [226]:
def get_MATH_QA(row_number=None):
    #Load the dataset

    if row_number is None:
        row_number=int(input("Please enter the row number (0-99)"))

    if row_number <0 or row_number >=len(df):
        raise ValueError("Row number must be between 0 and 99")

    selected_row = df.iloc[row_number]

    question = selected_row['problem']
    full_answer=selected_row['solution']
    short_answer = extract_boxed_answer(full_answer)

    return question, full_answer, short_answer
In [227]:
question, full_answer,short_answer = get_MATH_QA() # 14 
In [228]:
prompt = """Giving this problem : {question}

please write agani the final solution, only the final answer in the last line without any extra text"""
s0=prompt.format(question=question)
In [229]:
display(Markdown("question : "+ question))
display(Markdown("full ======= > " +full_answer))
display(Markdown("_____________ > " + short_answer))

question : Kite $ABCD$ (a quadrilateral with two pairs of adjacent equal sides) has coordinates $A\ (0,7),\ B\ (1,0),\ C\ (12,-2),$ and $D\ (7,8).$ What is the area of $ABCD,$ given that the area of a kite is equal to half the product of its diagonals?

[asy] string sp(pair P, string P2){return "$" + P2 + "\,(" + string(P.x) + "," + string(P.y) + ")$";} size(150); defaultpen(fontsize(10)); draw((-5,0)--(15,0),Arrows(4)); draw((0,-5)--(0,10),Arrows(4)); pair A=(0,7),B=(1,0),C=(12,-2),D=(7,8); draw(A--B--C--D--cycle, linewidth(0.7)); draw(A--C, dashed); draw(B--D, dashed); label(sp(A,"A"),A,W); label(sp(B,"B"),B,S); label(sp(C,"C"),C,E); label(sp(D,"D"),D,N); [/asy]

full ======= > As the problem suggests, we need to compute the lengths of the diagonals $\overline{AC}$ and $\overline{BD}$. By the distance formula,

\begin{align*} AC &= \sqrt{(12 -0)^2 + (-2-7)^2} = \sqrt{12^2 + 9^2} = 15\\ BD &= \sqrt{(7-1)^2 + (8-0)^2} = \sqrt{6^2 + 8^2} = 10\\ \end{align*}Thus, the answer is $\frac 12 \cdot 10 \cdot 15 = \boxed{75}$.

As an extra challenge, can you figure out why the area of a kite equals half the product of the lengths of its diagonals?

_____________ > 75

In [230]:
display(Markdown('### $\\boxed{ '+extract_boxed_answer(full_answer)+ '}$'))

$\boxed{ 75}$¶

In [231]:
def llm(messages, temperature=0.2, stream=False):
    headers = {
        "Content-Type": "application/json",
    }
    data = {
        "model": "gemma2:latest",
        "messages": [
    {
      "role": "user",
      "content": messages
    }
  ],
        "stream" : False,
        "options": {
            "seed": 101,
            "temperature": temperature,
            "max_token" : 1500
          }
    }
    api_chat_endpoint="http://localhost:11434/api/chat"         
    response = requests.post(api_chat_endpoint, headers=headers, data=json.dumps(data), stream=stream)
    ret=""
    if response.status_code == 200:
        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8').strip()
                #print("stripped line=", line, "", "")
                try:
                    content = json.loads(line)["message"]["content"]
                    ret+= content
                except Exception as e:
                    print(f"Impossible de décoder la ligne JSON : {line}, erreur: {e}")
                    continue
    else:
        print(f"API Error {response.status_code}: {response.text}")
        raise Exception(f"API Error {response.status_code}: {response.text}")
    return ret
In [232]:
def llm2(user_msg,assistant_msg, temperature=0.9, stream=False):
    headers = {
        "Content-Type": "application/json",
    }
    data = {
        "model": "gemma2:latest",
        "messages": [
    {
      "role": "user",
      "content": user_msg
    },
    {
      "role": "assistant",
      "content": assistant_msg
    },      
  ],
        "stream" : False,
        "options": {
            # "seed": 101,
            "temperature": temperature,
            "max_token" : 1500,
            "stop" : ["\n","\\n","\n\n","\\n\\n"]
          }
    }
    api_chat_endpoint="http://localhost:11434/api/chat"         
    response = requests.post(api_chat_endpoint, headers=headers, data=json.dumps(data), stream=stream)
    ret=""
    if response.status_code == 200:
        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8').strip()
                #print("stripped line=", line, "", "")
                try:
                    content = json.loads(line)["message"]["content"]
                    ret+= content
                except Exception as e:
                    print(f"Impossible de décoder la ligne JSON : {line}, erreur: {e}")
                    continue
    else:
        print(f"API Error {response.status_code}: {response.text}")
        raise Exception(f"API Error {response.status_code}: {response.text}")
    return ret
In [233]:
prompt = """Problem : {question}

please think step by step, separate reasoning steps by double new lines, outpout the final answer in the last line without any extra text"""
In [234]:
prompt=prompt.format(question=question)
print(prompt)
Problem : Kite $ABCD$ (a quadrilateral with two pairs of adjacent equal sides) has coordinates $A\ (0,7),\ B\ (1,0),\ C\ (12,-2),$ and $D\ (7,8).$ What is the area of $ABCD,$ given that the area of a kite is equal to half the product of its diagonals?

[asy]
string sp(pair P, string P2){return "$" + P2 + "\,(" + string(P.x) + "," + string(P.y) + ")$";}
size(150); defaultpen(fontsize(10)); draw((-5,0)--(15,0),Arrows(4)); draw((0,-5)--(0,10),Arrows(4)); pair A=(0,7),B=(1,0),C=(12,-2),D=(7,8); draw(A--B--C--D--cycle, linewidth(0.7)); draw(A--C, dashed); draw(B--D, dashed); label(sp(A,"A"),A,W); label(sp(B,"B"),B,S); label(sp(C,"C"),C,E); label(sp(D,"D"),D,N);
[/asy]

please think step by step, separate reasoning steps by double new lines, outpout the final answer in the last line without any extra text
In [235]:
ret= llm(prompt) # +"\nPlease output the final answer on the last line of your output without any prefix or suffix")
In [236]:
display(Markdown(ret))

1. Find the slopes of the diagonals.

The slope of a line passing through points $(x_1, y_1)$ and $(x_2, y_2)$ is given by:

$(y_2 - y_1) / (x_2 - x_1)$

  • Diagonal AC: Slope = (-2 - 7) / (12 - 0) = -9/12 = -3/4

  • Diagonal BD: Slope = (8 - 0) / (7 - 1) = 8/6 = 4/3

2. Find the equations of the diagonals.

We can use the point-slope form of a linear equation:

y - y1 = m(x - x1), where 'm' is the slope and (x1, y1) is a point on the line.

  • Diagonal AC: Using point A (0, 7) and the slope -3/4: y - 7 = (-3/4)(x - 0) y - 7 = (-3/4)x y = (-3/4)x + 7

  • Diagonal BD: Using point B (1, 0) and the slope 4/3: y - 0 = (4/3)(x - 1) y = (4/3)x - 4/3

3. Find the intersection point of the diagonals.

To find the intersection point, we set the two equations equal to each other and solve for 'x':

(-3/4)x + 7 = (4/3)x - 4/3

Multiply both sides by 12 to get rid of the fractions: -9x + 84 = 16x - 16

Combine like terms: 25x = 100

Solve for 'x': x = 4

Substitute x = 4 back into either equation to find 'y'. Let's use y = (-3/4)x + 7:

y = (-3/4)(4) + 7 y = -3 + 7 y = 4

Therefore, the intersection point of the diagonals is (4, 4).

4. Calculate the lengths of the diagonals.

We can use the distance formula to find the length of each diagonal:

√[(x2 - x1)² + (y2 - y1)²]

  • Diagonal AC: Using points A (0, 7) and C (12, -2): √[(12 - 0)² + (-2 - 7)²] = √(144 + 81) = √225 = 15

  • Diagonal BD: Using points B (1, 0) and D (7, 8): √[(7 - 1)² + (8 - 0)²] = √(36 + 64) = √100 = 10

5. Calculate the area of the kite.

Area of a kite = (1/2) * product of diagonals

Area of ABCD = (1/2) * 15 * 10 = 75

In [237]:
# Fonction d'évaluation
def evaluate_rep(example):
    prompt = example['problem']
    prediction = llm(f"Giving this problem : {prompt}\n\nplease write agani the final solution, only the final answer in the last line without any extra tex").strip().split("\n")[-1]
    return {
        'reference': extract_boxed_answer(example['solution']),
        'prediction': prediction
    }
In [238]:
len(dataset)
Out[238]:
100
In [ ]:
#Fonction d'évaluation
def evaluate_rep(example):
    prompt = example['problem']
    prediction = llm(f"Giving this problem : {prompt}\n\nplease write agani the final solution, only the final answer in the last line without any extra tex").strip().split("\n")[-1]
    return {
        'reference': extract_boxed_answer(example['solution']),
        'prediction': prediction
    }
# Application de l'évaluation sur le dataset
results = []
for example in tqdm(dataset):
    results.append(evaluate_rep(example))

with open("math_eval_01.json", 'w') as file:
    json.dump(results, file, indent=4)
    
In [44]:
for r in results[:10]:
    print(r)
{'reference': '2', 'prediction': '**Answer:** 2'}
{'reference': '10', 'prediction': '10'}
{'reference': '\\dfrac{9}{7}', 'prediction': 'The only solution that works is $x = \\boxed{\\frac{2}{7}}$.'}
{'reference': 'i', 'prediction': 'i'}
{'reference': '4', 'prediction': '**Answer:** 4'}
{'reference': '402', 'prediction': '402'}
{'reference': 'x \\in [-2,7]', 'prediction': '**Solution in interval notation:** [-2, 7]'}
{'reference': '7', 'prediction': '**Answer:** 6'}
{'reference': '4,6,14,15', 'prediction': '4, 6, 12, 18'}
{'reference': '-\\frac{1}{8}', 'prediction': 'We have two possible solutions: x = 3/2 and x = -1/8. The smallest of these is **-1/8**.'}
In [95]:
# Calcul des scores
predictions = [result['prediction'] for result in results]
references = [result['reference'] for result in results]
In [45]:
# Initialisation des listes pour stocker les références et les prédictions nettoyées
references = []
predictions = []

# Fonction pour remplacer tout ce qui est entre deux ** par le contenu entre eux
def clean_prediction(prediction):
    # Remplacer tout ce qui est entre deux ** par le contenu entre eux
    cleaned = re.sub(r'\*\*(.*?)\*\*', r'\1', prediction)
    # Supprimer 'Finale answer'
    cleaned = cleaned.replace('Final answer:', '').strip()

    cleaned = cleaned.replace('Answer:', '').strip()
    
    return cleaned

# Parcourir chaque dictionnaire dans la liste
for item in results:
    # Ajouter la référence à la liste des références
    references.append(item['reference'])
    
    # Nettoyer la prédiction et l'ajouter à la liste des prédictions
    cleaned_prediction = clean_prediction(item['prediction']).strip()
    predictions.append(cleaned_prediction)
In [46]:
# Afficher les listes obtenues
print("References: ____________ ", references)
print("Predictions: ___________ ", predictions)
References: ____________  ['2', '10', '\\dfrac{9}{7}', 'i', '4', '402', 'x \\in [-2,7]', '7', '4,6,14,15', '-\\frac{1}{8}', '\\frac{x+2}{7}', '-15', '10', '8', '75', '\\frac{11}{2}', '-25', '8', '3', '187.5', '18', '\\$40', '5', '8', '3125', '[0,\\infty)', '.5', '12, 10, 6', '5', '16', '2300', '5', '105', '-13.5', '\\frac{243}{625}', '2', '(-\\sqrt{3}, \\sqrt{3})', '23', '49', '2x^9 - 8x^7 + 9x^6 - 16x^5 - 12x^4 + 9x^3 - 24x^2', '(-\\infty,-8)\\cup (8,\\infty)', '0', '2', '16', '\\frac{1}{12}', '6+9i', '2', '20', '7(x+3) (x-3)', 'y^4-2y^3+7y^2+y-5', '4', '0', '12', '\\frac{7}{2}', '69', '5', '\\left(-\\infty,-\\frac 12\\right)\\cup \\left(-\\frac 12,\\infty\\right)', '\\frac{2}{5}', '7', '20', '7', '\\sqrt{x}', '78', '9', '4', '6', '17', '\\left(1,\\frac{9}{2}\\right)', '\\frac{15}{2}', '-2', '8', '20', '24', '\\frac{19}{4}', '5', '-55', '60', '-7', '0.43', '108', '4950', '50', '14', '8', '26', '129', '0', '-5', '2', '4', '30', '161', '1', '5', '3s^2', '125', '8', '286', '(9,11)', '\\dfrac{1}{5}']
Predictions: ___________  ['2', '10', 'The only solution that works is $x = \\boxed{\\frac{2}{7}}$.', 'i', '4', '402', 'Solution in interval notation: [-2, 7]', '6', '4, 6, 12, 18', 'We have two possible solutions: x = 3/2 and x = -1/8. The smallest of these is -1/8.', '* *h⁻¹(x) = (x + 2) / 7*', '```', '10', '8', "Let me know if you'd like me to work through the calculations step-by-step!", '5.5', '-25', "Let me know if you'd like to explore a similar problem with slightly different conditions!", '3', '187.5', '16', '$20.00', '* Therefore, x = 5', '8', '3125', '[0, ∞)', '1/2', '1, 4, 9, 16', '0', 'The largest possible value of b is 16.', '$2300', '5', '67.5', '17.3', 'Calculate (3/5)^7 and then multiply by (125/9). This will give you the eighth term as a fraction.', 'n = 2', '$-\\sqrt{7} < x < \\sqrt{7}$', 'Finally,  $a + b + c = 1 + 2 + \\sqrt{6} = \\boxed{3+\\sqrt{6}}$.', '49', '$$2x^9 - 8x^7 + 9x^6 - 16x^5 - 12x^4 + 9x^3 - 24x^2$$', 'The solution is  $m \\in (-\\infty, -8) \\cup (8, \\infty)$.', 'Therefore, the value of $b$ is $\\boxed{0}$.', '2', 'The bookstore should charge $16 to maximize its revenue.', 'w = 1/12', '6 + 9i', '2', 'Therefore, the simplified form of (2 - 2i)(5 + 5i) is 20.', 'Therefore, the factored expression is: $\\boxed{7(x + 3)(x - 3)}$', '$$y^4 - 2y^3 + 7y^2 + y - 5$$', '6', '*f(x) - f*<sup>-1</sup>(*x*) = *f(x) - f(x)* = 0', '107', '7/2', "Let me know if you'd like me to work through the specific calculations!", '25/4', '(-∞, -1/2) U (-1/2, ∞)', '4/10 = 2/5', '* Final  7', 'Therefore, the simplified form of (3 - i)(6 + 2i) is 20.', '7', 'x^(7/18)', '78', '9', '4', '6', '108', 'Therefore, the midpoint of $\\overline{PQ}$ is $(1, \\frac{9}{2})$.', '7', '-2', '8', '20', '24', 'k = 65/8', '5', '-55', '√[3]{216000} = 60', 'x = -7', '0.71', '108', '4950', "Let me know if you'd like me to work through the detailed solution steps!", 'There is no solution that satisfies the given conditions.', '8 quarts', '26', '360', '0', 'b = -5', 'e = 2', '4', '30', "Let me know if you'd like me to work through the entire calculation!", '1', '5', '$3s^2$', "Let me know if you'd like me to work through the detailed algebraic steps!", '8', '287', 'All four points lie on different lines.', '1/5']
In [49]:
score = evaluate.load('exact_match').compute(predictions=predictions, references=references)

# Affichage des résultats
print(f"Score exact_match sur le dataset lighteval/MATH: {score}")
Score exact_match sur le dataset lighteval/MATH: {'exact_match': 0.36}

NB : 36% ¶

Let's try other metrics¶

In [51]:
score = evaluate.load('competition_math').compute(predictions=predictions, references=references)
print(f"Score exact_match sur le dataset lighteval/MATH: {score}")
Score exact_match sur le dataset lighteval/MATH: {'accuracy': 0.45}
In [110]:
# pip install git+https://github.com/hendrycks/math.git

Evaluate llm without Q*¶

rougeLsum': 0.6176895748414735}¶

Q * alignement¶

image.png

image.png

1st way ¶

image.png

image.png

2nd way ¶

image.png

to plug to :¶

image.png

to plug in¶

image.png

[26]¶

image.png image.png image.png

[27]¶

image.png

3rd way ¶

image.png

In [ ]:
 

image.png

Reward function¶

In [22]:
MAX_STEPS=100
In [23]:
prompt="""Question : {question}

Please think step by step and output each reasoning step in a new line.
You MUST output the very final answer value in the last line without any prefix or extra text, ONLY the answer numerical value.

"""
In [24]:
def step(s0, state, action):
    next_step=llm2(s0, state+'\n'+action)
    return next_step
In [25]:
s0=prompt.format(question=question)
In [26]:
st0=step(s0,"","")
print(st0)
 To find $120\%$ of 30, we need to multiply $30$ by $\frac{120}{100}$, which is equivalent to multiplying by $1.2$.
In [27]:
llm(s0)
Out[27]:
' Step 1: Calculate $120\\%$ of 30\n$120\\%$ of 30 is equal to $(120/100) * 30 = 1.2 * 30 = 36$.\n\nStep 2: Calculate $130\\%$ of 20\n$130\\%$ of 20 is equal to $(130/100) * 20 = 1.3 * 20 = 26$.\n\nStep 3: Find the positive difference between the two values\nThe positive difference between $36$ and $26$ is $36 - 26 = 10$.\n\nFinal answer: 10'
In [28]:
def process_text(text):
    # Convert the text to a list of lines
    lines = text.split('\n')
    # Strip empty lines at the beginning and the end
    stripped_lines = [line for line in lines if line.strip()]
    
    return stripped_lines

# Exemple de texte multi-lignes
llm_traj = """
    This is the first line.

    This is the second line.
    
    This is the third line.
    
"""

# Appel de la fonction
result = process_text(llm_traj)

# Afficher le résultat
print(result)
['    This is the first line.', '    This is the second line.', '    This is the third line.']
In [29]:
def trajectory(s0, state):
    states=[]
    traj=llm(s0+'\n'+state)
    return process_text(traj)      
    
In [30]:
traj1=trajectory(s0,st0)
In [31]:
traj1
Out[31]:
[' So, $120\\%$ of 30 is:',
 '$$30 \\times 1.2 = 36.$$',
 'To find $130\\%$ of 20, we need to multiply $20$ by $\\frac{130}{100}$, which is equivalent to multiplying by $1.3$.',
 'So, $130\\%$ of 20 is:',
 '$$20 \\times 1.3 = 26.$$',
 'The positive difference between these two values is:',
 '$$36 - 26 = 10.$$',
 'Therefore, the final answer is $\\boxed{10}$.']

image.png

In particular, we assign a reward of 1 if the generated code passes all test cases ( for code generation¶

task s) or the final answe matche the ground-truth (for math reasoning task s

image.png image.png

We will use following way :¶

image.png)

Compute Reward(s0, state,action) ¶

In [32]:
def nettoyer_chaine(chaine):
    # Supprimer les lignes vides à la fin
    lignes = chaine.splitlines()
    while lignes and not lignes[-1].strip():
        lignes.pop()
    
    # Conserver uniquement la dernière ligne non vide
    derniere_ligne = lignes[-1] if lignes else ""
    
    # Supprimer les espaces en début et fin de chaîne
    derniere_ligne = derniere_ligne.strip().lower()

    # Expression régulière mise à jour pour capturer le texte après "solution"
    pattern = r"^\**solution\**:* *\**\s*(.*)$"
    match = re.match(pattern, derniere_ligne, re.IGNORECASE)
    if match:
        derniere_ligne=  match.group(1).strip()

    pattern = r"^\**answer\**:* *\**\s*(.*)$"
    match = re.match(pattern, derniere_ligne, re.IGNORECASE)
    if match:
        derniere_ligne=  match.group(1).strip()

    pattern = r"^\**final answer\**:* *\**\s*(.*)$"
    match = re.match(pattern, derniere_ligne, re.IGNORECASE)
    if match:
        derniere_ligne=  match.group(1).strip()

    pattern = r"^\**final solution\**:* *\**\s*(.*)$"
    match = re.match(pattern, derniere_ligne, re.IGNORECASE)
    if match:
        derniere_ligne=  match.group(1).strip()
    
    # Supprimer à nouveau les espaces au cas où
    derniere_ligne = derniere_ligne.strip()
    cl= extract_boxed_answer(derniere_ligne)
    if cl is not None:
        derniere_ligne=cl
        
    return derniere_ligne
In [33]:
metric=evaluate.load('exact_match')
In [34]:
def reward(s0,state,action):
    final_ret=llm(s0+'\n'+'\n'+state)
    final_answer =  nettoyer_chaine(final_ret)
    gt_answer=short_answer
    t,r1 = evaluer_expression_latex(gt_answer)
    if t:
        #gt_answer=str(r1)
        t2,r2 = evaluer_expression_latex(final_answer)
        if t2:
            if r1==r2:
                #print("Rawrd = 1")
                return 1
                
    score = metric.compute(predictions=[final_answer], references=[gt_answer])

    # Affichage des résultats
    #print(f"Score  {final_answer} /   {gt_answer} : {score}")
    return score['exact_match']
In [35]:
import sympy as sp
from latex2sympy2 import latex2sympy

def evaluer_expression_latex(expression):
    try:
        # Convertir l'expression LaTeX en une expression sympy
        expr_sympy = latex2sympy(expression)
        # Évaluer l'expression et comparer à la valeur donnée
        expr_valeur = expr_sympy.evalf()
        return True, expr_valeur
    except Exception as e:
        # print(f"Erreur lors de l'interprétation de l'expression : {e}")
        return False,0
In [25]:
#!pip install latex2sympy2

Rolling out ¶

In [37]:
def roll_out(s0,state,action):
    pool = []
    nb_iterations=1
    for i in range(nb_iterations):
        r1=llm2(s0,state+'\n'+action)
        r2=llm2(s0+'\n'+"\Solution :\n",state+'\n'+action)
        r3=llm2(s0+'\nSteps :\n',state+'\n'+action)
        pool.append(r1)
        pool.append(r2)
        pool.append(r3)
    return pool
In [38]:
pool0=roll_out(s0,"","")
In [39]:
pool0
Out[39]:
[' To find the positive difference between $120\\%$ of 30 and $130\\%$ of 20, we will first calculate each percentage individually and then subtract one from the other.',
 ' First, we need to calculate $120\\%$ of 30:',
 ' Step 1: Calculate $120\\%$ of 30.']

roll out s0 = question¶

contruct trajecory¶

for each element reward (state,action)¶

keep best reward trajectory¶

In [111]:
reward(s0,"",pool0[3])
Out[111]:
1
In [40]:
def remove_duplicates(input_list):
    seen = set()
    return [x for x in input_list if not (x in seen or seen.add(x))]
In [41]:
clean_pool0=remove_duplicates(pool0)
In [42]:
clean_pool0
Out[42]:
[' To find the positive difference between $120\\%$ of 30 and $130\\%$ of 20, we will first calculate each percentage individually and then subtract one from the other.',
 ' First, we need to calculate $120\\%$ of 30:',
 ' Step 1: Calculate $120\\%$ of 30.']
In [43]:
s0
Out[43]:
'Question : What is the positive difference between $120\\%$ of 30 and $130\\%$ of 20?\n\nPlease think step by step and output each reasoning step in a new line.\nYou MUST output the very final answer value in the last line without any prefix or extra text, ONLY the answer numerical value.\n\n'
In [ ]:
qvalues_labels=[]
tot=len(pool0)
i=0
# Obtenir la date et l'heure actuelles
current_time = datetime.now()

# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)

for p in clean_pool0:
    i+=1
    print(f"{i}/{tot}")
    tr=trajectory(s0,p)
    #print("trajectoire ________ ", tr)
    # for each state action in trajectory pool
    appendtr=""
    for sa in tr :
        # roll out
        sapool= roll_out(s0,p,sa)
        clean_sapool = remove_duplicates(sapool)
        #print("cleaned __________", clean_sapool)
        #compute reward
        best_reward=0
        for action in clean_sapool:
            actual_reward=reward(s0,sa,action)
            if actual_reward >=best_reward:
                best_reward=actual_reward
        #selectes best reward
        # add to qvalues_labels
        qvalues_labels.append(((s0, p+'\n'+appendtr+sa,action),best_reward))
        appendtr+=sa
# Obtenir la date et l'heure actuelles
current_time = datetime.now()
# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)
In [45]:
# Convertir les tuples en listes pour la compatibilité JSON
data_to_save = [list(item) for item in qvalues_labels]
In [247]:
with open('qvalues_labels.json', 'w') as f:
    json.dump(data_to_save, f)
In [46]:
# Charger les données depuis un fichier JSON
with open('qvalues_labels.json', 'r') as f:
    loaded_data = json.load(f)

# Convertir les listes en tuples
data_reloaded = [tuple(item) for item in loaded_data]
In [ ]:
qvalues_labels=[]

current_time = datetime.now()
# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)

for i in tqdm(range(len(df))):
    dd=df.iloc[i]
    question = dd['problem']
    full_answer=dd['solution']
    short_answer = extract_boxed_answer(full_answer)
    s0=prompt.format(question=question)
    pool0=roll_out(s0,"","")
    clean_pool0=remove_duplicates(pool0)
    for p in clean_pool0:
        tr=trajectory(s0,p)
        #print("trajectoire ________ ", tr)
        # for each state action in trajectory pool
        appendtr=""
        for sa in tr :
            # roll out
            sapool= roll_out(s0,p,sa)
            clean_sapool = remove_duplicates(sapool)
            #print("cleaned __________", clean_sapool)
            #compute reward
            best_reward=0
            for action in clean_sapool:
                actual_reward=reward(s0,sa,action)
                if actual_reward >=best_reward:
                    best_reward=actual_reward
            #selectes best reward
            # add to qvalues_labels
            qvalues_labels.append(((s0, p+'\n'+appendtr+sa,action),best_reward))
            appendtr+=sa
# Obtenir la date et l'heure actuelles
current_time = datetime.now()
# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)
data_to_save = [list(item) for item in qvalues_labels]
with open('qvalues_labels.json', 'w') as f:
    json.dump(data_to_save, f)
In [258]:
data_to_save = [list(item) for item in qvalues_labels]
with open('qvalues_labels.json', 'w') as f:
    json.dump(data_to_save, f)

Q Model ¶

In [29]:
len(data_reloaded)
Out[29]:
3805
In [71]:
data_reloaded[0]
Out[71]:
(['Question : How many vertical asymptotes does the graph of $y=\\frac{2}{x^2+x-6}$ have?\n\nPlease think step by step and output each reasoning step in a new line.\nYou MUST output the very final answer value in the last line without any prefix or extra text, ONLY the answer numerical value.\n\n',
  '1. **Vertical asymptotes occur where the denominator of a rational function equals zero.**\n2. **Factor the denominator:**  $x^2 + x - 6 = (x+3)(x-2)$',
  ''],
 1)
In [47]:
EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings"
def embeddings(prompt):
    headers = {
        "Content-Type": "application/json",
    }

    data = {
        "model" :  "nomic-embed-text",
        "prompt": prompt
    }
    response = requests.post(EMBEDDING_ENDPOINT, headers=headers, data=json.dumps(data))
    try:
        return response.json()['embedding']
    except Exception as e:
        print("Embedding Error")
    return torch.zeros(768, dtype=torch.float32)
In [48]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
In [49]:
# Définition du modèle
class QNetwork(nn.Module):
    def __init__(self, embedding_dim, hidden_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(embedding_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        self.relu = nn.ReLU()

    def forward(self, state_embedding, action_embedding):
        x = torch.cat((state_embedding, action_embedding), dim=-1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value
In [53]:
# Paramètres
embedding_dim = 768
hidden_dim = 256
batch_size = 32
num_epochs = 10
learning_rate = 0.001


# Modèle
q_network = QNetwork(embedding_dim, hidden_dim)

# Optimiseur
optimizer = optim.Adam(q_network.parameters(), lr=0.001)

# Fonction de perte
criterion = nn.MSELoss()


# Fonction d'embedding (à remplacer par votre fonction d'embeddings)
def embed_text(text):
    # Exemple de fonction d'embedding retournant un vecteur aléatoire
    return  torch.tensor(embeddings(text))
In [81]:
# Exemple de données
state = "example state text"
action = "example action text"
value = 0.5

# Embedding des données
state_embedding = embed_text(state)
action_embedding = embed_text(action)
In [82]:
# Préparation des données pour l'entraînement
state_embedding = state_embedding.unsqueeze(0)  # Ajouter dimension batch
action_embedding = action_embedding.unsqueeze(0)  # Ajouter dimension batch
value = torch.tensor([value])

# Forward pass
predicted_q_value = q_network(state_embedding, action_embedding)

# Calcul de la perte
loss = criterion(predicted_q_value, value)

# Backward pass et optimisation
optimizer.zero_grad()
loss.backward()
optimizer.step()

print("Predicted Q Value:", predicted_q_value.item())
Predicted Q Value: 0.0895596370100975
/home/imed/anaconda3/envs/ime/lib/python3.11/site-packages/torch/nn/modules/loss.py:535: UserWarning: Using a target size (torch.Size([1])) that is different to the input size (torch.Size([1, 1])). This will likely lead to incorrect results due to broadcasting. Please ensure they have the same size.
  return F.mse_loss(input, target, reduction=self.reduction)
In [54]:
class QDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        state_action, value = self.data[idx]
        state, action = state_action[0], state_action[1]
        state_embedding = embed_text(state)
        action_embedding = embed_text(action)
        return state_embedding, action_embedding, torch.tensor(value, dtype=torch.float32)
In [55]:
# Création du DataLoader
dataset = QDataset(data_reloaded)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
In [56]:
len(dataset)
Out[56]:
3805
In [ ]:
# Boucle d'entraînement
i=0
for epoch in range(num_epochs):
    i+=1
    print(f"{i}/{num_epochs}")
    for state_embedding, action_embedding, value in tqdm(dataloader):
        # Forward pass
        predicted_q_value = q_network(state_embedding, action_embedding).squeeze()

        # Calcul de la perte
        loss = criterion(predicted_q_value, value)

        # Backward pass et optimisation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

print("Entraînement terminé.")
In [99]:
num_epochs=10
from tqdm import tqdm
import matplotlib.pyplot as plt
In [102]:
from tqdm import tqdm

# Boucle d'entraînement
loss_history = []
i = 0
for epoch in range(num_epochs):
    i += 1
    print(f"{i}/{num_epochs}")
    epoch_loss = 0
    with tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False) as pbar:
        for state_embedding, action_embedding, value in pbar:
            # Forward pass
            predicted_q_value = q_network(state_embedding, action_embedding).squeeze()

            # Calcul de la perte
            loss = criterion(predicted_q_value, value)

            # Backward pass et optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Ajouter la perte à la perte de l'époque
            epoch_loss += loss.item()

            # Mettre à jour la barre de progression avec la perte actuelle
            pbar.set_postfix(loss=loss.item())

    # Calculer la perte moyenne pour cette époque
    epoch_loss /= len(dataloader)
    loss_history.append(epoch_loss)

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
1/10
Epoch 1/10:  95%|█████████▍| 113/119 [12:01<00:38,  6.38s/it, loss=0.0581]
Embedding Error
/tmp/ipykernel_4010/3163542839.py:22: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).
  return  torch.tensor(embeddings(text))
                                                                          
Epoch 1/10, Loss: 0.1097
2/10
                                                                          
Epoch 2/10, Loss: 0.1114
3/10
Epoch 3/10:  15%|█▌        | 18/119 [01:54<10:43,  6.37s/it, loss=0.0635]
Embedding Error
                                                                          
Epoch 3/10, Loss: 0.1114
4/10
Epoch 4/10:  82%|████████▏ | 98/119 [10:24<02:13,  6.37s/it, loss=0.14]  
Embedding Error
                                                                          
Epoch 4/10, Loss: 0.1095
5/10
Epoch 5/10:  31%|███       | 37/119 [03:56<08:42,  6.37s/it, loss=0.0844]
Embedding Error
                                                                          
Epoch 5/10, Loss: 0.1077
6/10
                                                                          
Epoch 6/10, Loss: 0.1072
7/10
                                                                          
Epoch 7/10, Loss: 0.1102
8/10
                                                                          
Epoch 8/10, Loss: 0.1079
9/10
                                                                          
Epoch 9/10, Loss: 0.1094
10/10
Epoch 10/10:  33%|███▎      | 39/119 [04:08<08:29,  6.37s/it, loss=0.143] 
Embedding Error
                                                                           
Epoch 10/10, Loss: 0.1062

In [101]:
# Afficher l'historique des pertes
plt.plot(range(1, num_epochs + 1), loss_history, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.grid(True)
plt.show()
No description has been provided for this image
In [103]:
# Afficher l'historique des pertes
plt.plot(range(1, num_epochs + 1), loss_history, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.grid(True)
plt.show()
No description has been provided for this image
In [157]:
# Sauvegarder l'état du modèle
torch.save(q_network.state_dict(), "qv01.pth")
In [57]:
q_network.load_state_dict(torch.load("qv01.pth"))
Out[57]:
<All keys matched successfully>
In [241]:
def last_4_lines_empty(input_str):
    # Vérifier si la chaîne est vide
    if input_str == "":
        return True

    # Séparer la chaîne en lignes
    lines = input_str.split('\n')
    
    # Si la chaîne a moins de 4 lignes, retourner False
    if len(lines) < 4:
        return False
    
    # Prendre les 4 dernières lignes
    last_4_lines = lines[-6:]
    
    # Vérifier si chaque ligne est vide
    for line in last_4_lines:
        if line.strip() != '':
            return False
    return True
In [105]:
prompt="""Question : {question}

Please think step by step and output each reasoning step in a new line.
You MUST output the very final answer value in the last line without any or extra text, ONLY the answer numerical value prefixed by **final answer**.

"""

Q* test on a sub dataset of ligtheval/MATH¶

First test without A* exploration¶

In [242]:
#start with question
#roll out
#compute qvalue with qmodel
#use best step
#loop until find soltion
def q_start(question):

    s0=prompt.format(question=question)
    #print(llm(s0))
    found=False
    state=""
    action=""
    i=0
    while not found:
        i+=1
        '''
        print(f"______{i}____________________{state}=========={action}_____")
        print(f"__________________________{i}")
        '''
        pool=roll_out(s0,state,action)
        #print(pool)
        best_score=0.
        nex_action=pool[0]
        #print("nex_action=",nex_action)
        for n_action in pool:
            #print("state=_",state)
            #print("action_",n_action)
            if state =="":
                state='\n'
            if n_action == "":
                n_action='\n'
            # score=reward(s0,state,action)
            # score=q_network(torch.tensor(embeddings(state), dtype=torch.float32),torch.tensor(embeddings(action), dtype=torch.float32))
            score=q_network(torch.tensor(embeddings(state), dtype=torch.float32),torch.tensor(embeddings(n_action), dtype=torch.float32))
            score=float(score)
            if score >= best_score:
                nex_action=n_action
        
        state=state+'\n'+action
        action=nex_action
        if action =="" or action is None:
            break
        if "final answer" in action.lower():
            break
        if last_4_lines_empty(state) :
            break
            
    #print(f"{state}\n{action}")
    return state+'\n'+action
In [151]:
# Obtenir la date et l'heure actuelles
current_time = datetime.now()

# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)
ans=q_start(question)
print(nettoyer_chaine(ans))
# Obtenir la date et l'heure actuelles
current_time = datetime.now()

# Afficher la date et l'heure actuelles
print("Date et heure actuelles :", current_time)
Date et heure actuelles : 2024-07-14 09:17:53.111912



1. Calculate 120% of 30: (120/100) * 30 = 36


2. Calculate 130% of 20: (130/100) * 20 = 26


3. Find the positive difference between 36 and 26: 36 - 26 = 10


final answer 10
10
Date et heure actuelles : 2024-07-14 09:18:06.970778
In [154]:
dataset = load_dataset("lighteval/MATH", 'all', split='test[:100]')
#Fonction d'évaluation
def evaluate_rep2(example):
    prompt = example['problem']
    prediction = nettoyer_chaine(q_start(prompt))
    return {
        'reference': extract_boxed_answer(example['solution']),
        'prediction': prediction
    }
# Application de l'évaluation sur le dataset
results = []
for example in tqdm(dataset):
    results.append(evaluate_rep2(example))

with open("math_eval_02.json", 'w') as file:
    json.dump(results, file, indent=4)
  1%|          | 1/100 [00:17<28:44, 17.42s/it]
Embedding Error
/tmp/ipykernel_2906/3991589663.py:34: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).
  score=q_network(torch.tensor(embeddings(state), dtype=torch.float32),torch.tensor(embeddings(n_action), dtype=torch.float32))
 13%|█▎        | 13/100 [11:25<1:24:50, 58.51s/it] 
Embedding Error
 17%|█▋        | 17/100 [14:34<1:02:03, 44.87s/it]
Embedding Error
 18%|█▊        | 18/100 [15:47<1:12:54, 53.35s/it]
Embedding Error
 27%|██▋       | 27/100 [22:23<56:02, 46.06s/it]  
Embedding Error
 31%|███       | 31/100 [26:06<1:04:27, 56.06s/it]
Embedding Error
 39%|███▉      | 39/100 [33:13<1:04:40, 63.62s/it]
Embedding Error
 47%|████▋     | 47/100 [37:21<28:22, 32.12s/it]  
Embedding Error
 52%|█████▏    | 52/100 [40:01<26:47, 33.49s/it]
Embedding Error
 64%|██████▍   | 64/100 [47:20<20:06, 33.51s/it]
Embedding Error
 70%|███████   | 70/100 [53:22<26:14, 52.49s/it]
Embedding Error
 72%|███████▏  | 72/100 [54:25<19:54, 42.67s/it]
Embedding Error
 75%|███████▌  | 75/100 [57:09<20:48, 49.96s/it]
Embedding Error
 89%|████████▉ | 89/100 [1:09:15<08:00, 43.64s/it]
Embedding Error
 93%|█████████▎| 93/100 [1:12:39<05:32, 47.45s/it]
Embedding Error
 99%|█████████▉| 99/100 [1:16:50<00:38, 38.88s/it]
Embedding Error
100%|██████████| 100/100 [1:17:36<00:00, 46.56s/it]
In [156]:
# Calcul des scores
predictions = [result['prediction'] for result in results]
references = [result['reference'] for result in results]

Evaluate the model¶

In [157]:
score = evaluate.load('exact_match').compute(predictions=predictions, references=references)

# Affichage des résultats
print(f"Score exact_match sur le dataset lighteval/MATH: {score}")
Score exact_match sur le dataset lighteval/MATH: {'exact_match': 0.52}

52 % ¶

In [158]:
score = evaluate.load('rouge').compute(predictions=predictions, references=references)
print(f"Score exact_match sur le dataset lighteval/MATH: {score}")
Score exact_match sur le dataset lighteval/MATH: {'rouge1': 0.6206349206349205, 'rouge2': 0.08333333333333331, 'rougeL': 0.6228888888888888, 'rougeLsum': 0.6239682539682538}
In [159]:
score = evaluate.load('competition_math').compute(predictions=predictions, references=references)
print(f"Score exact_match sur le dataset lighteval/MATH: {score}")
Score exact_match sur le dataset lighteval/MATH: {'accuracy': 0.56}

Implementing final Q*¶

===> To do ¶

Refs¶

https://www.redblobgames.com/pathfinding/a-star/introduction.html¶

Q*: Improving Multi-step Reasoning for LLMs with Deliberative Planning¶

https://arxiv.org/pdf/2406.14283¶

MATH-SHEPHERD: VERIFY AND REINFORCE LLMS STEP-BY-STEP WITHOUT HUMAN ANNOTATIONS¶

https://arxiv.org/pdf/2312.08935¶

Alphazero-like Tree-Search can Guide Large Language Model Decoding and Training¶

https://arxiv.org/pdf/2309.17179¶

Reasoning with Language Model is Planning with World Model¶

https://arxiv.org/pdf/2305.14992¶

Offline Reinforcement Learning: Tutorial, Review, and Perspectives on Open Problemss¶

https://arxiv.org/pdf/2005.0164¶

Mix Q-learning for Lane Changing: A Collaborative, Decision-Making Method in Multi-Agent Deep, Reinforcement Learning¶

https://arxiv.org/pdf/2406.09755¶

Let’s Verify Step by Step¶

https://arxiv.org/pdf/2305.20050¶

On the Measure of Intelligence¶

https://arxiv.org/pdf/1911.01547¶

DaViT: Dual Attention Vision Transformers¶

https://arxiv.org/pdf/2204.036453¶

In [ ]: