Newer
Older
glitch-o-bolt / glitch-o-bolt.py
#!/usr/bin/env python3
#
# glitch-o-matic 2.0 - Optimized Version
# Enhanced serial data performance while maintaining all existing features
#
# requirements: textual, pyserial (provides the `serial` module)
import os
import sys
import time
import types
import argparse
import importlib.util
import concurrent.futures

import asyncio
import select
import serial
import functools

from scope import Scope
from textual import events
from textual.app import App, ComposeResult
from textual.containers import Container, Vertical, Horizontal, Grid
from textual.widgets import Static, DataTable, Input, Button, Switch, Log
from textual.messages import Message

# Names of the numeric control rows. compose() builds one row of +/- buttons,
# an input and a save button per name, and passes each name to
# functions.get_config_value() to read its current value.
control_names = ["length", "repeat", "delay"]

def load_config(path):
    """Execute the Python file at ``path`` and register it as the ``config`` module.

    The file is executed under the internal module name ``dynamic_config`` and
    the resulting module object is stored in ``sys.modules['config']`` so that
    a later ``import config`` resolves to it.

    Args:
        path: Filesystem path of the configuration file to execute.

    Returns:
        The loaded module object (the same object stored in ``sys.modules``).

    Raises:
        ImportError: If no import loader can be determined for ``path``
            (for example, the file name lacks a recognised Python suffix).
    """
    spec = importlib.util.spec_from_file_location("dynamic_config", path)
    # spec_from_file_location() returns None rather than raising when it cannot
    # pick a loader; fail with a clear message instead of an AttributeError.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load config file '{path}': unsupported file type")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Inject the loaded config as 'config'
    sys.modules['config'] = module
    return module

class PersistentInput(Input):
    """Input widget whose value always begins with a fixed prompt prefix."""

    PREFIX = "$> "  # The permanent prefix

    def __init__(self, **kwargs):
        """Create the input and seed its value with ``PREFIX``.

        Args:
            **kwargs: Forwarded unchanged to ``Input.__init__``.
        """
        super().__init__(**kwargs)
        self.value = self.PREFIX  # Set initial value

    def on_input_changed(self, event: Input.Changed) -> None:
        """Reset the value to ``PREFIX`` when an edit removes or alters the prefix.

        The whole value is replaced by the bare prefix, so any text typed
        after it is discarded along with the damaged prefix.
        """
        # A value that starts with PREFIX is never shorter than PREFIX, so this
        # single check also covers attempts to delete part of the prefix.
        if not event.value.startswith(self.PREFIX):
            self.value = self.PREFIX  # Restore the prefix

def set_app_instance(app):
    """Store ``app`` as ``functions.app_instance``.

    Relies on ``functions`` being bound as a module-level name; in this file
    that happens through the ``import functions`` inside the ``__main__`` guard.
    """
    functions.app_instance = app

class SerialDataMessage(Message):
    """Message carrying a piece of decoded serial text.

    Posted by ``LayoutApp.read_serial_loop`` and handled by
    ``LayoutApp.on_serial_data_message``.
    """

    def __init__(self, data: str):
        """Store ``data`` (the decoded serial text) on the message."""
        super().__init__()  # Call the parent class constructor
        self.data = data  # Store the serial data

