196 lines
7.7 KiB
Python
Raw Permalink Normal View History

2025-03-13 22:24:53 +01:00
from datetime import datetime
from flask import Flask, Blueprint, flash, render_template, request, url_for
import ldap, ldap.filter, os
from app.exceptions import LoginNotFoundException, MultipleUsersFoundForLoginException
from flask_migrate import Migrate
from flask_mail import Mail, Message
from app.models import db, MagicToken
from dotenv import load_dotenv
load_dotenv()
f_app = Flask(__name__)
f_app.secret_key = os.getenv("SECRET_KEY")
bp = Blueprint('mdp', __name__)
f_app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("DATABASE_URI")
db.init_app(f_app)
migrate = Migrate(f_app, db)
def to_bool(value, default: bool = False):
"""
Convert a value to a boolean
"""
if value is None:
return default
if isinstance(value, str):
if value.lower() in ['true', '1', 't', 'y', 'yes']:
return True
if value.lower() in ['false', '0', 'f', 'n', 'no']:
return False
if value:
return True
return default
f_app.config['MAIL_SERVER']= os.getenv("MAIL_SERVER")
f_app.config['MAIL_PORT'] = int(os.getenv("MAIL_PORT"))
f_app.config['MAIL_USERNAME'] = os.getenv("MAIL_USERNAME")
f_app.config['MAIL_PASSWORD'] = os.getenv("MAIL_PASSWORD")
f_app.config['MAIL_USE_TLS'] = to_bool(os.getenv("MAIL_USE_TLS"))
f_app.config['MAIL_USE_SSL'] = to_bool(os.getenv("MAIL_USE_SSL"))
mail = Mail(f_app)
@bp.route('/')
def home():
return render_template('pages/home.html.j2')
@bp.route('/identifiant')
def lost_login():
return render_template('pages/lost_login.html.j2')
@bp.route('/mdp', methods=("GET", "POST"))
def password_link_form():
if request.method == 'GET':
return render_template('pages/password_link_form.html.j2')
login = request.form['login']
error = None
if not login:
error = "Le champ 'identifiant' ne peut pas être vide."
else:
try:
user_dn, user_emails = ldap_get_dn_and_emails(login)
if len(user_emails) == 0:
error = "Aucune adresse email trouvée pour cet identifiant. Rapprochez-vous du secrétariat de votre formation pour mettre à jour votre dossier ou " + render_template("partials/contact_link.html.j2") + "."
except LoginNotFoundException:
error = "Aucun utilisateur trouvé avec cet identifiant.<br>Si vous avez perdu votre identifiant, <a class=\"text-blue-600 hover:underline\" href=\"" + url_for("mdp.lost_login") + "\">cliquez ici</a>."
except MultipleUsersFoundForLoginException:
error = "Plusieurs utilisateurs trouvés avec cet identifiant (cette erreur ne devrait jamais s'afficher, " + render_template("partials/contact_link.html.j2") + ")."
except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE):
error = "La connexion au serveur LDAP a échouée."
except ldap.TIMEOUT:
error = "Le serveur LDAP a mis trop de temps à répondre."
except ldap.LDAPError:
error = "Une erreur est survenue lors de la connexion à l'annuaire LDAP."
if error is not None:
flash(error)
return render_template('pages/password_link_form.html.j2')
try:
magictoken = MagicToken.generate(user_emails, user_dn)
db.session.add(magictoken)
db.session.commit()
except:
flash("Une erreur est survenue lors de la création du lien.")
return render_template('pages/password_link_form.html.j2')
try:
send_email(user_emails, magictoken)
anonymized_emails = [anonymize_email(email) for email in user_emails]
except:
flash("Une erreur est survenue lors de l'envoi du mail.")
return render_template('pages/password_link_form.html.j2')
return render_template('pages/password_link_sent.html.j2', emails=anonymized_emails, is_student=True)
@bp.route('/mdp/<token>', methods=("GET", "POST"))
def password_reset_form(token: str = None):
magictoken = MagicToken.query.filter_by(token=token).first()
if magictoken is None:
return render_template('pages/invalid_password_token.html.j2')
if magictoken.used:
return render_template('pages/invalid_password_token.html.j2', used=True)
if magictoken.expiration_date < datetime.now():
return render_template('pages/invalid_password_token.html.j2', expired=True)
if request.method == 'GET':
return render_template('pages/password_reset_form.html.j2')
newpassword = request.form['newpasswd']
error = None
if not newpassword:
error = "Le champ 'mot de passe' ne peut pas être vide."
if error is not None:
flash(error)
return render_template('pages/password_reset_form.html.j2')
try:
ldap_set_password(magictoken.user_dn, newpassword)
except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE):
error = "La connexion au serveur LDAP a échouée."
except ldap.TIMEOUT:
error = "Le serveur LDAP a mis trop de temps à répondre."
except ldap.LDAPError as err:
error = "Erreur LDAP. " + str(err)
if error is not None:
flash(error)
return render_template('pages/password_reset_form.html.j2')
magictoken.used = True
db.session.commit()
return render_template('pages/password_reset_success.html.j2')
def send_email(emails: list[str], magictoken: MagicToken):
"""
Send the magic token to the list of emails
"""
msg = Message(subject='Initialisation du mot de passe', sender=os.getenv("MAIL_DEFAULT_SENDER"), recipients=emails)
msg.body = render_template('emails/reset-link.txt.j2', magictoken=magictoken)
msg.html = render_template('emails/reset-link.html.j2', magictoken=magictoken)
mail.send(msg)
def anonymize_email(email: str):
"""
Anonymize an email address before displaying it on the website
"""
email_parts = email.split('@')
email_parts[0] = email_parts[0][:3] + '*' * (len(email_parts[0]) - 3)
known_domains = ['etu.u-pec.fr', 'u-pec.fr', 'iutsf.org', 'iut-fbleau.fr']
if email_parts[1].lower() not in known_domains:
email_parts[1] = email_parts[1].rsplit('.', 1)
email_parts[1][0] = email_parts[1][0][:3] + '*' * (len(email_parts[1][0]) - 3)
email_parts[1] = '.'.join(email_parts[1])
return '@'.join(email_parts)
def ldap_get_dn_and_emails(login: str):
"""
Get the user's distinguished name and email addresses from the LDAP server
"""
if not to_bool(os.getenv("LDAP_TLS_VERIFY")):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
conn = ldap.initialize(os.getenv("LDAP_SERVER"))
conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD"))
result = conn.search_s(
os.getenv("LDAP_BASE_DN"),
ldap.SCOPE_SUBTREE,
os.getenv("LDAP_SEARCH_FILTER") % ldap.filter.escape_filter_chars(login),
[os.getenv("LDAP_EMAIL_ATTRIBUTE")]
)
if len(result) == 0:
raise LoginNotFoundException()
if len(result) > 1:
raise MultipleUsersFoundForLoginException()
return result[0][0], [entry[1][os.getenv("LDAP_EMAIL_ATTRIBUTE")][0].decode('utf-8') for entry in result]
def ldap_set_password(user_dn: str, newpassword: str):
"""
Set the user's password in LDAP
"""
if not to_bool(os.getenv("LDAP_TLS_VERIFY")):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
conn = ldap.initialize(os.getenv("LDAP_SERVER"))
conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD"))
# https://serverfault.com/questions/937330/update-ad-password-from-python-ldap-code-insuff-access-rights
conn.modify_s(user_dn, [(ldap.MOD_REPLACE, 'unicodePwd', ['"{0}"'.format(newpassword).encode('utf-16-le')])])
f_app.register_blueprint(bp)