196 lines
7.7 KiB
Python
196 lines
7.7 KiB
Python
|
from datetime import datetime
|
||
|
from flask import Flask, Blueprint, flash, render_template, request, url_for
|
||
|
import ldap, ldap.filter, os
|
||
|
from app.exceptions import LoginNotFoundException, MultipleUsersFoundForLoginException
|
||
|
from flask_migrate import Migrate
|
||
|
from flask_mail import Mail, Message
|
||
|
from app.models import db, MagicToken
|
||
|
from dotenv import load_dotenv
|
||
|
load_dotenv()
|
||
|
|
||
|
# Flask application; the secret key and the database URI are read from the environment.
f_app = Flask(__name__)
f_app.config.update(
    SECRET_KEY=os.getenv("SECRET_KEY"),
    SQLALCHEMY_DATABASE_URI=os.getenv("DATABASE_URI"),
)

# Blueprint named 'mdp' on which the views of this module are declared.
bp = Blueprint('mdp', __name__)

# Bind the SQLAlchemy handle to the application and enable migrations.
db.init_app(f_app)
migrate = Migrate(f_app, db)
|
||
|
|
||
|
def to_bool(value, default: bool = False):
    """
    Convert a value to a boolean.

    :param value: value to interpret; typically a string read from the
        environment, or None when the variable is not set
    :param default: result used when ``value`` is None or a blank string
    :return: True for 'true', '1', 't', 'y', 'yes' and False for 'false', '0',
        'f', 'n', 'no' (case-insensitive, surrounding whitespace ignored);
        any other non-blank string is True; non-string values use their
        own truthiness
    """
    if value is None:
        return default

    if isinstance(value, str):
        text = value.strip().lower()
        if text in ('true', '1', 't', 'y', 'yes'):
            return True
        if text in ('false', '0', 'f', 'n', 'no'):
            return False
        # Unrecognised text: a blank string means "not set", anything else counts as set.
        return True if text else default

    # An explicit False / 0 must stay False instead of being replaced by the default.
    return bool(value)
|
||
|
|
||
|
# Flask-Mail settings, all read from the environment.
f_app.config['MAIL_SERVER'] = os.getenv("MAIL_SERVER")
# MAIL_PORT may be unset or empty: int(None) / int("") would crash at import time,
# so fall back to 25, the standard SMTP port.
f_app.config['MAIL_PORT'] = int(os.getenv("MAIL_PORT") or 25)
f_app.config['MAIL_USERNAME'] = os.getenv("MAIL_USERNAME")
f_app.config['MAIL_PASSWORD'] = os.getenv("MAIL_PASSWORD")
f_app.config['MAIL_USE_TLS'] = to_bool(os.getenv("MAIL_USE_TLS"))
f_app.config['MAIL_USE_SSL'] = to_bool(os.getenv("MAIL_USE_SSL"))

# Mail extension bound to the application; used by send_email().
mail = Mail(f_app)
|
||
|
|
||
|
@bp.route('/')
def home():
    """
    Render the home page
    """
    page = 'pages/home.html.j2'
    return render_template(page)
|
||
|
|
||
|
@bp.route('/identifiant')
def lost_login():
    """
    Render the "lost login" help page
    """
    page = 'pages/lost_login.html.j2'
    return render_template(page)
