2024-06-12 16:40:23 +02:00
from tkinter import *
from tkinter import messagebox
import mysql . connector
from obswebsocket import obsws , requests as obsrequests
from twitch . helix . api import TwitchHelix
import webbrowser
import asyncio
import re
class NouvelleFenetre :
is_live = False
client_id = ' 6h883my3p6ndqozotnl6nsbllyyr3h ' # id de l'app
oauth_authorization_url = f " https://id.twitch.tv/oauth2/authorize?client_id= { client_id } &redirect_uri=http://localhost&response_type=token&scope=user_read "
def __init__ ( self , root , username , password ) :
self . oauth_token = None
asyncio . run ( self . initialize_window ( root , username , password ) )
self . username = username
self . password = password
self . root = root
self . root . grid_rowconfigure ( 0 , weight = 0 )
self . root . grid_rowconfigure ( 1 , weight = 0 )
self . root . grid_rowconfigure ( 2 , weight = 0 )
self . root . grid_rowconfigure ( 3 , weight = 0 )
self . root . grid_rowconfigure ( 4 , weight = 0 )
self . root . grid_rowconfigure ( 5 , weight = 0 )
self . root . grid_rowconfigure ( 6 , weight = 0 )
self . root . grid_rowconfigure ( 7 , weight = 0 )
self . root . grid_rowconfigure ( 8 , weight = 0 )
self . root . grid_rowconfigure ( 9 , weight = 0 )
self . root . grid_rowconfigure ( 10 , weight = 0 )
self . root . grid_rowconfigure ( 11 , weight = 0 )
self . root . grid_rowconfigure ( 12 , weight = 0 )
self . root . grid_rowconfigure ( 13 , weight = 0 )
self . root . grid_rowconfigure ( 14 , weight = 0 )
self . root . grid_columnconfigure ( 0 , weight = 1 )
self . root . grid_columnconfigure ( 1 , weight = 0 )
self . root . grid_columnconfigure ( 2 , weight = 0 )
self . root . grid_columnconfigure ( 3 , weight = 0 )
self . obs_host = " localhost " # Adresse IP de votre ordinateur où OBS est en cours d'exécution
self . obs_port = 4455 # Port par défaut utilisé par OBSWebsocket
self . obs_password = " MPcZLZfViZihHqmg " # Mot de passe OBSWebsocket
self . twitch_key_from_database = self . get_twitch_key_from_database ( username )
async def initialize_window ( self , root , username , password ) :
self . root = Toplevel ( root )
self . root . title ( " OBS Extension Multistreaming " )
self . root . geometry ( " 1920x1080 " )
self . username = username
self . password = password
self . lancer_auth_twitch ( )
self . oauth_token = self . get_oauth_token_from_database ( username )
await self . initialize_twitch ( )
label = Label ( self . root , text = f " Bienvenue, { username } ! " )
label . grid ( row = 0 , column = 0 )
label = Label ( self . root , text = " Clé Twitch : " )
label . grid ( row = 2 , column = 0 )
self . entry_twitch = Entry ( self . root )
self . entry_twitch . grid ( row = 3 , column = 0 )
boutonvalidation = Button ( self . root , text = " Envoyé " , command = lambda : self . set_key_twitch ( username ) )
boutonvalidation . grid ( row = 4 , column = 0 )
boutondediffusiontwitch = Button ( self . root , text = " Diffusion de Twitch On/Off " , command = lambda : asyncio . run ( self . configure_obs_for_twitch ( self . twitch_key_from_database ) ) )
boutondediffusiontwitch . grid ( row = 4 , column = 3 )
label = Label ( self . root , text = " Clé Youtube : " )
label . grid ( row = 5 , column = 0 )
self . entry_youtube = Entry ( self . root )
self . entry_youtube . grid ( row = 6 , column = 0 )
boutonvalidation2 = Button ( self . root , text = " Envoyé " , command = lambda : self . set_key_youtube ( username ) )
boutonvalidation2 . grid ( row = 7 , column = 0 )
boutondediffusionyoutube = Button ( self . root , text = " Diffusion de YouTube On/Off " )
boutondediffusionyoutube . grid ( row = 7 , column = 3 )
bouton_fermer = Button ( self . root , text = " Fermer " , command = self . fermer_et_reafficher )
bouton_fermer . grid ( row = 13 , column = 0 )
def fermer_et_reafficher ( self ) :
self . root . destroy ( )
self . ws . disconnect ( )
self . root . master . deiconify ( )
def get_key_twitch ( self ) :
self . twitch = self . entry_twitch . get ( )
print ( self . twitch )
def get_key_youtube ( self ) :
self . youtube = self . entry_youtube . get ( )
print ( self . youtube )
def set_key_twitch ( self , username ) :
try :
cnx = mysql . connector . connect ( user = ' root ' , password = ' ' ,
host = ' 127.0.0.1 ' ,
database = ' obs_project ' )
cur = cnx . cursor ( )
query = " SELECT * FROM utilisateurs WHERE username = %s "
cur . execute ( query , ( username , ) )
user = cur . fetchone ( )
if user :
new_twitch_key = self . entry_twitch . get ( )
update_query = " UPDATE twitch SET twitch_key = %s WHERE username = %s "
cur . execute ( update_query , ( new_twitch_key , username ) )
cnx . commit ( )
print ( f " La clé a été mis à jour pour { username } avec la clé { new_twitch_key } " )
else :
print ( f " Utilisateur { username } non trouvée " )
except mysql . connector . Error as err :
print ( f " Erreur: { err } " )
messagebox . showerror ( " Erreur " , " Une erreur a eu lieu pour mettre à jour votre clé " )
finally :
cur . close ( )
cnx . close ( )
def set_key_youtube ( self , username ) :
try :
cnx = mysql . connector . connect ( user = ' root ' , password = ' ' ,
host = ' 127.0.0.1 ' ,
database = ' obs_project ' )
cur = cnx . cursor ( )
query = " SELECT * FROM utilisateurs WHERE username = %s "
cur . execute ( query , ( username , ) )
user = cur . fetchone ( )
if user :
new_youtube_key = self . entry_youtube . get ( )
update_query = " UPDATE youtube SET youtube_key = %s WHERE username = %s "
cur . execute ( update_query , ( new_youtube_key , username ) )
cnx . commit ( )
print ( f " La clé a été mis à jour pour { username } avec la clé { new_youtube_key } " )
else :
print ( f " Utilisateur { username } non trouvée " )
except mysql . connector . Error as err :
print ( f " Erreur: { err } " )
messagebox . showerror ( " Erreur " , " Une erreur a eu lieu pour mettre à jour votre clé " )
finally :
cur . close ( )
cnx . close ( )
def get_twitch_key_from_database ( self , username ) :
try :
cnx = mysql . connector . connect ( user = ' root ' , password = ' ' ,
host = ' 127.0.0.1 ' ,
database = ' obs_project ' )
cur = cnx . cursor ( )
query = " SELECT twitch_key FROM twitch WHERE username = %s "
cur . execute ( query , ( username , ) )
twitch_key = cur . fetchone ( )
if twitch_key :
return twitch_key [ 0 ]
else :
print ( f " Clé Twitch non trouvée pour l ' utilisateur { username } " )
return None
except mysql . connector . Error as err :
print ( f " Erreur: { err } " )
messagebox . showerror ( " Erreur " , " Une erreur a eu lieu pour récupérer la clé Twitch " )
finally :
cur . close ( )
cnx . close ( )
async def configure_obs_for_twitch ( self , twitch_key_from_database ) :
try :
if twitch_key_from_database is not None :
if not self . is_live :
print ( " Connexion à OBS via Websocket avec les paramètres suivants : " )
print ( " Host: " , self . obs_host )
print ( " Port: " , self . obs_port )
print ( " Password: " , self . obs_password )
print ( " Twitch Stream Key: " , twitch_key_from_database )
self . obs_ws = obsws ( self . obs_host , self . obs_port , self . obs_password )
self . obs_ws . connect ( )
start_stream_request = obsrequests . StartStreaming ( stream_key = twitch_key_from_database )
self . obs_ws . call ( start_stream_request )
self . is_live = True
else :
print ( " Arrêt du streaming Twitch. " )
self . obs_ws . call ( obsrequests . StopStreaming ( ) )
self . obs_ws . disconnect ( )
self . is_live = False
else :
print ( " La clé Twitch est manquante. " )
except Exception as e :
print ( f " Une erreur s ' est produite lors de la configuration d ' OBS pour Twitch : { e } " )
async def initialize_twitch ( self ) :
try :
if self . oauth_token :
twitch = TwitchHelix ( self . client_id , self . oauth_token )
users = twitch . get_users ( )
if users :
print ( " Utilisateur Twitch trouvé: " )
print ( " ID: " , users [ 0 ] . id )
print ( " Nom d ' utilisateur: " , users [ 0 ] . display_name )
else :
print ( " Utilisateur Twitch non trouvé. " )
else :
print ( " Le token OAuth Twitch n ' est pas disponible. Assurez-vous de le récupérer depuis la base de données. " )
except Exception as e :
print ( f " Une erreur s ' est produite lors de la récupération de l ' utilisateur Twitch : { e } " )
def lancer_auth_twitch ( self ) :
oauth_token = self . get_oauth_token_from_database ( self . username )
print ( f " Token OAuth récupéré pour { self . username } : { oauth_token } " )
if oauth_token is None or oauth_token . strip ( ) == ' ' :
webbrowser . open ( self . oauth_authorization_url )
messagebox . showinfo ( " Authentification Twitch " , " Veuillez autoriser l ' accès à Twitch dans le navigateur Web qui s ' est ouvert. " )
else :
messagebox . showinfo ( " Authentification Twitch " , " L ' authentification Twitch n ' est pas nécessaire car le token OAuth est déjà présent dans la base de données. " )
def get_oauth_token_from_database ( self , username ) :
try :
cnx = mysql . connector . connect ( user = ' root ' , password = ' ' ,
host = ' 127.0.0.1 ' ,
database = ' obs_project ' )
cur = cnx . cursor ( )
query = " SELECT OAuth_Key FROM twitch WHERE username = %s "
cur . execute ( query , ( username , ) )
oauth_token = cur . fetchone ( )
print ( f " Résultat de la requête pour l ' utilisateur { username } : { oauth_token } " )
2024-06-12 16:43:55 +02:00
if oauth_token and oauth_token [ 0 ] . strip ( ) :
2024-06-12 16:40:23 +02:00
print ( f " OAuth token trouvé pour l ' utilisateur { username } : { oauth_token [ 0 ] } " )
return oauth_token [ 0 ]
else :
print ( f " OAuth token non trouvé pour l ' utilisateur { username } " )
return None
except mysql . connector . Error as err :
print ( f " Erreur: { err } " )
messagebox . showerror ( " Erreur " , " Une erreur a eu lieu pour récupérer le token OAuth " )
finally :
cur . close ( )
cnx . close ( )