import sys import os import re import time import serial import importlib from scope import Scope from FaultycatModules import Worker from textual.widgets import Button, Input, Switch from textual.containers import Vertical import asyncio import functions DEBUG_MODE = True app_instance = None # Global variable to store the app instance text_area = None # Store global reference to scrollable text area config = None # dynamic loading of config file log_time = 0 # timestamp for logfile glitch_time = 0 # timestamp for when glitching started # FaultyCat Variables DEFAULT_FAULTY_COMPORT = "/dev/ttyACM0" faulty_worker = Worker.FaultyWorker() try: s = Scope() except IOError: s = None print("Warning: Scope not connected, running in simulation mode") def set_config(cfg): global config config = cfg def set_app_instance(app): """Store the app instance for UART access""" global app_instance app_instance = app def log_message(message): if DEBUG_MODE: with open("debug.log", "a") as log_file: log_file.write(message + "\n") def set_log_time(value): global log_time log_time = value def set_glitch_time(value): global glitch_time glitch_time = value def get_config_value(name: str) -> int: """Return the latest value of the given config variable, and create them if they don't exist.""" if name == "length": if not hasattr(config, "LENGTH"): config.LENGTH = 0 # Default value if not set return config.LENGTH elif name == "repeat": if not hasattr(config, "REPEAT"): config.REPEAT = 0 # Default value if not set return config.REPEAT elif name == "serial_port": if not hasattr(config, "SERIAL_PORT"): config.SERIAL_PORT = "/dev/ttyUSB0" # Default value if not set return config.SERIAL_PORT elif name == "baud_rate": if not hasattr(config, "BAUD_RATE"): config.BAUD_RATE = 115200 # Default value if not set return config.BAUD_RATE elif name == "delay": if not hasattr(config, "DELAY"): config.DELAY = 0 # Default value if not set return config.DELAY elif name == "log_time": return log_time # Return the module variable directly elif name == "glitch_time": return glitch_time # Return the module variable directly elif name == "conFile": if not hasattr(config, "CONFILE"): config.CONFILE = "config.py" # Or any suitable default return config.CONFILE elif name == "uart_output_enabled": if not hasattr(config, "UART_OUTPUT_ENABLED"): config.UART_OUTPUT_ENABLED = False # Default to disabled return config.UART_OUTPUT_ENABLED elif name.startswith("trigger_"): if "_value" in name: index = int(name.split('_')[1]) return config.triggers[index][0] elif "_state" in name: index = int(name.split('_')[1]) return config.triggers[index][1] else: return 0 # Default fallback for unknown names def set_config_value(name: str, value): """Set the value of a config variable and update the UI if applicable.""" attr_name = name.upper() # Create the attribute if it doesn't exist if not hasattr(config, attr_name): setattr(config, attr_name, value) else: setattr(config, attr_name, value) # Safely update corresponding input field if it exists try: input_field = app_instance.query_one(f"#{name}_input") input_field.value = str(value) except Exception: # No input field exists for this config; ignore pass # Safely update status box row if possible try: update_status_box(app_instance, name, value) except Exception: pass # Refresh UI try: app_instance.refresh() except Exception: pass def get_condition_string(index): """Returns the string from the triggers list at the given index.""" if 0 <= index < len(config.conditions): return config.conditions[index][0] # Return the string value else: raise IndexError("Index out of range") def get_condition_value(index): """Returns the value from the triggers list at the given index.""" if 0 <= index < len(config.conditions): return config.conditions[index][1] # Return the boolean value else: raise IndexError("Index out of range") def set_condition_value(index: int, value: bool) -> None: """Update switch state in config""" if 0 <= index < len(config.conditions): if app_instance.query(f"#condition_switch_{index}"): switch = app_instance.query_one(f"#condition_switch_{index}", Switch) switch.value = value # Force turn off else: raise IndexError("Index out of range") def ensure_triggers_exist(): if not hasattr(config, "triggers") or not config.triggers or len(config.triggers) < 8: config.triggers = [["-", False] for _ in range(8)] def get_trigger_string(index): """Returns the string from the triggers list at the given index.""" if 0 <= index < len(config.triggers): return config.triggers[index][0] # Return the string value else: raise IndexError("Index out of range") def get_trigger_value(index): """Returns the value from the triggers list at the given index.""" if 0 <= index < len(config.triggers): return config.triggers[index][1] # Return the boolean value else: raise IndexError("Index out of range") def set_trigger_value(index, value): if 0 <= index < len(config.triggers): switch = app_instance.query_one(f"#trigger_switch_{index}", Switch) switch.value = value # Force turn off else: raise IndexError("Index out of range") def set_trigger_string(index: int, value: str): # Validate the input value valid_values = ["^", "v", "-"] if value not in valid_values: raise ValueError(f"Invalid trigger value. Must be one of {valid_values}") # Update config config.triggers[index][0] = value config.triggers[index][1] = False # Update the symbol display in the UI symbol_widget = app_instance.query_one(f"#trigger_symbol_{index}") symbol_widget.update(value) # Update the switch in the UI switch_widget = app_instance.query_one(f"#trigger_switch_{index}") switch_widget.value = False def toggle_trigger(self, index: int): current_symbol = config.triggers[index][0] cycle = ["^", "v", "-"] next_symbol = cycle[(cycle.index(current_symbol) + 1) % len(cycle)] # Update config config.triggers[index][0] = next_symbol config.triggers[index][1] = False # Update the symbol display in the UI symbol_widget = self.query_one(f"#trigger_symbol_{index}") symbol_widget.update(next_symbol) # Update the switch in the UI switch_widget = self.query_one(f"#trigger_switch_{index}") switch_widget.value = False log_message("next symbol: "+next_symbol) def set_uart_switch(state: bool | None = None) -> None: switch_uart = app_instance.query_one("#uart_switch") if state is None: switch_uart.value = not switch_uart.value # Toggle else: switch_uart.value = state # Set to specific state def modify_value(variable_name: str, amount: int) -> int: """ Modify a global variable by a given amount. Args: variable_name (str): The name of the variable to modify. amount (int): The amount to increment or decrement. Returns: int: The updated value. """ global config # Ensure we modify the variables from config.py if variable_name == "length": config.LENGTH += amount return config.LENGTH elif variable_name == "repeat": config.REPEAT += amount return config.REPEAT elif variable_name == "delay": config.DELAY += amount return config.DELAY else: raise ValueError(f"Unknown variable: {variable_name}") def on_button_pressed(app, event: Button.Pressed) -> None: """Handle button presses and update values dynamically.""" button = event.button button_name = button.name if button_name: # Strip everything before the first hyphen, including the hyphen itself button_name = button_name.split("-", 1)[-1] # Get the part after the first hyphen parts = button_name.split("_") if len(parts) == 2: variable_name, amount = parts[0], int(parts[1]) # Update the variable value in config.py if hasattr(config, variable_name.upper()): current_value = getattr(config, variable_name.upper()) new_value = current_value + amount setattr(config, variable_name.upper(), new_value) # Update corresponding Input field input_field = app.query_one(f"#{variable_name}_input") input_field.value = str(new_value) # Update the status box row update_status_box(app, variable_name, new_value) # Refresh UI to reflect changes app.refresh() def on_save_button_pressed(app, event: Button.Pressed) -> None: """Handle the Save button press to save the values.""" button = event.button button_name = button.name if button_name: variable_name = button_name.replace("save_val-", "") variable_name = variable_name.replace("_save", "") # Extract the variable name from button input_field = app.query_one(f"#{variable_name}_input", Input) new_value = int(input_field.value) setattr(config, variable_name.upper(), new_value) update_status_box(app, variable_name, new_value) app.refresh() def save_uart_settings(app, event: Button.Pressed) -> None: cur_uart_port = str(app.query_one(f"#uart_port_input", Input).value) cur_baud_rate = int(app.query_one(f"#baud_rate_input", Input).value) config.SERIAL_PORT = cur_uart_port config.BAUD_RATE = cur_baud_rate main_content = app.query_one(".main_content", Vertical) main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}" app.refresh() def change_baudrate(new_baudrate): """Change the baud rate using the app_instance's serial connection""" if app_instance is None: add_text("[ERROR] App instance not available") return False if not hasattr(app_instance, 'serial_connection'): add_text("[ERROR] No serial connection in app instance") return False input_field = app_instance.query_one(f"#baud_rate_input") input_field.value = str(new_baudrate) serial_conn = app_instance.serial_connection if serial_conn is None or not serial_conn.is_open: add_text("[ERROR] Serial port not initialized or closed") return False try: old_baudrate = serial_conn.baudrate serial_conn.baudrate = new_baudrate config.BAUD_RATE = new_baudrate main_content = app_instance.query_one(".main_content", Vertical) if functions.get_config_value("log_time") == 0: main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}" else: time = str(functions.get_config_value("log_time")) main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE} \\[{time}.log]" return True except ValueError as e: add_text(f"[ERROR] Invalid baud rate {new_baudrate}: {e}") except serial.SerialException as e: add_text(f"[ERROR] Serial error changing baud rate: {e}") # Attempt to revert try: serial_conn.baudrate = old_baudrate except: add_text("[WARNING] Failed to revert baud rate") return False def update_status_box(app, variable_name, new_value): column_keys = list(app.status_box.columns.keys()) # We only have two columns: "Attribute" and "Value" if variable_name == "length": row_key = list(app.status_box.rows.keys())[0] # The first row column_key = column_keys[1] # The Value column for 'length' elif variable_name == "repeat": row_key = list(app.status_box.rows.keys())[1] # The first row column_key = column_keys[1] # The Value column for 'repeat' elif variable_name == "delay": row_key = list(app.status_box.rows.keys())[2] # The first row column_key = column_keys[1] # The Value column for 'delay' elif variable_name == "elapsed": row_key = list(app.status_box.rows.keys())[3] # The first row column_key = column_keys[1] # The Value column for 'delay' app.status_box.update_cell(row_key, column_key, str(new_value)) def run_custom_function(app, event): """Handle custom function buttons with enhanced logging""" button = event.button button_name = button.name debug = DEBUG_MODE # Set to False after testing log_message(f"[CUSTOM] Button pressed: '{button_name}'") if button_name: try: variable_name = int(button_name.replace("custom_function-", "")) log_message(f"[CUSTOM] Condition index: {variable_name}") if 0 <= variable_name < len(config.conditions): func_name = config.conditions[variable_name][3] log_message(f"[CUSTOM] Executing: {func_name}") # Use the centralized execution function success = execute_condition_action(func_name, debug) if not success: log_message(f"[CUSTOM] Failed to execute {func_name}") else: log_message(f"[CUSTOM] Invalid index: {variable_name}") except ValueError: log_message(f"[CUSTOM] Invalid button format: '{button_name}'") except Exception as e: log_message(f"[CUSTOM] Error: {str(e)}") if debug: log_message(f"[DEBUG] {traceback.format_exc()}") def write_to_log(text: str, log_time: int): """Write text to a log file named {log_time}.log in the logs directory""" # Create logs directory if it doesn't exist logs_dir = "logs" if not os.path.exists(logs_dir): os.makedirs(logs_dir) # Create filename using log_time value log_file = os.path.join(logs_dir, f"{log_time}.log") # Append text to log file with open(log_file, "a") as f: f.write(f"{text}") def add_text(text): """Add text to the log widget and optionally to a log file""" if hasattr(functions, 'text_area'): functions.text_area.write(text + "\n") log_time = get_config_value("log_time") if log_time > 0: write_to_log(text+"\n", log_time) def update_text(text): """Update text without adding newlines""" if hasattr(functions, 'text_area'): functions.text_area.write(text) def save_config(app): config_file = get_config_value("conFile") temp_file = config_file + ".tmp" new_file = str(app.query_one(f"#config_file_input", Input).value) try: # Get current values serial_port = get_config_value("serial_port") baud_rate = get_config_value("baud_rate") length = get_config_value("length") repeat = get_config_value("repeat") delay = get_config_value("delay") # Get triggers triggers = [] for i in range(8): triggers.append([ get_config_value(f"trigger_{i}_value"), get_config_value(f"trigger_{i}_state") ]) # Read existing config existing_content = "" custom_functions = [] imports = [] if os.path.exists(config_file): with open(config_file, 'r') as f: existing_content = f.read() # Extract imports and functions import_pattern = re.compile(r'^import .+?$|^from .+? import .+?$', re.MULTILINE) imports = import_pattern.findall(existing_content) func_pattern = re.compile(r'^(def \w+\(.*?\):.*?)(?=^(?:def \w+\(|\Z))', re.MULTILINE | re.DOTALL) custom_functions = [fn.strip() for fn in func_pattern.findall(existing_content) if fn.strip()] # Write new config file with open(temp_file, 'w') as f: # Write imports if imports: f.write("######\n# LEAVE THESE IMPORTS!\n######\n") f.write("\n".join(imports) + "\n\n") # Write config values f.write("######\n# config values\n######\n\n") f.write(f"SERIAL_PORT = {repr(serial_port)}\n") f.write(f"BAUD_RATE = {baud_rate}\n\n") f.write(f"LENGTH = {length}\n") f.write(f"REPEAT = {repeat}\n") f.write(f"DELAY = {delay}\n\n") # Write triggers f.write("###\n# ^ = pullup, v = pulldown\n###\n") f.write("triggers = [\n") for i, (value, state) in enumerate(triggers): f.write(f" [{repr(value)}, {state}], #{i}\n") f.write("]\n") # Write conditions if they exist if hasattr(config, 'conditions') and config.conditions: f.write("\n###\n# name, enabled, string to match\n###\n") f.write("conditions = [\n") for condition in config.conditions: f.write(f" {condition},\n") f.write("]\n") # Write custom functions with proper spacing if custom_functions: f.write("\n######\n# Custom functions\n######\n") f.write("\n\n".join(custom_functions)) f.write("\n") # Single newline at end # Finalize file if os.path.exists(new_file): os.remove(new_file) os.rename(temp_file, new_file) config.CONFILE = new_file add_text(f"[SAVED] config {new_file} saved") except Exception as e: log_message(f"Error saving config: {str(e)}") if os.path.exists(temp_file): os.remove(temp_file) raise def start_serial(): try: ser = serial.Serial( port=config.SERIAL_PORT, baudrate=config.BAUD_RATE, timeout=0.1, # Read timeout (seconds) write_timeout=1.0, # Write timeout inter_byte_timeout=0.05, # Between bytes exclusive=True, # Prevent multiple access rtscts=False, # Enable hardware flow control (disable for tigard) dsrdtr=False # Additional flow control (disable for tigard) ) add_text("Connected to serial port.") return ser except serial.SerialException as e: add_text(f"[ERROR] Serial exception: {e}") return None def send_uart_message(message): """Send a message via UART from anywhere in the application""" if not app_instance: log_message("[UART] Not sent - No app instance") return False if not hasattr(app_instance, 'serial_connection') or not app_instance.serial_connection.is_open: log_message("[UART] Not sent - UART disconnected") return False try: # Ensure message ends with newline if it's not empty if message and not message.endswith('\n'): message += '\n' # Send the message app_instance.serial_connection.write(message.encode('utf-8')) log_message(f"[UART] Sent: {message.strip()}") return True except Exception as e: log_message(f"[UART TX ERROR] {str(e)}") return False def flush_uart_buffer(): """Flush UART buffers and clear app_instance.serial_buffer.""" conn = getattr(app_instance, "serial_connection", None) if not conn or not conn.is_open: log_message("[UART] Flush skipped - No connection") return False try: conn.reset_input_buffer() conn.reset_output_buffer() app_instance.serial_buffer = "" log_message("[UART] Buffers flushed") return True except Exception as e: log_message(f"[UART FLUSH ERROR] {e}") return False def read_uart_buffer(): """Read data into app_instance.serial_buffer and return it.""" conn = getattr(app_instance, "serial_connection", None) if not conn or not conn.is_open: log_message("[UART] Read skipped - No connection") return app_instance.serial_buffer try: data = conn.read_all().decode("utf-8", errors="replace") if data: app_instance.serial_buffer += data log_message(f"[UART] Buffer read: {app_instance.serial_buffer.strip()}") return app_instance.serial_buffer except Exception as e: log_message(f"[UART RX ERROR] {e}") return app_instance.serial_buffer def get_conditions_buffer_size(debug=False): """Return the maximum length of condition strings with debug option""" if not hasattr(config, 'conditions') or not config.conditions: if debug: log_message("[DEBUG] No conditions defined, using default buffer size 256") return 256 valid_lengths = [len(cond[2]) for cond in config.conditions if cond[2]] if not valid_lengths: if debug: log_message("[DEBUG] All condition strings are empty, using default buffer size 256") return 256 max_size = max(valid_lengths) if debug: log_message(f"[DEBUG] Calculated buffer size: {max_size} (from {len(config.conditions)} conditions)") return max_size def check_conditions(self, buffer, debug=False): """Check buffer against all conditions by examining every position""" #if debug: #log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}") if not hasattr(config, 'conditions') or not config.conditions: if debug: log_message("[DEBUG] No conditions to check against") return None for i, condition in enumerate(config.conditions): trigger_str = condition[2] if not trigger_str: # Skip empty trigger strings continue trigger_len = len(trigger_str) buffer_len = len(buffer) #if debug: #log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})") # Check every possible starting position in the buffer for pos in range(buffer_len - trigger_len + 1): # Compare slice of buffer with trigger string if buffer[pos:pos+trigger_len] == trigger_str: try: condition_active = config.conditions[i][1] # Get state from config if not condition_active: if debug: log_message(f"[DEBUG] Condition {i} matched at position {pos} but switch is OFF") continue if debug: log_message(f"[DEBUG] MATCHED condition {i} at position {pos}: {condition[0]} -> {condition[3]}") return condition[3] except Exception as e: if debug: log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}") continue #if debug: #log_message("[DEBUG] No conditions matched") return None def execute_condition_action(action_name, debug=False): """Execute the named action function using run_custom_function logic""" if debug: log_message(f"[ACTION] Attempting to execute: {action_name}") try: # Check if action exists in config module module_name = 'config' module = importlib.import_module(module_name) if hasattr(module, action_name): if debug: log_message(f"[ACTION] Found {action_name} in {module_name}") getattr(module, action_name)() return True # Check if action exists in functions module if hasattr(sys.modules[__name__], action_name): if debug: log_message(f"[ACTION] Found {action_name} in functions") getattr(sys.modules[__name__], action_name)() return True # Check if action exists in globals if action_name in globals(): if debug: log_message(f"[ACTION] Found {action_name} in globals") globals()[action_name]() return True log_message(f"[ACTION] Function '{action_name}' not found in any module") return False except Exception as e: log_message(f"[ACTION] Error executing {action_name}: {str(e)}") if debug: log_message(f"[DEBUG] Full exception: {traceback.format_exc()}") return False def get_glitch_elapsed(): gtime = get_config_value("glitch_time") if gtime <= 0: return "000:00:00" # Assuming gtime contains the start timestamp elapsed = int(time.time() - gtime) return f"{elapsed//3600:03d}:{(elapsed%3600)//60:02d}:{elapsed%60:02d}" def start_glitch(glitch_len, trigger_repeats, delay): s.glitch.repeat = glitch_len s.glitch.ext_offset = delay #add_text(f"[GLITCHING]: length:{glitch_len}, offset:{delay}, repeat:{trigger_repeats}") triggers = [] # Get triggers triggers_set = False for i in range(8): triggers.append([ get_config_value(f"trigger_{i}_value"), get_config_value(f"trigger_{i}_state") ]) for i, (value, state) in enumerate(triggers): if state is True: triggers_set = True if value == "^": #add_text(f"[GLITCHING]: armed: {i} ^") s.arm(i, Scope.RISING_EDGE) elif value == "v": #add_text(f"[GLITCHING]: armed: {i} v") s.arm(i, Scope.FALLING_EDGE) if triggers_set is False: #add_text(f"[GLITCHING]: repeat:{trigger_repeats}") for _ in range(trigger_repeats): s.trigger() def launch_glitch(): length = functions.get_config_value("length") repeat = functions.get_config_value("repeat") delay = functions.get_config_value("delay") start_glitch(length, repeat, delay) async def glitch(self): functions.log_message("[GLITCHING] Starting glitch monitor") previous_gtime = None # Track the previous state while True: try: gtime = get_config_value("glitch_time") elapsed_time = get_glitch_elapsed() functions.update_status_box(self, "elapsed", elapsed_time) # Only update if the state has changed #if gtime != previous_gtime: if gtime > 0: self.status_box.border_subtitle = "running" self.status_box.styles.border_subtitle_color = "#5E99AE" self.status_box.styles.border_subtitle_style = "bold" length = functions.get_config_value("length") repeat = functions.get_config_value("repeat") delay = functions.get_config_value("delay") start_glitch(length, repeat, delay) else: self.status_box.border_subtitle = "stopped" self.status_box.styles.border_subtitle_color = "#B13840" self.status_box.styles.border_subtitle_style = "none" #previous_gtime = gtime # Update the previous state except Exception as e: print(f"Update error: {e}") await asyncio.sleep(0.1) def glitching_switch(value): switch = app_instance.query_one("#glitch-switch", Switch) switch.value = value # Force turn off def run_output_high(gpio, time): s.io.add(gpio, 1, delay=time) s.io.upload() s.trigger() def run_output_low(gpio, time): s.io.add(gpio, 0, delay=time) s.io.upload() s.trigger() async def monitor_buffer(self): """Background task to monitor serial buffer for conditions""" debug = True buffer_size = functions.get_conditions_buffer_size(debug) functions.log_message("[CONDITIONS] Starting monitor") while self.run_serial: if not getattr(self, '_serial_connected', False): await asyncio.sleep(1) continue async with self.buffer_lock: current_buffer = self.serial_buffer max_keep = buffer_size * 3 # Keep enough buffer to catch split matches if len(current_buffer) > max_keep: # Keep last max_keep characters, but ensure we don't cut a potential match keep_from = len(current_buffer) - max_keep # Find the last newline before this position to avoid breaking lines safe_cut = current_buffer.rfind('\n', 0, keep_from) if safe_cut != -1: keep_from = safe_cut + 1 self.serial_buffer = current_buffer[keep_from:] current_buffer = self.serial_buffer if debug: log_message(f"[DEBUG] Truncated buffer from {len(current_buffer)+keep_from} to {len(current_buffer)} chars") if current_buffer: action = functions.check_conditions(self, current_buffer, debug) if action: functions.log_message(f"[CONDITIONS] Triggering: {action}") success = functions.execute_condition_action(action, debug) if success: async with self.buffer_lock: # Clear the buffer after successful match self.serial_buffer = "" else: functions.log_message("[CONDITIONS] Action failed") await asyncio.sleep(0.1) def clear_text(): text_area.clear() def end_program(): exit() ################## # Faultycat stuff ################## def faulty_connect(comport: str = DEFAULT_FAULTY_COMPORT) -> bool: try: faulty_worker.set_serial_port(comport) if not faulty_worker.validate_serial_connection(): #if debug: #log_message(f"Connection failed on {comport}") return False faulty_worker.board_uart.open() time.sleep(0.1) #if debug: #log_message("Board connected") return True except Exception as e: #if debug: #log_message(f"Connection error: {e}") return False def faulty_arm() -> bool: try: uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) time.sleep(1) uart.send(cmd.COMMAND_ARM.value.encode("utf-8")) #if debug: #log_message("Board armed") return True except Exception as e: #if debug: #log_message(f"Arm error: {e}") return False def faulty_send_pulse() -> bool: try: faulty_worker.board_uart.send( faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8") ) #if debug: #log_message("Pulse sent") return True except Exception as e: #if debug: #log_message(f"Pulse error: {e}") return False def faulty_disarm(close_uart: bool = True) -> bool: try: uart, cmd = faulty_worker.board_uart, faulty_worker.board_configurator.board_commands uart.send(cmd.COMMAND_DISARM.value.encode("utf-8")) if close_uart: uart.close() #if debug: #log_message("Board disarmed") return True except Exception as e: #if debug: #log_message(f"Disarm error: {e}") return False