class LayoutApp(App):
    """Textual application providing the glitch controls and the UART console.

    The methods reference ``functions`` and ``config`` as module-level names;
    in this file both are imported inside the ``__main__`` guard.
    """
    CSS_PATH = "style.tcss"

    async def on_ready(self) -> None:
        """Initialise runtime state and start the background tasks.

        Sets the serial run flag, creates the shared serial text buffer and
        its lock, tries to create the ``Scope`` handle, then schedules
        ``connect_serial``, ``functions.monitor_buffer`` and
        ``functions.glitch`` as asyncio tasks.
        """
        self.run_serial = True
        self.serial_buffer = ""  # Decoded serial text, appended by read_serial_loop
        self.buffer_lock = asyncio.Lock()  # Serialises coroutine access to serial_buffer
        try:
            functions.s = Scope()
        except IOError:
            # Record the missing scope on the functions module; binding a bare
            # local name here would leave functions.s untouched.
            functions.s = None
            print("Warning: Scope not connected, running in simulation mode")

        # The event loop keeps only weak references to tasks, so hold strong
        # references to prevent them being garbage-collected mid-execution.
        self._background_tasks = [
            asyncio.create_task(self.connect_serial()),
            asyncio.create_task(functions.monitor_buffer(self)),
            asyncio.create_task(functions.glitch(self)),
        ]
        functions.log_message("[DEBUG] Serial tasks created")

    async def connect_serial(self):
        """Open or close the serial port so that it follows the ``#uart_switch`` widget.

        Polls the switch about once per second while ``self.run_serial`` is
        true. When the switch is on and no connection is marked active, any
        previous connection object is closed, a new one is obtained from
        ``functions.start_serial()``, its timeouts are set and
        ``read_serial_loop`` is started. On failure the error is logged and
        the switch is turned back off. When the switch is off and a connection
        is marked active, the connection is closed.
        """
        switch_uart = self.query_one("#uart_switch")

        while self.run_serial:
            connected = getattr(self, '_serial_connected', False)

            if switch_uart.value:
                if not connected:
                    try:
                        # Close existing connection if any
                        previous = getattr(self, 'serial_connection', None)
                        if previous:
                            previous.close()

                        # Establish new connection
                        self.serial_connection = functions.start_serial()
                        if self.serial_connection and self.serial_connection.is_open:
                            # Configure for reliable operation
                            self.serial_connection.timeout = 0.5
                            self.serial_connection.write_timeout = 1.0
                            self._serial_connected = True
                            functions.log_message("[SERIAL] Connected successfully")
                            # Keep a reference: the event loop only holds tasks
                            # weakly, so an unreferenced task may be collected
                            # before it finishes.
                            self._serial_reader_task = asyncio.create_task(
                                self.read_serial_loop()
                            )
                        else:
                            raise serial.SerialException("Connection failed")
                    except Exception as e:
                        self._serial_connected = False
                        functions.log_message(f"[SERIAL] Connection error: {str(e)}")
                        # The switch is turned off, so no automatic retry
                        # follows; the pause only delays the next poll.
                        switch_uart.value = False
                        await asyncio.sleep(2)
            elif connected:
                open_connection = getattr(self, 'serial_connection', None)
                if open_connection:
                    open_connection.close()
                self._serial_connected = False
                functions.log_message("[SERIAL] Disconnected")

            await asyncio.sleep(1)  # Check connection status periodically

    async def read_serial_loop(self):
        """Read from the serial port and forward the decoded text.

        Runs while ``self.run_serial`` is true and the connection is marked
        active. Each decoded chunk is appended unchanged to
        ``self.serial_buffer`` (under ``self.buffer_lock``). For display,
        carriage returns are dropped and the text is posted as
        ``SerialDataMessage`` objects: one per completed line (including its
        ``\\n``) plus one for any trailing partial line of the chunk.

        A ``serial.SerialException`` marks the connection inactive and ends
        the loop; any other exception is logged and the loop continues.
        """
        import codecs  # local import: only this method needs it

        # An incremental decoder holds back an incomplete multi-byte UTF-8
        # sequence until the rest arrives in a later read, instead of dropping
        # the character when it straddles two chunks.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
        loop = asyncio.get_running_loop()
        buffer = ""

        while self.run_serial and getattr(self, '_serial_connected', False):
            try:
                # Blocking read (at least 1 byte requested) runs in the default
                # executor so the event loop stays responsive.
                data = await loop.run_in_executor(
                    None,
                    lambda: self.serial_connection.read(max(1, self.serial_connection.in_waiting))
                )

                if data:
                    decoded = decoder.decode(data)

                    # Store raw text in the condition monitoring buffer
                    async with self.buffer_lock:
                        self.serial_buffer += decoded

                    for char in decoded:
                        if char == '\r':
                            continue

                        buffer += char

                        if char == '\n':
                            self.post_message(SerialDataMessage(buffer))
                            buffer = ""

                    # Flush a trailing partial line right away so prompts
                    # without a newline still show up.
                    if buffer:
                        self.post_message(SerialDataMessage(buffer))
                        buffer = ""

                await asyncio.sleep(0.01)

            except serial.SerialException as e:
                functions.log_message(f"[SERIAL] Read error: {str(e)}")
                self._serial_connected = False
                break
            except Exception as e:
                functions.log_message(f"[SERIAL] Unexpected error: {str(e)}")
                await asyncio.sleep(0.1)

    async def monitor_conditions(self):
        """Poll the serial buffer and run any condition action that matches.

        While ``self.run_serial`` is true and the serial link is marked
        connected, takes a snapshot of ``self.serial_buffer`` under
        ``self.buffer_lock`` (trimming it to its last ``2 * buffer_size``
        characters once it grows beyond that), passes the snapshot to
        ``functions.check_conditions`` and hands any returned action to
        ``functions.execute_condition_action``. Sleeps 0.1 s per iteration.
        """
        debug = functions.DEBUG_MODE
        buffer_size = functions.get_conditions_buffer_size(debug)

        if debug:
            functions.log_message("[DEBUG] Starting condition monitor")
            functions.log_message(f"[DEBUG] Initial buffer size: {buffer_size}")
            functions.log_message(f"[DEBUG] Current conditions: {config.conditions}")

        while self.run_serial:
            if getattr(self, '_serial_connected', False):
                # Get a snapshot of the buffer contents
                async with self.buffer_lock:
                    current_buffer = self.serial_buffer
                    if debug and current_buffer:
                        functions.log_message(f"[DEBUG] Current buffer length: {len(current_buffer)}")

                    # Keep reasonable buffer size
                    if len(current_buffer) > buffer_size * 2:
                        self.serial_buffer = current_buffer = current_buffer[-buffer_size*2:]
                        if debug:
                            functions.log_message(f"[DEBUG] Trimmed buffer to {len(current_buffer)} chars")

                # Check for conditions
                action = functions.check_conditions(self, current_buffer, debug)
                if action:
                    if debug:
                        functions.log_message(f"[DEBUG] Executing action: {action}")
                    functions.execute_condition_action(action, debug)
                elif debug and current_buffer:
                    functions.log_message("[DEBUG] No action triggered")

            # Must be inside the loop: without a suspension point on every
            # iteration this coroutine would spin and starve the event loop.
            await asyncio.sleep(0.1)  # Check 10 times per second

    async def on_key(self, event: events.Key) -> None:
        """Send the command input's text over UART when Enter is pressed in it.

        The text is sent as typed, with a trailing newline appended if it has
        none. The outcome is echoed through ``functions.add_text``; afterwards
        the input is cleared and the key event's default handling is
        prevented. Other keys, or Enter while the input is unfocused, are
        ignored.
        """
        if event.key != "enter" or not self.input_field.has_focus:
            return

        # Keep the text exactly as typed; only guarantee the line terminator.
        payload = self.input_field.value
        if not payload.endswith('\n'):
            payload += '\n'

        port = getattr(self, 'serial_connection', None)
        if port is None or not port.is_open:
            functions.add_text(">> Not sent - UART disconnected")
        else:
            try:
                # Blocking write runs in the default executor.
                await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: port.write(payload.encode('utf-8'))
                )

                # Echo what was sent, without the trailing whitespace.
                functions.add_text(f"> {payload.rstrip()}")

            except Exception as e:
                functions.log_message(f"[UART TX ERROR] {str(e)}")
                functions.add_text(">> Failed to send")

        self.input_field.value = ""
        event.prevent_default()

    async def on_serial_data_message(self, message: SerialDataMessage) -> None:
        """Show and/or log one piece of received serial text.

        Does nothing until ``functions.text_area`` exists. The text is written
        to the log widget only when the ``uart_output_enabled`` config value
        is ``True``, and passed to ``functions.write_to_log`` only when the
        ``log_time`` config value is positive. The widget is scrolled to the
        end either way.
        """
        if not hasattr(functions, 'text_area'):
            return

        show_output = functions.get_config_value("uart_output_enabled")
        if show_output is True:
            functions.text_area.write(message.data)

        log_started = functions.get_config_value("log_time")
        if log_started > 0:
            functions.write_to_log(message.data, log_started)

        functions.text_area.scroll_end()

    def read_from_serial_sync(self):
        """Synchronously read up to one line from the serial port.

        Waits at most 0.01 s for the port to become readable, then reads
        until a newline or 4096 bytes.

        Returns:
            The bytes read, or ``b""`` when there is no connection, nothing
            became readable in time, or an error occurred (errors are logged).
        """
        # The attribute only exists once connect_serial() has opened a port,
        # so look it up defensively instead of assuming it is there.
        connection = getattr(self, 'serial_connection', None)
        if not connection:
            return b""

        try:
            # NOTE(review): select() only accepts sockets on Windows, so there
            # this call would always end up in the except branch - confirm the
            # supported platforms.
            ready, _, _ = select.select([connection], [], [], 0.01)
            if ready:
                # Read until newline or buffer limit
                return connection.read_until(b'\n', size=4096)
            return b""
        except Exception as e:
            functions.log_message(f"[ERROR] Serial read error: {str(e)}")
            return b""

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch a button press according to the button's ``name``.

        Names have the form ``<action>`` or ``<action>-<argument>``; the part
        before the first ``-`` selects the handler. Unknown actions are
        ignored.
        """
        name_parts = event.button.name.split("-")
        action = name_parts[0]

        # Handlers are wrapped in lambdas so that arguments (such as the
        # trigger index) are only evaluated for the action that is selected.
        handlers = {
            "exit_button": lambda: functions.end_program(),
            "clear_button": lambda: functions.clear_text(),
            "btn_glitch": lambda: functions.launch_glitch(),
            "save_config": lambda: functions.save_config(self),
            "toggle_trigger": lambda: functions.toggle_trigger(self, int(name_parts[1])),
            "change_val": lambda: functions.on_button_pressed(self, event),
            "save_val": lambda: functions.on_save_button_pressed(self, event),
            "custom_function": lambda: functions.run_custom_function(self, event),
            "save_uart": lambda: functions.save_uart_settings(self, event),
        }

        handler = handlers.get(action)
        if handler is not None:
            handler()

    def on_switch_changed(self, event: Switch.Changed) -> None:
        """React to a switch toggle; the switch's CSS classes select the behaviour.

        * ``trigger-switch`` / ``condition-switch``: store the new state in
          ``config.triggers`` / ``config.conditions`` at the index taken from
          the numeric suffix of the switch id.
        * ``logging-switch``: set or clear the log start time and refresh the
          main pane's border title.
        * ``uart-output-switch``: store the ``uart_output_enabled`` value.
        * ``glitch-switch``: set or clear the glitch start time.
        """
        switch = event.switch
        is_on = bool(event.value)

        # Trigger switches: ids end in "_<index>".
        if "trigger-switch" in switch.classes:
            try:
                slot = int(switch.id.split("_")[-1])
                config.triggers[slot][1] = is_on
                functions.set_triggers()
            except (ValueError, IndexError, AttributeError) as e:
                if functions.DEBUG_MODE:
                    functions.log_message(f"[ERROR] Failed to process trigger switch: {str(e)}")

        # Condition switches: same id scheme as the trigger switches.
        if "condition-switch" in switch.classes:
            try:
                slot = int(switch.id.split("_")[-1])
                config.conditions[slot][1] = is_on

                if functions.DEBUG_MODE:
                    functions.log_message(f"[CONDITION] Updated switch {slot} to {is_on}")
                    functions.log_message(f"[CONDITION] Current states: {[entry[1] for entry in config.conditions]}")

            except (ValueError, IndexError, AttributeError) as e:
                if functions.DEBUG_MODE:
                    functions.log_message(f"[ERROR] Failed to process condition switch: {str(e)}")

        if "logging-switch" in switch.classes:
            if functions.DEBUG_MODE:
                previous = functions.get_config_value("log_time")
                functions.log_message(f"[FUNCTION] logging toggled: {previous}")

            # A non-zero log_time marks logging as active.
            functions.set_log_time(int(time.time()) if is_on else 0)

            pane = self.query_one("#main_content")
            log_time = functions.get_config_value("log_time")
            port = str(functions.get_config_value("serial_port"))
            baud = str(functions.get_config_value("baud_rate"))

            title = f"{port} {baud}"
            if log_time != 0:
                title = f"{port} {baud} \\[{log_time}.log]"
            pane.border_title = title

        if "uart-output-switch" in switch.classes:
            functions.set_config_value("uart_output_enabled", is_on)
            functions.log_message(f"[FUNCTION] uart output toggled: {event.value}")

        if "glitch-switch" in switch.classes:
            if functions.DEBUG_MODE:
                previous = functions.get_config_value("glitch_time")
                functions.log_message(f"[FUNCTION] glitching toggled: {previous}")

            functions.set_glitch_time(int(time.time()) if is_on else 0)

    def compose(self) -> ComposeResult:
        """Yield the application's widget tree.

        The top section holds the UART and config boxes, the program name,
        one control row per entry in ``control_names``, the glitch button and
        switch, and the status table. The main section holds the sidebar
        (triggers, optional conditions, misc switches and buttons) and the
        main pane with the UART log and the command input.

        Side effects: stores the status table as ``self.status_box``, the log
        widget as ``functions.text_area`` and the command input as
        ``self.input_field``.
        """
        with Vertical(classes="top_section"):
            # Use Vertical here instead of Horizontal
            with Vertical(classes="top_left"):

                # UART Box - appears second (below)
                with Vertical(classes="uart_box") as uart_box:
                    uart_box.border_title = "uart settings"

                    with Horizontal(classes="onerow"):
                        yield Static("port:", classes="uart_label")
                        yield Input(
                            classes="control_input",
                            id="uart_port_input",
                            name="uart_port_input",
                            value=str(functions.get_config_value("serial_port"))
                        )

                    with Horizontal(classes="onerow"):
                        yield Static("baud:", classes="uart_label")
                        yield Input(
                            classes="control_input",
                            id="baud_rate_input",
                            name="baud_rate_input",
                            value=str(functions.get_config_value("baud_rate"))
                        )
                        yield Button("save", classes="btn_save", id="save_uart", name="save_uart")

                # Config Box - appears first (on top)
                with Vertical(classes="config_box") as config_box:
                    config_box.border_title = "config"

                    with Horizontal(classes="onerow"):
                        yield Static("file:", classes="uart_label")
                        yield Input(
                            classes="control_input",
                            id="config_file_input",
                            name="config_file_input",
                            value=str(functions.get_config_value("conFile"))
                        )
                    with Horizontal(classes="onerow"):
                        yield Button("save", classes="btn_save", id="save_config", name="save_config")

            yield Static("glitch-o-bolt v2.0", classes="program_name")
            yield Static(" ")  # Show blank space

            # One row per control: decrement buttons, value input, save, increment buttons.
            # Button names follow "<action>-<control>_<amount>", which
            # on_button_pressed() splits on "-" to pick the handler.
            for name in control_names:
                with Horizontal(classes="control_row"):
                    yield Static(f"{name}:", classes="control_label")
                    for amount in [-100, -10, -1]:
                        yield Button(str(amount), classes=f"btn btn{amount}", name=f"change_val-{name}_{amount}")

                    yield Input(
                        classes="control_input",
                        value=str(functions.get_config_value(name)),
                        type="integer",
                        id=f"{name}_input"  # Use `id` instead of `name`
                    )
                    yield Button("save", classes="btn_save", name=f"save_val-{name}_save")

                    for amount in [1, 10, 100]:
                        yield Button(f"+{amount}", classes=f"btn btn-{amount}", name=f"change_val-{name}_{amount}")

            with Horizontal(classes="top_right"):
                with Vertical(classes="switch_box"):
                    yield Button("glitch", classes="btn_glitch", name="btn_glitch")
                    yield Switch(classes="glitch-switch", id="glitch-switch", animate=False)

                # Create and store DataTable for later updates
                self.status_box = DataTable(classes="top_box", name="status_box")
                self.status_box.border_title = "status"
                self.status_box.border_subtitle = "stopped"
                self.status_box.styles.border_subtitle_color = "#B13840"
                self.status_box.show_header = False
                self.status_box.show_cursor = False

                self.status_box.add_columns("Attribute", "Value")

                # Add rows for config values
                self.status_box.add_row(" length: ", str(functions.get_config_value("length")), key="row1")
                self.status_box.add_row(" repeat: ", str(functions.get_config_value("repeat")), key="row2")
                self.status_box.add_row("  delay: ", str(functions.get_config_value("delay")), key="row3")
                self.status_box.add_row("elapsed: ", str(functions.get_glitch_elapsed()), key="row4")

                yield self.status_box  # Yield the stored DataTable

        with Horizontal(classes="main_section"):
            with Vertical(classes="left_sidebar"):
                sidebar_content = Vertical(classes="sidebar_triggers_content")
                sidebar_content.border_title = "triggers"

                with sidebar_content:
                    with Grid(classes="sidebar_triggers"):
                        # Add rows with switches
                        functions.ensure_triggers_exist()
                        for i in range(8):
                            yield Static(f"{i} -")
                            yield Static(f"{functions.get_trigger_string(i)}", id=f"trigger_symbol_{i}", classes="sidebar_trigger_string")
                            yield Switch(
                                classes="trigger-switch sidebar_trigger_switch",
                                value=functions.get_trigger_value(i),
                                animate=False,
                                id=f"trigger_switch_{i}"
                            )
                            yield Button("^v-", classes="btn_toggle_1", name=f"toggle_trigger-{i}")

                # The conditions box is only shown when the config defines any.
                if hasattr(config, "conditions") and config.conditions:
                    sidebar_content2 = Vertical(classes="sidebar_conditions_content")
                    sidebar_content2.border_title = "conditions"
                    sidebar_content2.styles.height = len(config.conditions) + 1

                    with sidebar_content2:
                        with Grid(classes="sidebar_conditions"):
                            for i, condition in enumerate(config.conditions):
                                yield Static(f"{functions.get_condition_string(i)[:5]} ")

                                # A switch is only offered when the condition's
                                # third field is non-empty; otherwise a blank
                                # cell keeps the grid aligned.
                                if condition[2] != "":
                                    yield Switch(
                                        id=f"condition_switch_{i}",
                                        classes="condition-switch sidebar_trigger_switch",
                                        value=functions.get_condition_value(i),
                                        animate=False
                                    )
                                else:
                                    yield Static(" ")

                                yield Button("run", classes="btn_toggle_1", name=f"custom_function-{i}")

                sidebar_content3 = Vertical(classes="sidebar_settings_content")
                sidebar_content3.border_title = "misc"
                with sidebar_content3:
                    with Grid(classes="sidebar_settings_switches"):
                        # Add rows with switches
                        yield Static("uart enable")
                        yield Switch(classes="sidebar_trigger_switch", value=False, animate=False, id="uart_switch")

                        yield Static("uart output")
                        yield Switch(classes="uart-output-switch sidebar_trigger_switch", value=False, animate=False)

                        yield Static("logging")
                        yield Switch(classes="logging-switch sidebar_trigger_switch", value=False, animate=False)

                    # Centre the buttons
                    with Vertical(classes="centre_settings_buttons"):
                        yield Button("clear main", classes="btn_settings", name="clear_button")
                    with Vertical(classes="centre_settings_buttons"):
                        yield Button("exit", classes="btn_settings", name="exit_button")

            with Vertical(id="main_content", classes="main_content") as main_content:
                port = str(functions.get_config_value("serial_port"))
                baud = str(functions.get_config_value("baud_rate"))

                # Named log_time rather than time so the imported time module
                # is not shadowed inside this method.
                log_time = functions.get_config_value("log_time")
                if log_time == 0:
                    main_content.border_title = f"{port} {baud}"
                else:
                    main_content.border_title = f"{port} {baud} \\[{log_time}.log]"

                # Use Log() widget instead of TextArea for scrollable content
                functions.text_area = Log(classes="scrollable_log")
                yield functions.text_area  # Make it accessible later

                with Horizontal(classes="input_container"):
                    yield Static("$> ", classes="input_prompt")
                    self.input_field = Input(classes="input_area", placeholder="send to uart", id="command_input")  # Store reference
                    yield self.input_field

if __name__ == "__main__":
    # Parse the command line; the only option is the config file location.
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default="config.py", help="Path to config file")
    args = parser.parse_args()

    # A missing config file is created empty so load_config() has a file to execute.
    if not os.path.exists(args.config):
        print(f"Config file '{args.config}' not found. Creating an empty one...")
        with open(args.config, "w") as f:
            pass  # Creates a blank file

    # load_config() registers the file as sys.modules['config'], so the
    # `import config` below resolves to it. Both imports also bind the
    # module-level names `config` and `functions` that the classes and
    # functions above refer to.
    load_config(args.config)
    import config
    import functions
    config.CONFILE = args.config
    functions.set_config(config)

    app = LayoutApp()
    set_app_instance(app)  # Store the app instance on the functions module
    app.run()