|
||
|
|
||
|
@bp.route('/mdp', methods=("GET", "POST"))
def password_link_form():
    """
    Display the password-link request form and process its submission.

    GET renders the form. POST looks the submitted login up in LDAP, stores a
    new MagicToken for the account, e-mails it to the user's addresses and
    renders the confirmation page with the anonymized addresses. Every
    failure is flashed to the user and the form is rendered again.
    """
    form_template = 'pages/password_link_form.html.j2'

    if request.method == 'GET':
        return render_template(form_template)

    login = request.form['login']
    error = None

    if not login:
        error = "Le champ 'identifiant' ne peut pas être vide."
    else:
        try:
            user_dn, user_emails = ldap_get_dn_and_emails(login)
            if not user_emails:
                error = "Aucune adresse email trouvée pour cet identifiant. Rapprochez-vous du secrétariat de votre formation pour mettre à jour votre dossier ou " + render_template("partials/contact_link.html.j2") + "."
        except LoginNotFoundException:
            error = "Aucun utilisateur trouvé avec cet identifiant.<br>Si vous avez perdu votre identifiant, <a class=\"text-blue-600 hover:underline\" href=\"" + url_for("mdp.lost_login") + "\">cliquez ici</a>."
        except MultipleUsersFoundForLoginException:
            error = "Plusieurs utilisateurs trouvés avec cet identifiant (cette erreur ne devrait jamais s'afficher, " + render_template("partials/contact_link.html.j2") + ")."
        except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE):
            error = "La connexion au serveur LDAP a échouée."
        except ldap.TIMEOUT:
            error = "Le serveur LDAP a mis trop de temps à répondre."
        except ldap.LDAPError:
            error = "Une erreur est survenue lors de la connexion à l'annuaire LDAP."

    if error is not None:
        flash(error)
        return render_template(form_template)

    try:
        magictoken = MagicToken.generate(user_emails, user_dn)
        db.session.add(magictoken)
        db.session.commit()
    except Exception:
        # Not a bare except: SystemExit/KeyboardInterrupt must not be swallowed.
        # Roll back so the session stays usable, and keep a trace of the cause.
        db.session.rollback()
        f_app.logger.exception("Could not create the magic token")
        flash("Une erreur est survenue lors de la création du lien.")
        return render_template(form_template)

    try:
        send_email(user_emails, magictoken)
        anonymized_emails = [anonymize_email(email) for email in user_emails]
    except Exception:
        f_app.logger.exception("Could not send the magic token e-mail")
        flash("Une erreur est survenue lors de l'envoi du mail.")
        return render_template(form_template)

    return render_template('pages/password_link_sent.html.j2', emails=anonymized_emails, is_student=True)
|
||
|
|
||
|
@bp.route('/mdp/<token>', methods=("GET", "POST"))
def password_reset_form(token: str = None):
    """
    Display the new-password form for a magic token and process its submission.

    An unknown, already used or expired token renders the "invalid token"
    page. GET renders the form; POST sets the password in LDAP, marks the
    token as used and renders the success page. Failures are flashed and the
    form is rendered again.
    """
    invalid_page = 'pages/invalid_password_token.html.j2'
    form_page = 'pages/password_reset_form.html.j2'

    def reject(message):
        # Show the message to the user and display the form again
        flash(message)
        return render_template(form_page)

    token_row = MagicToken.query.filter_by(token=token).first()
    if token_row is None:
        return render_template(invalid_page)
    if token_row.used:
        return render_template(invalid_page, used=True)
    if token_row.expiration_date < datetime.now():
        return render_template(invalid_page, expired=True)

    if request.method == 'GET':
        return render_template(form_page)

    newpassword = request.form['newpasswd']
    if not newpassword:
        return reject("Le champ 'mot de passe' ne peut pas être vide.")

    try:
        ldap_set_password(token_row.user_dn, newpassword)
    except (ldap.SERVER_DOWN, ldap.BUSY, ldap.CONNECT_ERROR, ldap.UNAVAILABLE):
        return reject("La connexion au serveur LDAP a échouée.")
    except ldap.TIMEOUT:
        return reject("Le serveur LDAP a mis trop de temps à répondre.")
    except ldap.LDAPError as err:
        return reject("Erreur LDAP. " + str(err))

    # The password was changed: the token cannot be used a second time
    token_row.used = True
    db.session.commit()

    return render_template('pages/password_reset_success.html.j2')
|
||
|
|
||
|
|
||
|
def send_email(emails: list[str], magictoken: MagicToken):
    """
    Send the magic token to every address in ``emails``.

    The message has a plain-text and an HTML body, both rendered from the
    'emails/reset-link' templates with the token as context.
    """
    context = {'magictoken': magictoken}
    message = Message(
        subject='Initialisation du mot de passe',
        sender=os.getenv("MAIL_DEFAULT_SENDER"),
        recipients=emails,
    )
    message.body = render_template('emails/reset-link.txt.j2', **context)
    message.html = render_template('emails/reset-link.html.j2', **context)
    mail.send(message)
