#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simulation of the link-creation process between web pages under the influence
of search engines, following the assignment "Modélisations mathématiques —
BUT3 Info Fontainebleau".

Features:
- Initialization of a directed graph of n pages, each with a relevance p(x) in [0, 1]
- Choice of search score: "indegree_plus_one" (in-degree + 1, normalized) or "pagerank"
- At each iteration: pick x uniformly at random, pick y according to the score
  distribution s(y), then add the link x->y with probability p(y)
- Policies: self-loops x->x forbidden, optional replacement policy
- Stopping: when every page has at least ceil(ln n) outgoing links, or on stagnation
- Visualization: final graph (node size = score), history of new links
- CSV and PNG export
"""

import math
import random
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd


# ----------------------------
# Simulation configuration
# ----------------------------

@dataclass
class WebSimulationConfig:
    n: int = 30
    seed: Optional[int] = 42
    score_mode: str = "indegree_plus_one"  # "indegree_plus_one" | "pagerank"
    use_realistic_p: bool = False
    beta_params: Tuple[float, float] = (2.0, 5.0)
    max_iters: int = 20000
    min_out_links: Optional[int] = None
    stagnation_patience: int = 500
    allow_self_loops: bool = False
    replacement_policy: str = "none"  # "none" | "replace_worst_if_better"
    pagerank_damping: float = 0.85
    pagerank_tol: float = 1.0e-08
    pagerank_max_iter: int = 100


# ----------------------------
# Simulation
# ----------------------------

class WebSimulation:
    def __init__(self, config: WebSimulationConfig):
        self.cfg = config
        if self.cfg.seed is not None:
            np.random.seed(self.cfg.seed)
            random.seed(self.cfg.seed)

        self.G = nx.DiGraph()
        self.G.add_nodes_from(range(self.cfg.n))

        # Relevance p(x) of each page: Beta-distributed ("realistic") or uniform on [0, 1]
        if self.cfg.use_realistic_p:
            a, b = self.cfg.beta_params
            p_vals = np.random.beta(a, b, size=self.cfg.n)
        else:
            p_vals = np.random.rand(self.cfg.n)
        self.p = {i: float(p_vals[i]) for i in range(self.cfg.n)}

        # Initial search scores: uniform over the n pages
        self.scores = {i: 1.0 / self.cfg.n for i in range(self.cfg.n)}

        # Target minimum out-degree: ceil(ln n) unless overridden in the config
        self.min_out_links = (math.ceil(math.log(self.cfg.n))
                              if self.cfg.min_out_links is None
                              else self.cfg.min_out_links)

    def _compute_scores_indegree_plus_one(self) -> Dict[int, float]:
        # Score proportional to (in-degree + 1), normalized to sum to 1
        raw = np.array([self.G.in_degree(i) + 1 for i in range(self.cfg.n)], dtype=float)
        total = raw.sum()
        if total <= 0:
            return {i: 1.0 / self.cfg.n for i in range(self.cfg.n)}
        return {i: float(raw[i] / total) for i in range(self.cfg.n)}

    def _compute_scores_pagerank(self) -> Dict[int, float]:
        try:
            pr = nx.pagerank(self.G,
                             alpha=self.cfg.pagerank_damping,
                             tol=self.cfg.pagerank_tol,
                             max_iter=self.cfg.pagerank_max_iter)
        except nx.PowerIterationFailedConvergence:
            # Fall back to the uniform distribution if power iteration fails
            pr = {i: 1.0 / self.cfg.n for i in range(self.cfg.n)}
        return pr

    def recompute_scores(self) -> None:
        if self.cfg.score_mode == "indegree_plus_one":
            self.scores = self._compute_scores_indegree_plus_one()
        elif self.cfg.score_mode == "pagerank":
            self.scores = self._compute_scores_pagerank()
        else:
            raise ValueError("Unknown score_mode")

    def _choose_y_weighted_by_scores(self) -> int:
        # Sample a target page y with probability proportional to its score s(y)
        nodes = list(range(self.cfg.n))
        weights = np.array([self.scores[i] for i in nodes], dtype=float)
        weights = weights / weights.sum()
        return int(np.random.choice(nodes, p=weights))

    def _maybe_add_edge(self, x: int, y: int) -> bool:
        if (not self.cfg.allow_self_loops) and (x == y):
            return False

        # The candidate link x->y is accepted with probability p(y)
        accept = (random.random() <= self.p[y])
        if not accept:
            return False

        if self.cfg.replacement_policy == "none":
            if not self.G.has_edge(x, y):
                self.G.add_edge(x, y, weight=1.0)
                return True
            return False

        if self.cfg.replacement_policy == "replace_worst_if_better":
            out_neighbors = list(self.G.successors(x))
            if y in out_neighbors:
                return False
            if len(out_neighbors) < self.min_out_links:
                self.G.add_edge(x, y, weight=1.0)
                return True
            # x already has enough outgoing links: replace its least relevant
            # target only if y is strictly more relevant
            worst = min(out_neighbors, key=lambda t: self.p[t]) if out_neighbors else None
            if worst is None:
                self.G.add_edge(x, y, weight=1.0)
                return True
            if self.p[y] > self.p[worst]:
                self.G.remove_edge(x, worst)
                self.G.add_edge(x, y, weight=1.0)
                return True
            return False

        raise ValueError("Unknown replacement_policy")

    def _meets_stopping_condition(self, stagnation_steps: int) -> bool:
        if all(self.G.out_degree(i) >= self.min_out_links for i in range(self.cfg.n)):
            return True
        if stagnation_steps >= self.cfg.stagnation_patience:
            return True
        return False

    def run(self):
        history_new_edges = []
        stagnation = 0
        iters = 0
        while iters < self.cfg.max_iters:
            x = random.randrange(self.cfg.n)
            y = self._choose_y_weighted_by_scores()
            changed = self._maybe_add_edge(x, y)
            history_new_edges.append(1 if changed else 0)
            if changed:
                stagnation = 0
                self.recompute_scores()
            else:
                stagnation += 1
            iters += 1
            if self._meets_stopping_condition(stagnation):
                break
        self.recompute_scores()
        return self.G, self.p, self.scores, history_new_edges, iters


# ----------------------------
# Analysis & plotting helpers
# ----------------------------

def summarize(G, p, scores) -> pd.DataFrame:
    rows = []
    for i in G.nodes():
        rows.append({
            "node": i,
            "pertinence_p": p[i],
            "score": scores[i],
            "in_degree": G.in_degree(i),
            "out_degree": G.out_degree(i),
        })
    return (pd.DataFrame(rows)
            .sort_values(["score", "pertinence_p"], ascending=[False, False])
            .reset_index(drop=True))


def plot_expected_graph(G, scores, title="Expected graph (node size = score)", save_path=None):
    # Strict circular layout
    pos = nx.circular_layout(G)
    # Node sizes proportional to scores
    score_vals = np.array([scores[i] for i in G.nodes()])
    node_sizes = 3000 * (score_vals / score_vals.max() + 0.05)

    plt.figure(figsize=(7, 7))
    nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color="dodgerblue")
    nx.draw_networkx_edges(
        G, pos,
        arrows=True, arrowstyle="-|>", arrowsize=8,
        edge_color="black", width=0.8
    )
    plt.axis("off")
    plt.title(title)
    plt.tight_layout()
    # Save before plt.show(), otherwise the exported PNG would be blank
    if save_path is not None:
        plt.savefig(save_path, dpi=150, bbox_inches="tight")
    plt.show()


def plot_history(history, save_path=None):
    plt.figure(figsize=(10, 3))
    plt.plot(history)
    plt.title("New links (1) vs. iterations")
    plt.xlabel("Iteration")
    plt.ylabel("New link?")
    plt.tight_layout()
    # Save before plt.show(), otherwise the exported PNG would be blank
    if save_path is not None:
        plt.savefig(save_path, dpi=150, bbox_inches="tight")
    plt.show()


# ----------------------------
# Main
# ----------------------------

def main():
    cfg = WebSimulationConfig(
        n=30,
        seed=123,
        score_mode="indegree_plus_one",  # or "pagerank"
        use_realistic_p=True,
        beta_params=(2.5, 4.0),
        max_iters=10000,
        min_out_links=None,  # -> ceil(ln n)
        stagnation_patience=1000,
        allow_self_loops=False,
        replacement_policy="replace_worst_if_better",
    )
    sim = WebSimulation(cfg)
    G, p, scores, history, iters = sim.run()
    print(f"Iterations performed: {iters}")

    df = summarize(G, p, scores)
    print(df.head(10))

    # Figures: displayed and exported as PNG
    plot_expected_graph(G, scores, save_path="graphe_final.png")
    plot_history(history, save_path="historique_liens.png")

    # CSV export
    df.to_csv("resume_pages.csv", index=False)


if __name__ == "__main__":
    main()
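

# ---------------------------------------------------------------------------
# Sketch (commented out, not executed): an alternative run using the
# "pagerank" score mode instead of the default in-degree score. The parameter
# values below are illustrative assumptions, not prescribed by the assignment.
#
# cfg_pr = WebSimulationConfig(n=50, seed=7, score_mode="pagerank",
#                              replacement_policy="none")
# sim_pr = WebSimulation(cfg_pr)
# G_pr, p_pr, scores_pr, history_pr, iters_pr = sim_pr.run()
# print(summarize(G_pr, p_pr, scores_pr).head(10))
# ---------------------------------------------------------------------------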