from datetime import datetime from flask import Flask, Blueprint, flash, render_template, request, url_for import ldap, ldap.filter, os from app.exceptions import LoginNotFoundException, MultipleUsersFoundForLoginException from flask_migrate import Migrate from flask_mail import Mail, Message from app.models import db, MagicToken from dotenv import load_dotenv load_dotenv() f_app = Flask(__name__) f_app.secret_key = os.getenv("SECRET_KEY") bp = Blueprint('mdp', __name__) f_app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("DATABASE_URI") db.init_app(f_app) migrate = Migrate(f_app, db) def to_bool(value, default: bool = False): """ Convert a value to a boolean """ if value is None: return default if isinstance(value, str): if value.lower() in ['true', '1', 't', 'y', 'yes']: return True if value.lower() in ['false', '0', 'f', 'n', 'no']: return False if value: return True return default f_app.config['MAIL_SERVER']= os.getenv("MAIL_SERVER") f_app.config['MAIL_PORT'] = int(os.getenv("MAIL_PORT")) f_app.config['MAIL_USERNAME'] = os.getenv("MAIL_USERNAME") f_app.config['MAIL_PASSWORD'] = os.getenv("MAIL_PASSWORD") f_app.config['MAIL_USE_TLS'] = to_bool(os.getenv("MAIL_USE_TLS")) f_app.config['MAIL_USE_SSL'] = to_bool(os.getenv("MAIL_USE_SSL")) mail = Mail(f_app) @bp.route('/') def home(): return render_template('pages/home.html.j2') @bp.route('/identifiant') def lost_login(): return render_template('pages/lost_login.html.j2') @bp.route('/mdp', methods=("GET", "POST")) def password_link_form(): if request.method == 'GET': return render_template('pages/password_link_form.html.j2') login = request.form['login'] error = None if not login: error = "Le champ 'identifiant' ne peut pas être vide." else: try: user_dn, user_emails = ldap_get_dn_and_emails(login) if len(user_emails) == 0: error = "Aucune adresse email trouvée pour cet identifiant. Rapprochez-vous du secrétariat de votre formation pour mettre à jour votre dossier ou " + render_template("partials/contact_link.html.j2") + "." except LoginNotFoundException: error = "Aucun utilisateur trouvé avec cet identifiant.<br>Si vous avez perdu votre identifiant, <a class=\"text-blue-600 hover:underline\" href=\"" + url_for("mdp.lost_login") + "\">cliquez ici</a>." except MultipleUsersFoundForLoginException: error = "Plusieurs utilisateurs trouvés avec cet identifiant (cette erreur ne devrait jamais s'afficher, " + render_template("partials/contact_link.html.j2") + ")." except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE): error = "La connexion au serveur LDAP a échouée." except ldap.TIMEOUT: error = "Le serveur LDAP a mis trop de temps à répondre." except ldap.LDAPError: error = "Une erreur est survenue lors de la connexion à l'annuaire LDAP." if error is not None: flash(error) return render_template('pages/password_link_form.html.j2') try: magictoken = MagicToken.generate(user_emails, user_dn) db.session.add(magictoken) db.session.commit() except: flash("Une erreur est survenue lors de la création du lien.") return render_template('pages/password_link_form.html.j2') try: send_email(user_emails, magictoken) anonymized_emails = [anonymize_email(email) for email in user_emails] except: flash("Une erreur est survenue lors de l'envoi du mail.") return render_template('pages/password_link_form.html.j2') return render_template('pages/password_link_sent.html.j2', emails=anonymized_emails, is_student=True) @bp.route('/mdp/<token>', methods=("GET", "POST")) def password_reset_form(token: str = None): magictoken = MagicToken.query.filter_by(token=token).first() if magictoken is None: return render_template('pages/invalid_password_token.html.j2') if magictoken.used: return render_template('pages/invalid_password_token.html.j2', used=True) if magictoken.expiration_date < datetime.now(): return render_template('pages/invalid_password_token.html.j2', expired=True) if request.method == 'GET': return render_template('pages/password_reset_form.html.j2') newpassword = request.form['newpasswd'] error = None if not newpassword: error = "Le champ 'mot de passe' ne peut pas être vide." if error is not None: flash(error) return render_template('pages/password_reset_form.html.j2') try: ldap_set_password(magictoken.user_dn, newpassword) except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE): error = "La connexion au serveur LDAP a échouée." except ldap.TIMEOUT: error = "Le serveur LDAP a mis trop de temps à répondre." except ldap.LDAPError as err: error = "Erreur LDAP. " + str(err) if error is not None: flash(error) return render_template('pages/password_reset_form.html.j2') magictoken.used = True db.session.commit() return render_template('pages/password_reset_success.html.j2') def send_email(emails: list[str], magictoken: MagicToken): """ Send the magic token to the list of emails """ msg = Message(subject='Initialisation du mot de passe', sender=os.getenv("MAIL_DEFAULT_SENDER"), recipients=emails) msg.body = render_template('emails/reset-link.txt.j2', magictoken=magictoken) msg.html = render_template('emails/reset-link.html.j2', magictoken=magictoken) mail.send(msg) def anonymize_email(email: str): """ Anonymize an email address before displaying it on the website """ email_parts = email.split('@') email_parts[0] = email_parts[0][:3] + '*' * (len(email_parts[0]) - 3) known_domains = ['etu.u-pec.fr', 'u-pec.fr', 'iutsf.org', 'iut-fbleau.fr'] if email_parts[1].lower() not in known_domains: email_parts[1] = email_parts[1].rsplit('.', 1) email_parts[1][0] = email_parts[1][0][:3] + '*' * (len(email_parts[1][0]) - 3) email_parts[1] = '.'.join(email_parts[1]) return '@'.join(email_parts) def ldap_get_dn_and_emails(login: str): """ Get the user's distinguished name and email addresses from the LDAP server """ if not to_bool(os.getenv("LDAP_TLS_VERIFY")): ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) conn = ldap.initialize(os.getenv("LDAP_SERVER")) conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD")) result = conn.search_s( os.getenv("LDAP_BASE_DN"), ldap.SCOPE_SUBTREE, os.getenv("LDAP_SEARCH_FILTER") % ldap.filter.escape_filter_chars(login), [os.getenv("LDAP_EMAIL_ATTRIBUTE")] ) if len(result) == 0: raise LoginNotFoundException() if len(result) > 1: raise MultipleUsersFoundForLoginException() return result[0][0], [entry[1][os.getenv("LDAP_EMAIL_ATTRIBUTE")][0].decode('utf-8') for entry in result] def ldap_set_password(user_dn: str, newpassword: str): """ Set the user's password in LDAP """ if not to_bool(os.getenv("LDAP_TLS_VERIFY")): ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) conn = ldap.initialize(os.getenv("LDAP_SERVER")) conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD")) # https://serverfault.com/questions/937330/update-ad-password-from-python-ldap-code-insuff-access-rights conn.modify_s(user_dn, [(ldap.MOD_REPLACE, 'unicodePwd', ['"{0}"'.format(newpassword).encode('utf-16-le')])]) f_app.register_blueprint(bp)