|
||
|
|
||
|
def anonymize_email(email: str):
    """
    Anonymize an email address before displaying it on the website.

    The local part keeps at most its first three characters, the rest is
    replaced by '*'. The domain is shown as is when it is one of the known
    institutional domains; otherwise everything before its last dot is masked
    the same way. At least one character of a masked part is always hidden,
    so short parts are never displayed in full. A value without '@' is
    masked as a whole instead of raising.

    :param email: address to anonymize
    :return: the masked address, same length as the input
    """
    def mask(text: str, visible: int = 3) -> str:
        # Reveal up to `visible` characters, but always hide at least one.
        shown = min(visible, max(len(text) - 1, 0))
        return text[:shown] + '*' * (len(text) - shown)

    known_domains = ('etu.u-pec.fr', 'u-pec.fr', 'iutsf.org', 'iut-fbleau.fr')

    # Split on the last '@': the domain is whatever follows it.
    local, at, domain = email.rpartition('@')
    if not at:
        return mask(email)

    if domain.lower() not in known_domains:
        name, dot, suffix = domain.rpartition('.')
        domain = f"{mask(name)}.{suffix}" if dot else mask(domain)

    return f"{mask(local)}@{domain}"
|
||
|
|
||
|
def ldap_get_dn_and_emails(login: str):
    """
    Get the user's distinguished name and email addresses from the LDAP server.

    :param login: login typed by the user; escaped before being inserted into
        the LDAP_SEARCH_FILTER template
    :return: tuple ``(dn, emails)``; ``emails`` holds the first value of the
        LDAP_EMAIL_ATTRIBUTE attribute, or is empty when the entry has none
    :raises LoginNotFoundException: no entry matches the login
    :raises MultipleUsersFoundForLoginException: several entries match
    :raises ldap.LDAPError: connection, bind or search failure
    """
    # NOTE: certificate verification is disabled unless LDAP_TLS_VERIFY is set to a true value.
    if not to_bool(os.getenv("LDAP_TLS_VERIFY")):
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    email_attribute = os.getenv("LDAP_EMAIL_ATTRIBUTE")
    conn = ldap.initialize(os.getenv("LDAP_SERVER"))
    try:
        conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD"))
        result = conn.search_s(
            os.getenv("LDAP_BASE_DN"),
            ldap.SCOPE_SUBTREE,
            os.getenv("LDAP_SEARCH_FILTER") % ldap.filter.escape_filter_chars(login),
            [email_attribute]
        )
    finally:
        # Always release the connection; a cleanup failure must not hide the real error.
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            pass

    # Search references (referrals) come back with a None DN and are not user entries.
    entries = [(dn, attributes) for dn, attributes in result if dn is not None]
    if not entries:
        raise LoginNotFoundException()
    if len(entries) > 1:
        raise MultipleUsersFoundForLoginException()

    user_dn, attributes = entries[0]
    # The attribute is absent from the entry when the user has no address on file.
    values = attributes.get(email_attribute) or []
    return user_dn, [value.decode('utf-8') for value in values[:1]]
|
||
|
|
||
|
def ldap_set_password(user_dn: str, newpassword: str):
    """
    Set the user's password in LDAP.

    :param user_dn: distinguished name of the entry to modify
    :param newpassword: new password, in clear text
    :raises ldap.LDAPError: connection, bind or modification failure
    """
    # NOTE: certificate verification is disabled unless LDAP_TLS_VERIFY is set to a true value.
    if not to_bool(os.getenv("LDAP_TLS_VERIFY")):
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    conn = ldap.initialize(os.getenv("LDAP_SERVER"))
    try:
        conn.simple_bind_s(os.getenv("LDAP_USER"), os.getenv("LDAP_PASSWORD"))
        # The 'unicodePwd' value is the password wrapped in double quotes, encoded as UTF-16-LE.
        # https://serverfault.com/questions/937330/update-ad-password-from-python-ldap-code-insuff-access-rights
        quoted_password = '"{0}"'.format(newpassword).encode('utf-16-le')
        conn.modify_s(user_dn, [(ldap.MOD_REPLACE, 'unicodePwd', [quoted_password])])
    finally:
        # Always release the connection; a cleanup failure must not hide the real error.
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            pass
|
||
|
|
||
|
|
||
|
# Attach the 'mdp' blueprint to the application so the views declared with @bp.route are served.
f_app.register_blueprint(bp)
|