diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/UART.py b/FaultycatModules/UART.py new file mode 100644 index 0000000..3fd677f --- /dev/null +++ b/FaultycatModules/UART.py @@ -0,0 +1,97 @@ +import platform +import serial +import time +import serial.tools.list_ports +import threading + +from .ConfigBoard import BoardStatus + +if platform.system() == "Windows": + DEFAULT_COMPORT = "COM1" +else: + DEFAULT_COMPORT = "/dev/ttyACM0" + +DEFAULT_SERIAL_BAUDRATE = 921600 + +class UART(threading.Thread): + def __init__(self, serial_port: str = DEFAULT_COMPORT): + self.serial_worker = serial.Serial() + self.serial_worker.port = serial_port + self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE + self.recv_cancel = False + #self.daemon = True + + def __del__(self): + self.serial_worker.close() + + def __str__(self): + return f"Serial port: {self.serial_worker.port}" + + def set_serial_port(self, serial_port: str): + self.serial_worker.port = serial_port + + def set_serial_baudrate(self, serial_baudrate: int): + self.serial_worker.baudrate = serial_baudrate + + def is_valid_connection(self) -> bool: + try: + self.open() + self.close() + return True + except serial.SerialException as e: + return False + + def get_serial_ports(self): + return serial.tools.list_ports.comports() + + def reset_buffer(self): + self.serial_worker.reset_input_buffer() + self.serial_worker.reset_output_buffer() + + def cancel_recv(self): + self.recv_cancel = True + + def open(self): + self.serial_worker.open() + self.reset_buffer() + + def close(self): + self.reset_buffer() + self.serial_worker.close() + + def is_connected(self): + return self.serial_worker.is_open + + def send(self, data): + self.serial_worker.write(data) + self.serial_worker.write(b"\n\r") + self.serial_worker.flush() + + def recv(self): + if not self.is_connected(): + self.open() + try: + while not self.recv_cancel: + time.sleep(0.1) + + bytestream = self.serial_worker.readline() + if self.recv_cancel: + self.recv_cancel = False + return None + return bytestream + except serial.SerialException as e: + print(e) + return None + except KeyboardInterrupt: + self.recv_cancel = True + return None + + def send_recv(self, data): + self.send(data) + return self.recv() + + def stop_worker(self): + self.recv_cancel = True + self.reset_buffer() + self.close() + self.join() \ No newline at end of file diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/UART.py b/FaultycatModules/UART.py new file mode 100644 index 0000000..3fd677f --- /dev/null +++ b/FaultycatModules/UART.py @@ -0,0 +1,97 @@ +import platform +import serial +import time +import serial.tools.list_ports +import threading + +from .ConfigBoard import BoardStatus + +if platform.system() == "Windows": + DEFAULT_COMPORT = "COM1" +else: + DEFAULT_COMPORT = "/dev/ttyACM0" + +DEFAULT_SERIAL_BAUDRATE = 921600 + +class UART(threading.Thread): + def __init__(self, serial_port: str = DEFAULT_COMPORT): + self.serial_worker = serial.Serial() + self.serial_worker.port = serial_port + self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE + self.recv_cancel = False + #self.daemon = True + + def __del__(self): + self.serial_worker.close() + + def __str__(self): + return f"Serial port: {self.serial_worker.port}" + + def set_serial_port(self, serial_port: str): + self.serial_worker.port = serial_port + + def set_serial_baudrate(self, serial_baudrate: int): + self.serial_worker.baudrate = serial_baudrate + + def is_valid_connection(self) -> bool: + try: + self.open() + self.close() + return True + except serial.SerialException as e: + return False + + def get_serial_ports(self): + return serial.tools.list_ports.comports() + + def reset_buffer(self): + self.serial_worker.reset_input_buffer() + self.serial_worker.reset_output_buffer() + + def cancel_recv(self): + self.recv_cancel = True + + def open(self): + self.serial_worker.open() + self.reset_buffer() + + def close(self): + self.reset_buffer() + self.serial_worker.close() + + def is_connected(self): + return self.serial_worker.is_open + + def send(self, data): + self.serial_worker.write(data) + self.serial_worker.write(b"\n\r") + self.serial_worker.flush() + + def recv(self): + if not self.is_connected(): + self.open() + try: + while not self.recv_cancel: + time.sleep(0.1) + + bytestream = self.serial_worker.readline() + if self.recv_cancel: + self.recv_cancel = False + return None + return bytestream + except serial.SerialException as e: + print(e) + return None + except KeyboardInterrupt: + self.recv_cancel = True + return None + + def send_recv(self, data): + self.send(data) + return self.recv() + + def stop_worker(self): + self.recv_cancel = True + self.reset_buffer() + self.close() + self.join() \ No newline at end of file diff --git a/FaultycatModules/Worker.py b/FaultycatModules/Worker.py new file mode 100644 index 0000000..0bb2e54 --- /dev/null +++ b/FaultycatModules/Worker.py @@ -0,0 +1,68 @@ +import threading +import time +import typer +from rich.console import Console +from rich.table import Table + +from .UART import UART +from .ConfigBoard import ConfigBoard + +class FaultyWorker(threading.Thread): + def __init__(self): + super().__init__() + #self.daemon = True + self.workers = [] + self.board_uart = UART() + self.board_configurator = ConfigBoard() + self.pulse_count = self.board_configurator.BOARD_CONFIG["pulse_count"] + self.pulse_time = self.board_configurator.BOARD_CONFIG["pulse_time"] + + def add_worker(self, worker): + self.workers.append(worker) + + def stop_workers(self): + for worker in self.workers: + worker.join() + + def run_workers(self): + for worker in self.workers: + worker.start() + + def set_serial_port(self, serial_port): + self.board_uart.set_serial_port(serial_port) + + def validate_serial_connection(self): + return self.board_uart.is_valid_connection() + + def set_pulse_count(self, pulse_count): + self.pulse_count = pulse_count + self.board_configurator.BOARD_CONFIG["pulse_count"] = pulse_count + + def set_pulse_time(self, pulse_time): + self.pulse_time = pulse_time + self.board_configurator.BOARD_CONFIG["pulse_time"] = pulse_time + + def start_faulty_attack(self): + try: + self.board_uart.open() + time.sleep(0.1) + typer.secho("Board connected.", fg=typer.colors.GREEN) + typer.secho("[*] ARMING BOARD, BE CAREFULL!", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) + + typer.secho("[*] ARMED BOARD.", fg=typer.colors.BRIGHT_GREEN) + time.sleep(1) + typer.secho(f"[*] SENDING {self.pulse_count} PULSES.", fg=typer.colors.BRIGHT_GREEN) + for i in range(self.pulse_count): + typer.secho(f"\t- SENDING PULSE {i+1} OF {self.pulse_count}.", fg=typer.colors.BRIGHT_GREEN) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) + time.sleep(self.pulse_time) + + typer.secho("DISARMING BOARD.", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + self.board_uart.close() + typer.secho("BOARD DISARMING.", fg=typer.colors.BRIGHT_YELLOW) + except Exception as e: + typer.secho(f"Error: {e}", fg=typer.colors.BRIGHT_RED) \ No newline at end of file diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/UART.py b/FaultycatModules/UART.py new file mode 100644 index 0000000..3fd677f --- /dev/null +++ b/FaultycatModules/UART.py @@ -0,0 +1,97 @@ +import platform +import serial +import time +import serial.tools.list_ports +import threading + +from .ConfigBoard import BoardStatus + +if platform.system() == "Windows": + DEFAULT_COMPORT = "COM1" +else: + DEFAULT_COMPORT = "/dev/ttyACM0" + +DEFAULT_SERIAL_BAUDRATE = 921600 + +class UART(threading.Thread): + def __init__(self, serial_port: str = DEFAULT_COMPORT): + self.serial_worker = serial.Serial() + self.serial_worker.port = serial_port + self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE + self.recv_cancel = False + #self.daemon = True + + def __del__(self): + self.serial_worker.close() + + def __str__(self): + return f"Serial port: {self.serial_worker.port}" + + def set_serial_port(self, serial_port: str): + self.serial_worker.port = serial_port + + def set_serial_baudrate(self, serial_baudrate: int): + self.serial_worker.baudrate = serial_baudrate + + def is_valid_connection(self) -> bool: + try: + self.open() + self.close() + return True + except serial.SerialException as e: + return False + + def get_serial_ports(self): + return serial.tools.list_ports.comports() + + def reset_buffer(self): + self.serial_worker.reset_input_buffer() + self.serial_worker.reset_output_buffer() + + def cancel_recv(self): + self.recv_cancel = True + + def open(self): + self.serial_worker.open() + self.reset_buffer() + + def close(self): + self.reset_buffer() + self.serial_worker.close() + + def is_connected(self): + return self.serial_worker.is_open + + def send(self, data): + self.serial_worker.write(data) + self.serial_worker.write(b"\n\r") + self.serial_worker.flush() + + def recv(self): + if not self.is_connected(): + self.open() + try: + while not self.recv_cancel: + time.sleep(0.1) + + bytestream = self.serial_worker.readline() + if self.recv_cancel: + self.recv_cancel = False + return None + return bytestream + except serial.SerialException as e: + print(e) + return None + except KeyboardInterrupt: + self.recv_cancel = True + return None + + def send_recv(self, data): + self.send(data) + return self.recv() + + def stop_worker(self): + self.recv_cancel = True + self.reset_buffer() + self.close() + self.join() \ No newline at end of file diff --git a/FaultycatModules/Worker.py b/FaultycatModules/Worker.py new file mode 100644 index 0000000..0bb2e54 --- /dev/null +++ b/FaultycatModules/Worker.py @@ -0,0 +1,68 @@ +import threading +import time +import typer +from rich.console import Console +from rich.table import Table + +from .UART import UART +from .ConfigBoard import ConfigBoard + +class FaultyWorker(threading.Thread): + def __init__(self): + super().__init__() + #self.daemon = True + self.workers = [] + self.board_uart = UART() + self.board_configurator = ConfigBoard() + self.pulse_count = self.board_configurator.BOARD_CONFIG["pulse_count"] + self.pulse_time = self.board_configurator.BOARD_CONFIG["pulse_time"] + + def add_worker(self, worker): + self.workers.append(worker) + + def stop_workers(self): + for worker in self.workers: + worker.join() + + def run_workers(self): + for worker in self.workers: + worker.start() + + def set_serial_port(self, serial_port): + self.board_uart.set_serial_port(serial_port) + + def validate_serial_connection(self): + return self.board_uart.is_valid_connection() + + def set_pulse_count(self, pulse_count): + self.pulse_count = pulse_count + self.board_configurator.BOARD_CONFIG["pulse_count"] = pulse_count + + def set_pulse_time(self, pulse_time): + self.pulse_time = pulse_time + self.board_configurator.BOARD_CONFIG["pulse_time"] = pulse_time + + def start_faulty_attack(self): + try: + self.board_uart.open() + time.sleep(0.1) + typer.secho("Board connected.", fg=typer.colors.GREEN) + typer.secho("[*] ARMING BOARD, BE CAREFULL!", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) + + typer.secho("[*] ARMED BOARD.", fg=typer.colors.BRIGHT_GREEN) + time.sleep(1) + typer.secho(f"[*] SENDING {self.pulse_count} PULSES.", fg=typer.colors.BRIGHT_GREEN) + for i in range(self.pulse_count): + typer.secho(f"\t- SENDING PULSE {i+1} OF {self.pulse_count}.", fg=typer.colors.BRIGHT_GREEN) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) + time.sleep(self.pulse_time) + + typer.secho("DISARMING BOARD.", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + self.board_uart.close() + typer.secho("BOARD DISARMING.", fg=typer.colors.BRIGHT_YELLOW) + except Exception as e: + typer.secho(f"Error: {e}", fg=typer.colors.BRIGHT_RED) \ No newline at end of file diff --git a/functions.py b/functions.py index 66279b0..ee6bb8a 100644 --- a/functions.py +++ b/functions.py @@ -5,13 +5,14 @@ import serial import importlib from scope import Scope +from FaultycatModules import Worker from textual.widgets import Button, Input, Switch from textual.containers import Vertical import asyncio import functions -DEBUG_MODE = False +DEBUG_MODE = True app_instance = None # Global variable to store the app instance text_area = None # Store global reference to scrollable text area @@ -19,6 +20,10 @@ log_time = 0 # timestamp for logfile glitch_time = 0 # timestamp for when glitching started +# FaultyCat Variables +DEFAULT_FAULTY_COMPORT = "/dev/ttyACM0" +faulty_worker = Worker.FaultyWorker() + try: s = Scope() except IOError: @@ -77,6 +82,10 @@ if not hasattr(config, "CONFILE"): config.CONFILE = "config.py" # Or any suitable default return config.CONFILE + elif name == "uart_output_enabled": + if not hasattr(config, "UART_OUTPUT_ENABLED"): + config.UART_OUTPUT_ENABLED = False # Default to disabled + return config.UART_OUTPUT_ENABLED elif name.startswith("trigger_"): if "_value" in name: index = int(name.split('_')[1]) @@ -87,21 +96,35 @@ else: return 0 # Default fallback for unknown names -def set_config_value(name: str, value: int): +def set_config_value(name: str, value): + """Set the value of a config variable and update the UI if applicable.""" + attr_name = name.upper() - if hasattr(config, name.upper()): - current_value = getattr(config, name.upper()) - setattr(config, name.upper(), value) + # Create the attribute if it doesn't exist + if not hasattr(config, attr_name): + setattr(config, attr_name, value) + else: + setattr(config, attr_name, value) - # Update corresponding Input field + # Safely update corresponding input field if it exists + try: input_field = app_instance.query_one(f"#{name}_input") input_field.value = str(value) + except Exception: + # No input field exists for this config; ignore + pass - # Update the status box row + # Safely update status box row if possible + try: update_status_box(app_instance, name, value) + except Exception: + pass - # Refresh UI to reflect changes + # Refresh UI + try: app_instance.refresh() + except Exception: + pass def get_condition_string(index): """Returns the string from the triggers list at the given index.""" @@ -493,8 +516,8 @@ write_timeout=1.0, # Write timeout inter_byte_timeout=0.05, # Between bytes exclusive=True, # Prevent multiple access - rtscts=True, # Enable hardware flow control - dsrdtr=True # Additional flow control + rtscts=False, # Enable hardware flow control (disable for tigard) + dsrdtr=False # Additional flow control (disable for tigard) ) add_text("Connected to serial port.") return ser @@ -525,6 +548,38 @@ log_message(f"[UART TX ERROR] {str(e)}") return False +def flush_uart_buffer(): + """Flush UART buffers and clear app_instance.serial_buffer.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Flush skipped - No connection") + return False + try: + conn.reset_input_buffer() + conn.reset_output_buffer() + app_instance.serial_buffer = "" + log_message("[UART] Buffers flushed") + return True + except Exception as e: + log_message(f"[UART FLUSH ERROR] {e}") + return False + +def read_uart_buffer(): + """Read data into app_instance.serial_buffer and return it.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Read skipped - No connection") + return app_instance.serial_buffer + try: + data = conn.read_all().decode("utf-8", errors="replace") + if data: + app_instance.serial_buffer += data + log_message(f"[UART] Buffer read: {app_instance.serial_buffer.strip()}") + return app_instance.serial_buffer + except Exception as e: + log_message(f"[UART RX ERROR] {e}") + return app_instance.serial_buffer + def get_conditions_buffer_size(debug=False): """Return the maximum length of condition strings with debug option""" if not hasattr(config, 'conditions') or not config.conditions: @@ -545,8 +600,8 @@ def check_conditions(self, buffer, debug=False): """Check buffer against all conditions by examining every position""" - if debug: - log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") + #if debug: + #log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") if not hasattr(config, 'conditions') or not config.conditions: if debug: @@ -561,8 +616,8 @@ trigger_len = len(trigger_str) buffer_len = len(buffer) - if debug: - log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") + #if debug: + #log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") # Check every possible starting position in the buffer for pos in range(buffer_len - trigger_len + 1): @@ -585,8 +640,8 @@ log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}") continue - if debug: - log_message("[DEBUG] No conditions matched") + #if debug: + #log_message("[DEBUG] No conditions matched") return None def execute_condition_action(action_name, debug=False): @@ -763,4 +818,66 @@ text_area.clear() def end_program(): - exit() \ No newline at end of file + exit() + +################## +# Faultycat stuff +################## + +def faulty_connect(comport: str = DEFAULT_FAULTY_COMPORT) -> bool: + try: + faulty_worker.set_serial_port(comport) + if not faulty_worker.validate_serial_connection(): + #if debug: + #log_message(f"Connection failed on {comport}") + return False + faulty_worker.board_uart.open() + time.sleep(0.1) + #if debug: + #log_message("Board connected") + return True + except Exception as e: + #if debug: + #log_message(f"Connection error: {e}") + return False + +def faulty_arm() -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + uart.send(cmd.COMMAND_ARM.value.encode("utf-8")) + #if debug: + #log_message("Board armed") + return True + except Exception as e: + #if debug: + #log_message(f"Arm error: {e}") + return False + +def faulty_send_pulse() -> bool: + try: + faulty_worker.board_uart.send( + faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8") + ) + #if debug: + #log_message("Pulse sent") + return True + except Exception as e: + #if debug: + #log_message(f"Pulse error: {e}") + return False + +def faulty_disarm(close_uart: bool = True) -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + if close_uart: + uart.close() + #if debug: + #log_message("Board disarmed") + return True + except Exception as e: + #if debug: + #log_message(f"Disarm error: {e}") + return False \ No newline at end of file diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/UART.py b/FaultycatModules/UART.py new file mode 100644 index 0000000..3fd677f --- /dev/null +++ b/FaultycatModules/UART.py @@ -0,0 +1,97 @@ +import platform +import serial +import time +import serial.tools.list_ports +import threading + +from .ConfigBoard import BoardStatus + +if platform.system() == "Windows": + DEFAULT_COMPORT = "COM1" +else: + DEFAULT_COMPORT = "/dev/ttyACM0" + +DEFAULT_SERIAL_BAUDRATE = 921600 + +class UART(threading.Thread): + def __init__(self, serial_port: str = DEFAULT_COMPORT): + self.serial_worker = serial.Serial() + self.serial_worker.port = serial_port + self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE + self.recv_cancel = False + #self.daemon = True + + def __del__(self): + self.serial_worker.close() + + def __str__(self): + return f"Serial port: {self.serial_worker.port}" + + def set_serial_port(self, serial_port: str): + self.serial_worker.port = serial_port + + def set_serial_baudrate(self, serial_baudrate: int): + self.serial_worker.baudrate = serial_baudrate + + def is_valid_connection(self) -> bool: + try: + self.open() + self.close() + return True + except serial.SerialException as e: + return False + + def get_serial_ports(self): + return serial.tools.list_ports.comports() + + def reset_buffer(self): + self.serial_worker.reset_input_buffer() + self.serial_worker.reset_output_buffer() + + def cancel_recv(self): + self.recv_cancel = True + + def open(self): + self.serial_worker.open() + self.reset_buffer() + + def close(self): + self.reset_buffer() + self.serial_worker.close() + + def is_connected(self): + return self.serial_worker.is_open + + def send(self, data): + self.serial_worker.write(data) + self.serial_worker.write(b"\n\r") + self.serial_worker.flush() + + def recv(self): + if not self.is_connected(): + self.open() + try: + while not self.recv_cancel: + time.sleep(0.1) + + bytestream = self.serial_worker.readline() + if self.recv_cancel: + self.recv_cancel = False + return None + return bytestream + except serial.SerialException as e: + print(e) + return None + except KeyboardInterrupt: + self.recv_cancel = True + return None + + def send_recv(self, data): + self.send(data) + return self.recv() + + def stop_worker(self): + self.recv_cancel = True + self.reset_buffer() + self.close() + self.join() \ No newline at end of file diff --git a/FaultycatModules/Worker.py b/FaultycatModules/Worker.py new file mode 100644 index 0000000..0bb2e54 --- /dev/null +++ b/FaultycatModules/Worker.py @@ -0,0 +1,68 @@ +import threading +import time +import typer +from rich.console import Console +from rich.table import Table + +from .UART import UART +from .ConfigBoard import ConfigBoard + +class FaultyWorker(threading.Thread): + def __init__(self): + super().__init__() + #self.daemon = True + self.workers = [] + self.board_uart = UART() + self.board_configurator = ConfigBoard() + self.pulse_count = self.board_configurator.BOARD_CONFIG["pulse_count"] + self.pulse_time = self.board_configurator.BOARD_CONFIG["pulse_time"] + + def add_worker(self, worker): + self.workers.append(worker) + + def stop_workers(self): + for worker in self.workers: + worker.join() + + def run_workers(self): + for worker in self.workers: + worker.start() + + def set_serial_port(self, serial_port): + self.board_uart.set_serial_port(serial_port) + + def validate_serial_connection(self): + return self.board_uart.is_valid_connection() + + def set_pulse_count(self, pulse_count): + self.pulse_count = pulse_count + self.board_configurator.BOARD_CONFIG["pulse_count"] = pulse_count + + def set_pulse_time(self, pulse_time): + self.pulse_time = pulse_time + self.board_configurator.BOARD_CONFIG["pulse_time"] = pulse_time + + def start_faulty_attack(self): + try: + self.board_uart.open() + time.sleep(0.1) + typer.secho("Board connected.", fg=typer.colors.GREEN) + typer.secho("[*] ARMING BOARD, BE CAREFULL!", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) + + typer.secho("[*] ARMED BOARD.", fg=typer.colors.BRIGHT_GREEN) + time.sleep(1) + typer.secho(f"[*] SENDING {self.pulse_count} PULSES.", fg=typer.colors.BRIGHT_GREEN) + for i in range(self.pulse_count): + typer.secho(f"\t- SENDING PULSE {i+1} OF {self.pulse_count}.", fg=typer.colors.BRIGHT_GREEN) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) + time.sleep(self.pulse_time) + + typer.secho("DISARMING BOARD.", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + self.board_uart.close() + typer.secho("BOARD DISARMING.", fg=typer.colors.BRIGHT_YELLOW) + except Exception as e: + typer.secho(f"Error: {e}", fg=typer.colors.BRIGHT_RED) \ No newline at end of file diff --git a/functions.py b/functions.py index 66279b0..ee6bb8a 100644 --- a/functions.py +++ b/functions.py @@ -5,13 +5,14 @@ import serial import importlib from scope import Scope +from FaultycatModules import Worker from textual.widgets import Button, Input, Switch from textual.containers import Vertical import asyncio import functions -DEBUG_MODE = False +DEBUG_MODE = True app_instance = None # Global variable to store the app instance text_area = None # Store global reference to scrollable text area @@ -19,6 +20,10 @@ log_time = 0 # timestamp for logfile glitch_time = 0 # timestamp for when glitching started +# FaultyCat Variables +DEFAULT_FAULTY_COMPORT = "/dev/ttyACM0" +faulty_worker = Worker.FaultyWorker() + try: s = Scope() except IOError: @@ -77,6 +82,10 @@ if not hasattr(config, "CONFILE"): config.CONFILE = "config.py" # Or any suitable default return config.CONFILE + elif name == "uart_output_enabled": + if not hasattr(config, "UART_OUTPUT_ENABLED"): + config.UART_OUTPUT_ENABLED = False # Default to disabled + return config.UART_OUTPUT_ENABLED elif name.startswith("trigger_"): if "_value" in name: index = int(name.split('_')[1]) @@ -87,21 +96,35 @@ else: return 0 # Default fallback for unknown names -def set_config_value(name: str, value: int): +def set_config_value(name: str, value): + """Set the value of a config variable and update the UI if applicable.""" + attr_name = name.upper() - if hasattr(config, name.upper()): - current_value = getattr(config, name.upper()) - setattr(config, name.upper(), value) + # Create the attribute if it doesn't exist + if not hasattr(config, attr_name): + setattr(config, attr_name, value) + else: + setattr(config, attr_name, value) - # Update corresponding Input field + # Safely update corresponding input field if it exists + try: input_field = app_instance.query_one(f"#{name}_input") input_field.value = str(value) + except Exception: + # No input field exists for this config; ignore + pass - # Update the status box row + # Safely update status box row if possible + try: update_status_box(app_instance, name, value) + except Exception: + pass - # Refresh UI to reflect changes + # Refresh UI + try: app_instance.refresh() + except Exception: + pass def get_condition_string(index): """Returns the string from the triggers list at the given index.""" @@ -493,8 +516,8 @@ write_timeout=1.0, # Write timeout inter_byte_timeout=0.05, # Between bytes exclusive=True, # Prevent multiple access - rtscts=True, # Enable hardware flow control - dsrdtr=True # Additional flow control + rtscts=False, # Enable hardware flow control (disable for tigard) + dsrdtr=False # Additional flow control (disable for tigard) ) add_text("Connected to serial port.") return ser @@ -525,6 +548,38 @@ log_message(f"[UART TX ERROR] {str(e)}") return False +def flush_uart_buffer(): + """Flush UART buffers and clear app_instance.serial_buffer.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Flush skipped - No connection") + return False + try: + conn.reset_input_buffer() + conn.reset_output_buffer() + app_instance.serial_buffer = "" + log_message("[UART] Buffers flushed") + return True + except Exception as e: + log_message(f"[UART FLUSH ERROR] {e}") + return False + +def read_uart_buffer(): + """Read data into app_instance.serial_buffer and return it.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Read skipped - No connection") + return app_instance.serial_buffer + try: + data = conn.read_all().decode("utf-8", errors="replace") + if data: + app_instance.serial_buffer += data + log_message(f"[UART] Buffer read: {app_instance.serial_buffer.strip()}") + return app_instance.serial_buffer + except Exception as e: + log_message(f"[UART RX ERROR] {e}") + return app_instance.serial_buffer + def get_conditions_buffer_size(debug=False): """Return the maximum length of condition strings with debug option""" if not hasattr(config, 'conditions') or not config.conditions: @@ -545,8 +600,8 @@ def check_conditions(self, buffer, debug=False): """Check buffer against all conditions by examining every position""" - if debug: - log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") + #if debug: + #log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") if not hasattr(config, 'conditions') or not config.conditions: if debug: @@ -561,8 +616,8 @@ trigger_len = len(trigger_str) buffer_len = len(buffer) - if debug: - log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") + #if debug: + #log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") # Check every possible starting position in the buffer for pos in range(buffer_len - trigger_len + 1): @@ -585,8 +640,8 @@ log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}") continue - if debug: - log_message("[DEBUG] No conditions matched") + #if debug: + #log_message("[DEBUG] No conditions matched") return None def execute_condition_action(action_name, debug=False): @@ -763,4 +818,66 @@ text_area.clear() def end_program(): - exit() \ No newline at end of file + exit() + +################## +# Faultycat stuff +################## + +def faulty_connect(comport: str = DEFAULT_FAULTY_COMPORT) -> bool: + try: + faulty_worker.set_serial_port(comport) + if not faulty_worker.validate_serial_connection(): + #if debug: + #log_message(f"Connection failed on {comport}") + return False + faulty_worker.board_uart.open() + time.sleep(0.1) + #if debug: + #log_message("Board connected") + return True + except Exception as e: + #if debug: + #log_message(f"Connection error: {e}") + return False + +def faulty_arm() -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + uart.send(cmd.COMMAND_ARM.value.encode("utf-8")) + #if debug: + #log_message("Board armed") + return True + except Exception as e: + #if debug: + #log_message(f"Arm error: {e}") + return False + +def faulty_send_pulse() -> bool: + try: + faulty_worker.board_uart.send( + faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8") + ) + #if debug: + #log_message("Pulse sent") + return True + except Exception as e: + #if debug: + #log_message(f"Pulse error: {e}") + return False + +def faulty_disarm(close_uart: bool = True) -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + if close_uart: + uart.close() + #if debug: + #log_message("Board disarmed") + return True + except Exception as e: + #if debug: + #log_message(f"Disarm error: {e}") + return False \ No newline at end of file diff --git a/glitch-o-bolt.py b/glitch-o-bolt.py index aba439b..ba3d95f 100644 --- a/glitch-o-bolt.py +++ b/glitch-o-bolt.py @@ -229,7 +229,8 @@ """Display serial data exactly as received""" if hasattr(functions, 'text_area'): # Write the data exactly as it should appear - functions.text_area.write(message.data) + if functions.get_config_value("uart_output_enabled") is True: + functions.text_area.write(message.data) log_time = functions.get_config_value("log_time") if log_time > 0: @@ -329,6 +330,15 @@ else: main_content.border_title = f"{port} {baud} \\[{log_time}.log]" + if "uart-output-switch" in switch.classes: + if bool(event.value) is True: + functions.set_config_value("uart_output_enabled", True) + else: + functions.set_config_value("uart_output_enabled", False) + functions.log_message(f"[FUNCTION] uart output toggled: {event.value}") + + + if "glitch-switch" in switch.classes: if functions.DEBUG_MODE: curr_time = functions.get_config_value("glitch_time") @@ -473,9 +483,12 @@ with sidebar_content3: with Grid(classes="sidebar_settings_switches"): # Add rows with switches - yield Static(f"uart") + yield Static(f"uart enable") yield Switch(classes="sidebar_trigger_switch", value=False, animate=False, id="uart_switch") + yield Static(f"uart output") + yield Switch(classes="uart-output-switch sidebar_trigger_switch", value=False, animate=False) + yield Static(f"logging") yield Switch(classes="logging-switch sidebar_trigger_switch", value=False, animate=False) diff --git a/FaultycatModules/ConfigBoard.py b/FaultycatModules/ConfigBoard.py new file mode 100644 index 0000000..f795f79 --- /dev/null +++ b/FaultycatModules/ConfigBoard.py @@ -0,0 +1,61 @@ +from enum import Enum + +class Commands(Enum): + COMMAND_HELP = "h" + COMMAND_ARM = "a" + COMMAND_DISARM = "d" + COMMAND_PULSE = "p" + COMMAND_ENABLE_TIMEOUT = "en" + COMMAND_DISABLE_TIMEOUT = "di" + COMMAND_FAST_TRIGGER = "f" + COMMAND_FAST_TRIGGER_CONF = "fa" + COMMAND_INTERNAL_HVP = "ih" + COMMAND_EXTERNAL_HVP = "eh" + COMMAND_CONFIGURE = "c" + COMMAND_TOGGLE_GPIO = "t" + COMMAND_STATUS = "s" + COMMAND_RESET = "r" + + def __str__(self): + return self.value + +class BoardStatus(Enum): + STATUS_ARMED = "armed" + STATUS_DISARMED = "disarmed" + STATUS_CHARGED = "charged" + STATUS_PULSE = "pulsed" + STATUS_NOT_CHARGED = "Not Charged" + STATUS_TIMEOUT_ACTIVE = "Timeout active" + STATUS_TIMEOUT_DEACT = "Timeout deactivated" + STATUS_HVP_INTERVAL = "HVP interval" + + def __str__(self): + return self.value + + @classmethod + def get_status_by_value(cls, value): + for status in cls.__members__.values(): + if status.value == value: + return status + return None + +class ConfigBoard: + BOARD_CONFIG = { + "pulse_time" : 1.0, + "pulse_power": 0.012200, + "pulse_count": 1, + "port" : "COM1" + } + def __init__(self) -> None: + self.board_config = ConfigBoard.BOARD_CONFIG + self.board_commands = Commands + + def get_config(self) -> dict: + return self.board_config + + def set_config(self, config: dict) -> None: + self.board_config = config + + def __str__(self) -> str: + return f"Board config: {self.board_config}" + \ No newline at end of file diff --git a/FaultycatModules/UART.py b/FaultycatModules/UART.py new file mode 100644 index 0000000..3fd677f --- /dev/null +++ b/FaultycatModules/UART.py @@ -0,0 +1,97 @@ +import platform +import serial +import time +import serial.tools.list_ports +import threading + +from .ConfigBoard import BoardStatus + +if platform.system() == "Windows": + DEFAULT_COMPORT = "COM1" +else: + DEFAULT_COMPORT = "/dev/ttyACM0" + +DEFAULT_SERIAL_BAUDRATE = 921600 + +class UART(threading.Thread): + def __init__(self, serial_port: str = DEFAULT_COMPORT): + self.serial_worker = serial.Serial() + self.serial_worker.port = serial_port + self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE + self.recv_cancel = False + #self.daemon = True + + def __del__(self): + self.serial_worker.close() + + def __str__(self): + return f"Serial port: {self.serial_worker.port}" + + def set_serial_port(self, serial_port: str): + self.serial_worker.port = serial_port + + def set_serial_baudrate(self, serial_baudrate: int): + self.serial_worker.baudrate = serial_baudrate + + def is_valid_connection(self) -> bool: + try: + self.open() + self.close() + return True + except serial.SerialException as e: + return False + + def get_serial_ports(self): + return serial.tools.list_ports.comports() + + def reset_buffer(self): + self.serial_worker.reset_input_buffer() + self.serial_worker.reset_output_buffer() + + def cancel_recv(self): + self.recv_cancel = True + + def open(self): + self.serial_worker.open() + self.reset_buffer() + + def close(self): + self.reset_buffer() + self.serial_worker.close() + + def is_connected(self): + return self.serial_worker.is_open + + def send(self, data): + self.serial_worker.write(data) + self.serial_worker.write(b"\n\r") + self.serial_worker.flush() + + def recv(self): + if not self.is_connected(): + self.open() + try: + while not self.recv_cancel: + time.sleep(0.1) + + bytestream = self.serial_worker.readline() + if self.recv_cancel: + self.recv_cancel = False + return None + return bytestream + except serial.SerialException as e: + print(e) + return None + except KeyboardInterrupt: + self.recv_cancel = True + return None + + def send_recv(self, data): + self.send(data) + return self.recv() + + def stop_worker(self): + self.recv_cancel = True + self.reset_buffer() + self.close() + self.join() \ No newline at end of file diff --git a/FaultycatModules/Worker.py b/FaultycatModules/Worker.py new file mode 100644 index 0000000..0bb2e54 --- /dev/null +++ b/FaultycatModules/Worker.py @@ -0,0 +1,68 @@ +import threading +import time +import typer +from rich.console import Console +from rich.table import Table + +from .UART import UART +from .ConfigBoard import ConfigBoard + +class FaultyWorker(threading.Thread): + def __init__(self): + super().__init__() + #self.daemon = True + self.workers = [] + self.board_uart = UART() + self.board_configurator = ConfigBoard() + self.pulse_count = self.board_configurator.BOARD_CONFIG["pulse_count"] + self.pulse_time = self.board_configurator.BOARD_CONFIG["pulse_time"] + + def add_worker(self, worker): + self.workers.append(worker) + + def stop_workers(self): + for worker in self.workers: + worker.join() + + def run_workers(self): + for worker in self.workers: + worker.start() + + def set_serial_port(self, serial_port): + self.board_uart.set_serial_port(serial_port) + + def validate_serial_connection(self): + return self.board_uart.is_valid_connection() + + def set_pulse_count(self, pulse_count): + self.pulse_count = pulse_count + self.board_configurator.BOARD_CONFIG["pulse_count"] = pulse_count + + def set_pulse_time(self, pulse_time): + self.pulse_time = pulse_time + self.board_configurator.BOARD_CONFIG["pulse_time"] = pulse_time + + def start_faulty_attack(self): + try: + self.board_uart.open() + time.sleep(0.1) + typer.secho("Board connected.", fg=typer.colors.GREEN) + typer.secho("[*] ARMING BOARD, BE CAREFULL!", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) + + typer.secho("[*] ARMED BOARD.", fg=typer.colors.BRIGHT_GREEN) + time.sleep(1) + typer.secho(f"[*] SENDING {self.pulse_count} PULSES.", fg=typer.colors.BRIGHT_GREEN) + for i in range(self.pulse_count): + typer.secho(f"\t- SENDING PULSE {i+1} OF {self.pulse_count}.", fg=typer.colors.BRIGHT_GREEN) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) + time.sleep(self.pulse_time) + + typer.secho("DISARMING BOARD.", fg=typer.colors.BRIGHT_YELLOW) + self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) + self.board_uart.close() + typer.secho("BOARD DISARMING.", fg=typer.colors.BRIGHT_YELLOW) + except Exception as e: + typer.secho(f"Error: {e}", fg=typer.colors.BRIGHT_RED) \ No newline at end of file diff --git a/functions.py b/functions.py index 66279b0..ee6bb8a 100644 --- a/functions.py +++ b/functions.py @@ -5,13 +5,14 @@ import serial import importlib from scope import Scope +from FaultycatModules import Worker from textual.widgets import Button, Input, Switch from textual.containers import Vertical import asyncio import functions -DEBUG_MODE = False +DEBUG_MODE = True app_instance = None # Global variable to store the app instance text_area = None # Store global reference to scrollable text area @@ -19,6 +20,10 @@ log_time = 0 # timestamp for logfile glitch_time = 0 # timestamp for when glitching started +# FaultyCat Variables +DEFAULT_FAULTY_COMPORT = "/dev/ttyACM0" +faulty_worker = Worker.FaultyWorker() + try: s = Scope() except IOError: @@ -77,6 +82,10 @@ if not hasattr(config, "CONFILE"): config.CONFILE = "config.py" # Or any suitable default return config.CONFILE + elif name == "uart_output_enabled": + if not hasattr(config, "UART_OUTPUT_ENABLED"): + config.UART_OUTPUT_ENABLED = False # Default to disabled + return config.UART_OUTPUT_ENABLED elif name.startswith("trigger_"): if "_value" in name: index = int(name.split('_')[1]) @@ -87,21 +96,35 @@ else: return 0 # Default fallback for unknown names -def set_config_value(name: str, value: int): +def set_config_value(name: str, value): + """Set the value of a config variable and update the UI if applicable.""" + attr_name = name.upper() - if hasattr(config, name.upper()): - current_value = getattr(config, name.upper()) - setattr(config, name.upper(), value) + # Create the attribute if it doesn't exist + if not hasattr(config, attr_name): + setattr(config, attr_name, value) + else: + setattr(config, attr_name, value) - # Update corresponding Input field + # Safely update corresponding input field if it exists + try: input_field = app_instance.query_one(f"#{name}_input") input_field.value = str(value) + except Exception: + # No input field exists for this config; ignore + pass - # Update the status box row + # Safely update status box row if possible + try: update_status_box(app_instance, name, value) + except Exception: + pass - # Refresh UI to reflect changes + # Refresh UI + try: app_instance.refresh() + except Exception: + pass def get_condition_string(index): """Returns the string from the triggers list at the given index.""" @@ -493,8 +516,8 @@ write_timeout=1.0, # Write timeout inter_byte_timeout=0.05, # Between bytes exclusive=True, # Prevent multiple access - rtscts=True, # Enable hardware flow control - dsrdtr=True # Additional flow control + rtscts=False, # Enable hardware flow control (disable for tigard) + dsrdtr=False # Additional flow control (disable for tigard) ) add_text("Connected to serial port.") return ser @@ -525,6 +548,38 @@ log_message(f"[UART TX ERROR] {str(e)}") return False +def flush_uart_buffer(): + """Flush UART buffers and clear app_instance.serial_buffer.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Flush skipped - No connection") + return False + try: + conn.reset_input_buffer() + conn.reset_output_buffer() + app_instance.serial_buffer = "" + log_message("[UART] Buffers flushed") + return True + except Exception as e: + log_message(f"[UART FLUSH ERROR] {e}") + return False + +def read_uart_buffer(): + """Read data into app_instance.serial_buffer and return it.""" + conn = getattr(app_instance, "serial_connection", None) + if not conn or not conn.is_open: + log_message("[UART] Read skipped - No connection") + return app_instance.serial_buffer + try: + data = conn.read_all().decode("utf-8", errors="replace") + if data: + app_instance.serial_buffer += data + log_message(f"[UART] Buffer read: {app_instance.serial_buffer.strip()}") + return app_instance.serial_buffer + except Exception as e: + log_message(f"[UART RX ERROR] {e}") + return app_instance.serial_buffer + def get_conditions_buffer_size(debug=False): """Return the maximum length of condition strings with debug option""" if not hasattr(config, 'conditions') or not config.conditions: @@ -545,8 +600,8 @@ def check_conditions(self, buffer, debug=False): """Check buffer against all conditions by examining every position""" - if debug: - log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") + #if debug: + #log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") if not hasattr(config, 'conditions') or not config.conditions: if debug: @@ -561,8 +616,8 @@ trigger_len = len(trigger_str) buffer_len = len(buffer) - if debug: - log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") + #if debug: + #log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") # Check every possible starting position in the buffer for pos in range(buffer_len - trigger_len + 1): @@ -585,8 +640,8 @@ log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}") continue - if debug: - log_message("[DEBUG] No conditions matched") + #if debug: + #log_message("[DEBUG] No conditions matched") return None def execute_condition_action(action_name, debug=False): @@ -763,4 +818,66 @@ text_area.clear() def end_program(): - exit() \ No newline at end of file + exit() + +################## +# Faultycat stuff +################## + +def faulty_connect(comport: str = DEFAULT_FAULTY_COMPORT) -> bool: + try: + faulty_worker.set_serial_port(comport) + if not faulty_worker.validate_serial_connection(): + #if debug: + #log_message(f"Connection failed on {comport}") + return False + faulty_worker.board_uart.open() + time.sleep(0.1) + #if debug: + #log_message("Board connected") + return True + except Exception as e: + #if debug: + #log_message(f"Connection error: {e}") + return False + +def faulty_arm() -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + time.sleep(1) + uart.send(cmd.COMMAND_ARM.value.encode("utf-8")) + #if debug: + #log_message("Board armed") + return True + except Exception as e: + #if debug: + #log_message(f"Arm error: {e}") + return False + +def faulty_send_pulse() -> bool: + try: + faulty_worker.board_uart.send( + faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8") + ) + #if debug: + #log_message("Pulse sent") + return True + except Exception as e: + #if debug: + #log_message(f"Pulse error: {e}") + return False + +def faulty_disarm(close_uart: bool = True) -> bool: + try: + uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands + uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) + if close_uart: + uart.close() + #if debug: + #log_message("Board disarmed") + return True + except Exception as e: + #if debug: + #log_message(f"Disarm error: {e}") + return False \ No newline at end of file diff --git a/glitch-o-bolt.py b/glitch-o-bolt.py index aba439b..ba3d95f 100644 --- a/glitch-o-bolt.py +++ b/glitch-o-bolt.py @@ -229,7 +229,8 @@ """Display serial data exactly as received""" if hasattr(functions, 'text_area'): # Write the data exactly as it should appear - functions.text_area.write(message.data) + if functions.get_config_value("uart_output_enabled") is True: + functions.text_area.write(message.data) log_time = functions.get_config_value("log_time") if log_time > 0: @@ -329,6 +330,15 @@ else: main_content.border_title = f"{port} {baud} \\[{log_time}.log]" + if "uart-output-switch" in switch.classes: + if bool(event.value) is True: + functions.set_config_value("uart_output_enabled", True) + else: + functions.set_config_value("uart_output_enabled", False) + functions.log_message(f"[FUNCTION] uart output toggled: {event.value}") + + + if "glitch-switch" in switch.classes: if functions.DEBUG_MODE: curr_time = functions.get_config_value("glitch_time") @@ -473,9 +483,12 @@ with sidebar_content3: with Grid(classes="sidebar_settings_switches"): # Add rows with switches - yield Static(f"uart") + yield Static(f"uart enable") yield Switch(classes="sidebar_trigger_switch", value=False, animate=False, id="uart_switch") + yield Static(f"uart output") + yield Switch(classes="uart-output-switch sidebar_trigger_switch", value=False, animate=False) + yield Static(f"logging") yield Switch(classes="logging-switch sidebar_trigger_switch", value=False, animate=False) diff --git a/style.tcss b/style.tcss index 78202e1..b095abb 100644 --- a/style.tcss +++ b/style.tcss @@ -272,7 +272,7 @@ .sidebar_settings_switches{ color: #9DC3CF; - height: 2; + height: 3; border: none; layout: grid; grid-size: 2;