forked from tanchou/Verilog
Refactor ESP32 communication: update packet structure, enhance frame reading, and implement command interpretation
This commit is contained in:
@@ -60,7 +60,8 @@ void onClientConnected(WiFiEvent_t event, WiFiEventInfo_t info) {
|
||||
// Copier l'adr MAC dans le tableau
|
||||
memcpy(&packet[3], conn->mac, 6);
|
||||
|
||||
packet[9] = 0x03;
|
||||
packet[9] = 0x1B; // marqueur avant fin
|
||||
packet[10] = 0x03; // fin de trame
|
||||
|
||||
Serial.write(packet, sizeof(packet));
|
||||
digitalWrite(2, LOW);
|
||||
@@ -71,15 +72,18 @@ void onClientDisconnected(WiFiEvent_t event, WiFiEventInfo_t info) {
|
||||
|
||||
const wifi_event_ap_stadisconnected_t* disc = reinterpret_cast<const wifi_event_ap_stadisconnected_t*>(&info);
|
||||
|
||||
byte packet[11];
|
||||
byte packet[12];
|
||||
packet[0] = 0x02;
|
||||
packet[1] = 0x02;
|
||||
packet[2] = 0x00;
|
||||
packet[1] = 0x02; // OP_CODE: Connection Update
|
||||
packet[2] = 0x00; // Disconnected
|
||||
|
||||
memcpy(&packet[3], disc->mac, 6);
|
||||
packet[9] = 0x03;
|
||||
memcpy(&packet[3], disc->mac, 6); // MAC address
|
||||
|
||||
Serial.write(packet, sizeof(packet));
|
||||
packet[9] = 0x1B; // marqueur avant fin
|
||||
packet[10] = 0x03; // fin de trame
|
||||
packet[11] = '\n'; // (optionnel, pour debug dans terminal série)
|
||||
|
||||
Serial.write(packet, 11); // <= envoie bien 11 octets, pas 12 (on ne compte pas le \n ici si tu veux l’ignorer)
|
||||
digitalWrite(2, LOW);
|
||||
}
|
||||
|
||||
@@ -109,13 +113,10 @@ void processCommand(uint8_t* data, int length) {
|
||||
|
||||
case 0x03: { // Request Connected Devices
|
||||
wifi_sta_list_t staList;
|
||||
tcpip_adapter_sta_list_t adapterList;
|
||||
if (esp_wifi_ap_get_sta_list(&staList) == ESP_OK &&
|
||||
tcpip_adapter_get_sta_list(&staList, &adapterList) == ESP_OK) {
|
||||
|
||||
for (int i = 0; i < adapterList.num; i++) {
|
||||
if (esp_wifi_ap_get_sta_list(&staList) == ESP_OK) {
|
||||
for (int i = 0; i < staList.num; i++) {
|
||||
byte packet[12] = {0x02, 0x02, 0x01}; // 0x01 = connect
|
||||
memcpy(packet + 3, adapterList.sta[i].mac, 6);
|
||||
memcpy(packet + 3, staList.sta[i].mac, 6);
|
||||
packet[9] = 0x1B;
|
||||
packet[10] = 0x03;
|
||||
Serial.write(packet, 11);
|
||||
@@ -127,6 +128,7 @@ void processCommand(uint8_t* data, int length) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case 0x04: { // Send Message
|
||||
if (length < 12) {
|
||||
byte packet[] = {0x02, 0x00, 0x02, 0x1B, 0x03}; // args error
|
||||
|
@@ -1,46 +1,28 @@
|
||||
import serial
|
||||
|
||||
def format_mac(mac_bytes):
|
||||
return ":".join(f"{b:02X}" for b in mac_bytes)
|
||||
|
||||
def interpret_command(data):
|
||||
if len(data) == 10 and data[0] == 0x02 and data[1] == 0x02 and data[-1] == 0x03:
|
||||
cmd_type = data[2]
|
||||
mac = data[3:9]
|
||||
if cmd_type == 0x01:
|
||||
return f"[+] Appareil connecté : {format_mac(mac)}"
|
||||
elif cmd_type == 0x00:
|
||||
return f"[-] Appareil déconnecté : {format_mac(mac)}"
|
||||
else:
|
||||
return f"[?] Commande inconnue : {data.hex()}"
|
||||
elif len(data) == 4 and data[0] == 0x02 and data[1] == 0x01 and data[-1] == 0x03:
|
||||
return "[*] Démarrage ESP32"
|
||||
else:
|
||||
return "[!] Donnée non reconnue"
|
||||
|
||||
def main():
|
||||
port = "COM5" # Remplace par ton port série (ex. /dev/ttyUSB0 sous Linux)
|
||||
baudrate = 115200
|
||||
try:
|
||||
with serial.Serial('COM5', 115200, timeout=1) as ser:
|
||||
buffer = []
|
||||
while True:
|
||||
byte = ser.read(1)
|
||||
if byte:
|
||||
value = byte[0]
|
||||
buffer.append(value)
|
||||
print(f"Reçu: {value:#04x}")
|
||||
|
||||
with serial.Serial(port, baudrate, timeout=1) as ser:
|
||||
print(f"Lecture sur {port} à {baudrate} bauds...\n")
|
||||
buffer = bytearray()
|
||||
# Vérifie début de trame
|
||||
if len(buffer) == 1 and buffer[0] != 0x02:
|
||||
buffer.clear()
|
||||
|
||||
while True:
|
||||
byte = ser.read()
|
||||
if not byte:
|
||||
continue
|
||||
|
||||
buffer += byte
|
||||
|
||||
# Si on détecte le début (0x02) et fin (0x03), on traite
|
||||
if buffer[0] == 0x02 and 0x03 in buffer:
|
||||
end_index = buffer.index(0x03)
|
||||
packet = buffer[:end_index + 1]
|
||||
buffer = buffer[end_index + 1:]
|
||||
|
||||
print("Reçu :", ' '.join(f'{b:02X}' for b in packet))
|
||||
print("↳", interpret_command(packet), "\n")
|
||||
# Vérifie fin de trame
|
||||
if len(buffer) >= 3 and buffer[-2] == 0x1B and buffer[-1] == 0x03:
|
||||
print("\n=== Trame complète reçue ===")
|
||||
print("Trame :", ' '.join(f"{b:#04x}" for b in buffer))
|
||||
print("=============================\n")
|
||||
buffer.clear()
|
||||
except serial.SerialException as e:
|
||||
print("Erreur de port série :", e)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
@@ -0,0 +1,54 @@
|
||||
import serial
|
||||
|
||||
def read_frame(ser):
|
||||
frame = []
|
||||
in_frame = False
|
||||
|
||||
while True:
|
||||
byte = ser.read()
|
||||
if not byte:
|
||||
continue
|
||||
b = byte[0]
|
||||
if not in_frame:
|
||||
if b == 0x02:
|
||||
frame = [b]
|
||||
in_frame = True
|
||||
else:
|
||||
frame.append(b)
|
||||
if len(frame) >= 2 and frame[-2] == 0x1B and frame[-1] == 0x03:
|
||||
return frame
|
||||
|
||||
def interpret_frame(frame):
|
||||
if len(frame) < 4 or frame[0] != 0x02 or frame[-2:] != [0x1B, 0x03]:
|
||||
return "Trame invalide"
|
||||
|
||||
op_code = frame[1]
|
||||
if op_code == 0x00:
|
||||
return f"[Erreur] Code: {hex(frame[2])}"
|
||||
elif op_code == 0x01:
|
||||
state = frame[2]
|
||||
return f"[Wi-Fi] {'UP' if state == 1 else 'DOWN'}"
|
||||
elif op_code == 0x02:
|
||||
status = 'Connecté' if frame[2] == 0x01 else 'Déconnecté'
|
||||
mac = ':'.join(f"{b:02X}" for b in frame[3:9])
|
||||
return f"[Connexion] {status} - {mac}"
|
||||
elif op_code == 0x03:
|
||||
return "[Demande appareils connectés]"
|
||||
elif op_code == 0x04:
|
||||
mac = ':'.join(f"{b:02X}" for b in frame[2:8])
|
||||
length = frame[8]
|
||||
msg = bytes(frame[9:9+length]).decode(errors='ignore')
|
||||
return f"[Message] à {mac} : {msg}"
|
||||
else:
|
||||
return f"[OpCode inconnu] {hex(op_code)}"
|
||||
|
||||
def main():
|
||||
with serial.Serial('COM5', 115200, timeout=1) as ser:
|
||||
print("Lecture des trames...")
|
||||
while True:
|
||||
frame = read_frame(ser)
|
||||
info = interpret_frame(frame)
|
||||
print(info)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user