#!/usr/bin/env python3 # # glitch-o-matic 2.0 - Optimized Version # Enhanced serial data performance while maintaining all existing features # # requirements: textual import os import sys import time import types import argparse import importlib.util import concurrent.futures import asyncio import select import serial import functools from scope import Scope from textual import events from textual.app import App, ComposeResult from textual.containers import Container, Vertical, Horizontal, Grid from textual.widgets import Static, DataTable, Input, Button, Switch, Log from textual.messages import Message # Define specific names for each control row control_names = ["length", "repeat", "delay"] def load_config(path): spec = importlib.util.spec_from_file_location("dynamic_config", path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Inject the loaded config as 'config' sys.modules['config'] = module class PersistentInput(Input): PREFIX = "$> " # The permanent prefix def __init__(self, **kwargs): super().__init__(**kwargs) self.value = self.PREFIX # Set initial value def on_input_changed(self, event: Input.Changed) -> None: """Ensure the prefix is always present.""" if not event.value.startswith(self.PREFIX): self.value = self.PREFIX # Restore the prefix elif len(event.value) < len(self.PREFIX): self.value = self.PREFIX # Prevent deleting def set_app_instance(app): functions.app_instance = app class SerialDataMessage(Message): def __init__(self, data: str): super().__init__() # Call the parent class constructor self.data = data # Store the serial data class LayoutApp(App): CSS_PATH = "style.tcss" async def on_ready(self) -> None: #set_app_instance(self) self.run_serial = True self.serial_buffer = "" # Add buffer storage self.buffer_lock = asyncio.Lock() # Add thread-safe lock try: functions.s = Scope() except IOError: s = None print("Warning: Scope not connected, running in simulation mode") # Start both serial tasks asyncio.create_task(self.connect_serial()) asyncio.create_task(functions.monitor_buffer(self)) asyncio.create_task(functions.glitch(self)) functions.log_message("[DEBUG] Serial tasks created") async def connect_serial(self): """Stable serial connection with proper error handling""" switch_uart = self.query_one("#uart_switch") while self.run_serial: if switch_uart.value: if not getattr(self, '_serial_connected', False): try: # Close existing connection if any if hasattr(self, 'serial_connection') and self.serial_connection: self.serial_connection.close() # Establish new connection self.serial_connection = functions.start_serial() if self.serial_connection and self.serial_connection.is_open: # Configure for reliable operation self.serial_connection.timeout = 0.5 self.serial_connection.write_timeout = 1.0 self._serial_connected = True functions.log_message("[SERIAL] Connected successfully") asyncio.create_task(self.read_serial_loop()) else: raise serial.SerialException("Connection failed") except Exception as e: self._serial_connected = False functions.log_message(f"[SERIAL] Connection error: {str(e)}") switch_uart.value = False await asyncio.sleep(2) # Wait before retrying else: if getattr(self, '_serial_connected', False): if hasattr(self, 'serial_connection') and self.serial_connection: self.serial_connection.close() self._serial_connected = False functions.log_message("[SERIAL] Disconnected") await asyncio.sleep(1) # Check connection status periodically async def read_serial_loop(self): """Serial reading that perfectly preserves original line endings""" buffer = "" while self.run_serial and getattr(self, '_serial_connected', False): try: # Read available data (minimum 1 byte) data = await asyncio.get_event_loop().run_in_executor( None, lambda: self.serial_connection.read(max(1, self.serial_connection.in_waiting)) ) if data: decoded = data.decode('utf-8', errors='ignore') # Store raw data in condition monitoring buffer async with self.buffer_lock: self.serial_buffer += decoded # Original character processing for char in decoded: if char == '\r': continue buffer += char if char == '\n': self.post_message(SerialDataMessage(buffer)) buffer = "" if buffer: self.post_message(SerialDataMessage(buffer)) buffer = "" await asyncio.sleep(0.01) except serial.SerialException as e: functions.log_message(f"[SERIAL] Read error: {str(e)}") self._serial_connected = False break except Exception as e: functions.log_message(f"[SERIAL] Unexpected error: {str(e)}") await asyncio.sleep(0.1) async def monitor_conditions(self): """Background task to monitor serial buffer for conditions with debug""" debug = functions.DEBUG_MODE # Set to False to disable debug logging after testing buffer_size = functions.get_conditions_buffer_size(debug) if debug: functions.log_message("[DEBUG] Starting condition monitor") functions.log_message(f"[DEBUG] Initial buffer size: {buffer_size}") functions.log_message(f"[DEBUG] Current conditions: {config.conditions}") while self.run_serial: if hasattr(self, '_serial_connected') and self._serial_connected: # Get a snapshot of the buffer contents async with self.buffer_lock: current_buffer = self.serial_buffer if debug and current_buffer: functions.log_message(f"[DEBUG] Current buffer length: {len(current_buffer)}") # Keep reasonable buffer size if len(current_buffer) > buffer_size * 2: self.serial_buffer = current_buffer = current_buffer[-buffer_size*2:] if debug: functions.log_message(f"[DEBUG] Trimmed buffer to {len(current_buffer)} chars") # Check for conditions action = functions.check_conditions(self, current_buffer, debug) if action: if debug: functions.log_message(f"[DEBUG] Executing action: {action}") functions.execute_condition_action(action, debug) elif debug and current_buffer: functions.log_message("[DEBUG] No action triggered") await asyncio.sleep(0.1) # Check 10 times per secon async def on_key(self, event: events.Key) -> None: """Handles input with proper newline preservation""" if event.key == "enter" and self.input_field.has_focus: text_to_send = self.input_field.value # Preserve exact input (don't strip) but ensure newline if not text_to_send.endswith('\n'): text_to_send += '\n' # Check if serial_connection exists and is open serial_connection = getattr(self, 'serial_connection', None) if serial_connection is not None and serial_connection.is_open: try: # Send raw bytes exactly as entered await asyncio.get_event_loop().run_in_executor( None, lambda: serial_connection.write(text_to_send.encode('utf-8')) ) # Echo to console with prefix display_text = f"> {text_to_send.rstrip()}" functions.add_text(display_text) except Exception as e: functions.log_message(f"[UART TX ERROR] {str(e)}") functions.add_text(">> Failed to send") else: functions.add_text(">> Not sent - UART disconnected") self.input_field.value = "" event.prevent_default() async def on_serial_data_message(self, message: SerialDataMessage) -> None: """Display serial data exactly as received""" if hasattr(functions, 'text_area'): # Write the data exactly as it should appear if functions.get_config_value("uart_output_enabled") is True: functions.text_area.write(message.data) log_time = functions.get_config_value("log_time") if log_time > 0: functions.write_to_log(message.data, log_time) #functions.add_text(message.data) functions.text_area.scroll_end() def read_from_serial_sync(self): """Synchronous line reading with proper timeout handling""" if not self.serial_connection: return b"" try: # Read with timeout ready, _, _ = select.select([self.serial_connection], [], [], 0.01) if ready: # Read until newline or buffer limit return self.serial_connection.read_until(b'\n', size=4096) return b"" except Exception as e: functions.log_message(f"[ERROR] Serial read error: {str(e)}") return b"" def on_button_pressed(self, event: Button.Pressed) -> None: button_id_parts = event.button.name parts = button_id_parts.split("-") button_id = parts[0] if(button_id == "exit_button"): functions.end_program() if(button_id == "clear_button"): functions.clear_text() if(button_id == "btn_glitch"): functions.launch_glitch() if(button_id == "save_config"): functions.save_config(self) if(button_id == "toggle_trigger"): functions.toggle_trigger(self, int(parts[1])) if(button_id == "change_val"): functions.on_button_pressed(self, event) if(button_id == "save_val"): functions.on_save_button_pressed(self, event) if(button_id == "custom_function"): functions.run_custom_function(self, event) if(button_id == "save_uart"): functions.save_uart_settings(self, event) def on_switch_changed(self, event: Switch.Changed) -> None: """Handle switch toggle events""" switch = event.switch # Only handle switches with our specific class if "trigger-switch" in switch.classes: try: # Extract index from switch ID index = int(switch.id.split("_")[-1]) new_state = bool(event.value) # Ensure boolean config.triggers[index][1] = new_state # Update config functions.set_triggers() except (ValueError, IndexError, AttributeError) as e: if functions.DEBUG_MODE: functions.log_message(f"[ERROR] Failed to process trigger switch: {str(e)}") if "condition-switch" in switch.classes: try: # Extract index from switch ID index = int(switch.id.split("_")[-1]) new_state = bool(event.value) # Ensure boolean # Update config config.conditions[index][1] = new_state if functions.DEBUG_MODE: functions.log_message(f"[CONDITION] Updated switch {index} to {new_state}") functions.log_message(f"[CONDITION] Current states: {[cond[1] for cond in config.conditions]}") except (ValueError, IndexError, AttributeError) as e: if functions.DEBUG_MODE: functions.log_message(f"[ERROR] Failed to process condition switch: {str(e)}") if "logging-switch" in switch.classes: if functions.DEBUG_MODE: curr_time = functions.get_config_value("log_time") functions.log_message(f"[FUNCTION] logging toggled: {curr_time}") if bool(event.value) is True: functions.set_log_time(int(time.time())) # Uses the 'time' module else: functions.set_log_time(0) main_content = self.query_one("#main_content") log_time = functions.get_config_value("log_time") # Renamed to avoid conflict port = str(functions.get_config_value("serial_port")) baud = str(functions.get_config_value("baud_rate")) if log_time == 0: # Now using 'log_time' instead of 'time' main_content.border_title = f"{port} {baud}" else: main_content.border_title = f"{port} {baud} \\[{log_time}.log]" if "uart-output-switch" in switch.classes: if bool(event.value) is True: functions.set_config_value("uart_output_enabled", True) else: functions.set_config_value("uart_output_enabled", False) functions.log_message(f"[FUNCTION] uart output toggled: {event.value}") if "glitch-switch" in switch.classes: if functions.DEBUG_MODE: curr_time = functions.get_config_value("glitch_time") functions.log_message(f"[FUNCTION] glitching toggled: {curr_time}") if bool(event.value) is True: functions.set_glitch_time(int(time.time())) # Uses the 'time' module else: functions.set_glitch_time(0) def compose(self) -> ComposeResult: with Vertical(classes="top_section"): # Use Vertical here instead of Horizontal with Vertical(classes="top_left"): # UART Box - appears second (below) with Vertical(classes="uart_box") as uart_box: uart_box.border_title = "uart settings" with Horizontal(classes="onerow"): yield Static("port:", classes="uart_label") yield Input( classes="control_input", id="uart_port_input", name="uart_port_input", value=str(functions.get_config_value("serial_port")) ) with Horizontal(classes="onerow"): yield Static("baud:", classes="uart_label") yield Input( classes="control_input", id="baud_rate_input", name="baud_rate_input", value=str(functions.get_config_value("baud_rate")) ) yield Button("save", classes="btn_save", id="save_uart", name="save_uart") # Config Box - appears first (on top) with Vertical(classes="config_box") as config_box: config_box.border_title = "config" with Horizontal(classes="onerow"): yield Static("file:", classes="uart_label") yield Input( classes="control_input", id="config_file_input", name="config_file_input", value=str(functions.get_config_value("conFile")) ) with Horizontal(classes="onerow"): yield Button("save", classes="btn_save", id="save_config", name="save_config") yield Static("glitch-o-bolt v2.0", classes="program_name") yield Static(" ") # Show blank space for name in control_names: with Horizontal(classes="control_row"): yield Static(f"{name}:", classes="control_label") for amount in [-100, -10, -1]: yield Button(str(amount), classes=f"btn btn{amount}", name=f"change_val-{name}_{amount}") yield Input( classes="control_input", value=str(functions.get_config_value(name)), type="integer", id=f"{name}_input" # Use `id` instead of `name` ) yield Button("save", classes="btn_save", name=f"save_val-{name}_save") for amount in [1, 10, 100]: yield Button(f"+{amount}", classes=f"btn btn-{amount}", name=f"change_val-{name}_{amount}") with Horizontal(classes="top_right"): with Vertical(classes="switch_box") as switch_box: #yield Static("glitch", classes="switch_title") yield Button("glitch", classes="btn_glitch", name=f"btn_glitch") yield Switch(classes="glitch-switch", id="glitch-switch", animate=False) # Create and store DataTable for later updates self.status_box = DataTable(classes="top_box", name="status_box") self.status_box.border_title = "status" self.status_box.border_subtitle = "stopped" self.status_box.styles.border_subtitle_color = "#B13840" self.status_box.show_header = False self.status_box.show_cursor = False self.status_box.add_columns("Attribute", "Value") # Add rows for config values self.status_box.add_row(" length: ", str(functions.get_config_value("length")), key="row1") self.status_box.add_row(" repeat: ", str(functions.get_config_value("repeat")), key="row2") self.status_box.add_row(" delay: ", str(functions.get_config_value("delay")), key="row3") self.status_box.add_row("elapsed: ", str(functions.get_glitch_elapsed()), key="row4") yield self.status_box # Yield the stored DataTable with Horizontal(classes="main_section"): with Vertical(classes="left_sidebar"): sidebar_content = Vertical(classes="sidebar_triggers_content") sidebar_content.border_title = "triggers" with sidebar_content: with Grid(classes="sidebar_triggers"): # Add rows with switches functions.ensure_triggers_exist() for i in range(8): yield Static(f"{i} -") yield Static(f"{functions.get_trigger_string(i)}", id=f"trigger_symbol_{i}", classes="sidebar_trigger_string") yield Switch( classes="trigger-switch sidebar_trigger_switch", value=functions.get_trigger_value(i), animate=False, id=f"trigger_switch_{i}" ) yield Button("^v-", classes="btn_toggle_1", name=f"toggle_trigger-{i}") if hasattr(config, "conditions") and config.conditions: sidebar_content2 = Vertical(classes="sidebar_conditions_content") sidebar_content2.border_title = "conditions" sidebar_content2.styles.height = len(config.conditions) + 1 with sidebar_content2: with Grid(classes="sidebar_conditions"): for i in range(len(config.conditions)): yield Static(f"{functions.get_condition_string(i)[:5]} ") if config.conditions[i][2] != "": yield Switch( id=f"condition_switch_{i}", classes="condition-switch sidebar_trigger_switch", # Added specific class value=functions.get_condition_value(i), animate=False ) else: yield Static(" ") yield Button("run", classes="btn_toggle_1", name=f"custom_function-{i}") sidebar_content3 = Vertical(classes="sidebar_settings_content") sidebar_content3.border_title = "misc" with sidebar_content3: with Grid(classes="sidebar_settings_switches"): # Add rows with switches yield Static(f"uart enable") yield Switch(classes="sidebar_trigger_switch", value=False, animate=False, id="uart_switch") yield Static(f"uart output") yield Switch(classes="uart-output-switch sidebar_trigger_switch", value=False, animate=False) yield Static(f"logging") yield Switch(classes="logging-switch sidebar_trigger_switch", value=False, animate=False) # Centre the exit button with Vertical(classes="centre_settings_buttons"): yield Button("clear main", classes="btn_settings", name="clear_button") with Vertical(classes="centre_settings_buttons"): yield Button("exit", classes="btn_settings", name="exit_button") global text_area # Use global reference with Vertical(id="main_content", classes="main_content") as main_content: port = str(functions.get_config_value("serial_port")) baud = str(functions.get_config_value("baud_rate")) if functions.get_config_value("log_time") == 0: main_content.border_title = f"{port} {baud}" else: time = str(functions.get_config_value("log_time")) main_content.border_title = f"{port} {baud} \\[{time}.log]" # Use Log() widget instead of TextArea for scrollable content functions.text_area = Log(classes="scrollable_log") yield functions.text_area # Make it accessible later with Horizontal(classes="input_container") as input_row: yield Static("$> ", classes="input_prompt") self.input_field = Input(classes="input_area", placeholder="send to uart", id="command_input" ) # Store reference yield self.input_field if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-c", "--config", default="config.py", help="Path to config file") args = parser.parse_args() if not os.path.exists(args.config): print(f"Config file '{args.config}' not found. Creating an empty one...") with open(args.config, "w") as f: pass # Creates a blank file load_config(args.config) import config import functions config.CONFILE = args.config functions.set_config(config) app = LayoutApp() set_app_instance(app) # Pass the app instance to config app.run()