- import sys
- import os
- import re
- import time
- import serial
- import importlib
- from scope import Scope
- from textual.widgets import Button, Input, Switch
- from textual.containers import Vertical
-
- import asyncio
- import functions
-
# ---------------------------------------------------------------------------
# Module-wide state shared by the UI callbacks and the background tasks.
# ---------------------------------------------------------------------------

# When True, log_message() appends diagnostics to debug.log.
DEBUG_MODE: bool = False

# Application object; assigned through set_app_instance().
app_instance = None
# Scrollable text widget that receives console output.
text_area = None
# Dynamically loaded configuration; assigned through set_config().
config = None
# Timestamp naming the session log file; values <= 0 disable file logging.
log_time = 0
# Timestamp at which glitching was started; values <= 0 mean "not running".
glitch_time = 0
-
# Open the scope once at import time.  If the device cannot be opened the
# module keeps working with `s` left as None and the user is warned.
s = None
try:
    s = Scope()
except IOError:
    print("Warning: Scope not connected, running in simulation mode")
-
def set_config(cfg):
    """Register *cfg* as the configuration object used by this module."""
    global config
    config = cfg
-
def set_app_instance(app):
    """Remember *app* so module-level helpers can reach its widgets and UART."""
    global app_instance
    app_instance = app
-
def log_message(message):
    """Append *message* as one line to debug.log; a no-op unless DEBUG_MODE is set."""
    if not DEBUG_MODE:
        return
    with open("debug.log", "a") as handle:
        handle.write(message + "\n")
-
def set_log_time(value):
    """Store *value* as the module's log-file timestamp."""
    global log_time
    log_time = value
-
def set_glitch_time(value):
    """Store *value* as the timestamp at which glitching started."""
    global glitch_time
    glitch_time = value
-
- def get_config_value(name: str) -> int:
- """Return the latest value of the given config variable, and create them if they don't exist."""
- if name == "length":
- if not hasattr(config, "LENGTH"):
- config.LENGTH = 0 # Default value if not set
- return config.LENGTH
- elif name == "repeat":
- if not hasattr(config, "REPEAT"):
- config.REPEAT = 0 # Default value if not set
- return config.REPEAT
- elif name == "serial_port":
- if not hasattr(config, "SERIAL_PORT"):
- config.SERIAL_PORT = "/dev/ttyUSB0" # Default value if not set
- return config.SERIAL_PORT
- elif name == "baud_rate":
- if not hasattr(config, "BAUD_RATE"):
- config.BAUD_RATE = 115200 # Default value if not set
- return config.BAUD_RATE
- elif name == "delay":
- if not hasattr(config, "DELAY"):
- config.DELAY = 0 # Default value if not set
- return config.DELAY
- elif name == "log_time":
- return log_time # Return the module variable directly
- elif name == "glitch_time":
- return glitch_time # Return the module variable directly
- elif name == "conFile":
- if not hasattr(config, "CONFILE"):
- config.CONFILE = "config.py" # Or any suitable default
- return config.CONFILE
- elif name.startswith("trigger_"):
- if "_value" in name:
- index = int(name.split('_')[1])
- return config.triggers[index][0]
- elif "_state" in name:
- index = int(name.split('_')[1])
- return config.triggers[index][1]
- else:
- return 0 # Default fallback for unknown names
-
def set_config_value(name: str, value: int):
    """Set an existing config attribute and mirror the change in the UI.

    The attribute name is ``name.upper()``.  Nothing happens when ``config``
    has no such attribute.  When no app instance has been registered, only the
    config value is updated; previously this case raised AttributeError after
    the config had already been changed.

    Args:
        name: Lower-case setting name; also used to locate ``#<name>_input``.
        value: New value to store.
    """
    attribute = name.upper()
    if not hasattr(config, attribute):
        return
    setattr(config, attribute, value)

    if app_instance is None:
        return

    # Keep the input field, the status table and the screen in sync.
    app_instance.query_one(f"#{name}_input").value = str(value)
    update_status_box(app_instance, name, value)
    app_instance.refresh()
-
def get_condition_string(index):
    """Return field 0 (the string) of the entry at *index* in config.conditions.

    Raises:
        IndexError: If *index* is negative or past the last condition.
    """
    if not 0 <= index < len(config.conditions):
        raise IndexError("Index out of range")
    return config.conditions[index][0]
-
def get_condition_value(index):
    """Return field 1 (the boolean) of the entry at *index* in config.conditions.

    Raises:
        IndexError: If *index* is negative or past the last condition.
    """
    if not 0 <= index < len(config.conditions):
        raise IndexError("Index out of range")
    return config.conditions[index][1]
-
def set_condition_value(index: int, value: bool) -> None:
    """Set the UI switch ``#condition_switch_<index>`` to *value*.

    Only the widget is changed; config.conditions is used for the bounds
    check alone.  A switch that the app's query does not find is skipped.

    Raises:
        IndexError: If *index* is negative or past the last condition.
    """
    if not 0 <= index < len(config.conditions):
        raise IndexError("Index out of range")

    selector = f"#condition_switch_{index}"
    if app_instance.query(selector):
        app_instance.query_one(selector, Switch).value = value
-
def ensure_triggers_exist():
    """Reset config.triggers to eight ``["-", False]`` entries if it is unusable.

    "Unusable" means the attribute is missing, empty, or has fewer than eight
    entries; a list with eight or more entries is left untouched.
    """
    current = getattr(config, "triggers", None)
    if current and len(current) >= 8:
        return
    config.triggers = [["-", False] for _ in range(8)]
-
def get_trigger_string(index):
    """Return field 0 (the symbol string) of the entry at *index* in config.triggers.

    Raises:
        IndexError: If *index* is negative or past the last trigger.
    """
    if not 0 <= index < len(config.triggers):
        raise IndexError("Index out of range")
    return config.triggers[index][0]
-
def get_trigger_value(index):
    """Return field 1 (the boolean state) of the entry at *index* in config.triggers.

    Raises:
        IndexError: If *index* is negative or past the last trigger.
    """
    if not 0 <= index < len(config.triggers):
        raise IndexError("Index out of range")
    return config.triggers[index][1]
-
def set_trigger_value(index, value):
    """Set the UI switch ``#trigger_switch_<index>`` to *value*.

    config.triggers is consulted only for the bounds check.

    Raises:
        IndexError: If *index* is negative or past the last trigger.
    """
    if not 0 <= index < len(config.triggers):
        raise IndexError("Index out of range")
    app_instance.query_one(f"#trigger_switch_{index}", Switch).value = value
-
def set_trigger_string(index: int, value: str):
    """Assign symbol *value* to trigger *index* and switch that trigger off.

    Both config.triggers and the matching symbol/switch widgets are updated.

    Args:
        index: Position in config.triggers.
        value: One of "^", "v" or "-".

    Raises:
        ValueError: If *value* is not one of the accepted symbols.
    """
    accepted = ["^", "v", "-"]
    if value not in accepted:
        raise ValueError(f"Invalid trigger value. Must be one of {accepted}")

    # A changed symbol always leaves the trigger disabled.
    entry = config.triggers[index]
    entry[0] = value
    entry[1] = False

    app_instance.query_one(f"#trigger_symbol_{index}").update(value)
    app_instance.query_one(f"#trigger_switch_{index}").value = False
-
def toggle_trigger(self, index: int):
    """Advance trigger *index* to the next symbol and switch it off.

    Symbols rotate in the order "^" -> "v" -> "-" -> "^".  The new symbol is
    written to config.triggers and shown in ``#trigger_symbol_<index>``; the
    state and ``#trigger_switch_<index>`` are both set to False.

    Args:
        self: Object providing ``query_one`` for the widgets.
        index: Position in config.triggers.

    Raises:
        ValueError: If the current symbol is not part of the rotation.
    """
    rotation = ["^", "v", "-"]
    entry = config.triggers[index]
    position = rotation.index(entry[0])
    next_symbol = rotation[(position + 1) % len(rotation)]

    entry[0] = next_symbol
    entry[1] = False

    self.query_one(f"#trigger_symbol_{index}").update(next_symbol)
    self.query_one(f"#trigger_switch_{index}").value = False
    log_message("next symbol: "+next_symbol)
-
- def set_uart_switch(state: bool | None = None) -> None:
- switch_uart = app_instance.query_one("#uart_switch")
- if state is None:
- switch_uart.value = not switch_uart.value # Toggle
- else:
- switch_uart.value = state # Set to specific state
-
def modify_value(variable_name: str, amount: int) -> int:
    """Add *amount* to one of the numeric config settings.

    Args:
        variable_name: "length", "repeat" or "delay".
        amount: Signed increment to apply.

    Returns:
        The setting's value after the change.

    Raises:
        ValueError: If *variable_name* is not one of the supported names.
    """
    attribute = {
        "length": "LENGTH",
        "repeat": "REPEAT",
        "delay": "DELAY",
    }.get(variable_name)
    if attribute is None:
        raise ValueError(f"Unknown variable: {variable_name}")

    updated = getattr(config, attribute) + amount
    setattr(config, attribute, updated)
    return updated
-
def on_button_pressed(app, event: Button.Pressed) -> None:
    """Apply the increment encoded in a button's name to a config setting.

    After the part up to the first hyphen is dropped, the button name is
    expected to read ``<variable>_<amount>``.  When config has the attribute
    ``<VARIABLE>``, the amount is added to it and the ``#<variable>_input``
    field, the status box and the screen are refreshed.  Names that do not
    split into exactly two parts, and unknown variables, are ignored.

    Args:
        app: Application object owning the widgets.
        event: The button-press event.

    Raises:
        ValueError: If the amount part is not an integer.
    """
    raw_name = event.button.name
    if not raw_name:
        return

    pieces = raw_name.split("-", 1)[-1].split("_")
    if len(pieces) != 2:
        return
    variable_name, amount = pieces[0], int(pieces[1])

    attribute = variable_name.upper()
    if not hasattr(config, attribute):
        return

    new_value = getattr(config, attribute) + amount
    setattr(config, attribute, new_value)

    app.query_one(f"#{variable_name}_input").value = str(new_value)
    update_status_box(app, variable_name, new_value)
    app.refresh()
-
def on_save_button_pressed(app, event: Button.Pressed) -> None:
    """Store the number typed into an input field as a config setting.

    The variable name is the button name with ``save_val-`` and ``_save``
    removed; the value is read from ``#<variable>_input`` and written to
    ``config.<VARIABLE>``.  Input that is not an integer is reported through
    add_text() and leaves the config untouched, instead of raising ValueError
    out of the event handler.

    Args:
        app: Application object owning the widgets.
        event: The button-press event.
    """
    button_name = event.button.name
    if not button_name:
        return

    variable_name = button_name.replace("save_val-", "").replace("_save", "")
    input_field = app.query_one(f"#{variable_name}_input", Input)

    try:
        new_value = int(input_field.value)
    except ValueError:
        add_text(f"[ERROR] Invalid value for {variable_name}: {input_field.value!r}")
        return

    setattr(config, variable_name.upper(), new_value)
    update_status_box(app, variable_name, new_value)
    app.refresh()
-
def save_uart_settings(app, event: Button.Pressed) -> None:
    """Copy the UART port and baud rate from the input fields into config.

    The title of the ``.main_content`` container is updated to show the new
    port and rate.  A baud rate that is not an integer is reported through
    add_text() and nothing is changed, instead of raising ValueError out of
    the event handler.

    Args:
        app: Application object owning the widgets.
        event: The button-press event (not inspected).
    """
    port = str(app.query_one("#uart_port_input", Input).value)
    raw_baud = app.query_one("#baud_rate_input", Input).value

    try:
        baud = int(raw_baud)
    except ValueError:
        add_text(f"[ERROR] Invalid baud rate: {raw_baud!r}")
        return

    config.SERIAL_PORT = port
    config.BAUD_RATE = baud

    main_content = app.query_one(".main_content", Vertical)
    main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}"
    app.refresh()
-
def change_baudrate(new_baudrate):
    """Switch the app's open serial connection to *new_baudrate*.

    On success config.BAUD_RATE and the ``.main_content`` border title are
    updated as well; the title includes the log file name when a log
    timestamp is set.

    Args:
        new_baudrate: Baud rate to apply.

    Returns:
        True when the rate was applied, False otherwise (the reason is shown
        through add_text()).
    """
    if app_instance is None:
        add_text("[ERROR] App instance not available")
        return False

    if not hasattr(app_instance, 'serial_connection'):
        add_text("[ERROR] No serial connection in app instance")
        return False

    # The input field mirrors the requested rate even if applying it fails.
    app_instance.query_one("#baud_rate_input").value = str(new_baudrate)

    serial_conn = app_instance.serial_connection
    if serial_conn is None or not serial_conn.is_open:
        add_text("[ERROR] Serial port not initialized or closed")
        return False

    # Read before the attempt so the revert path always has a value to restore.
    old_baudrate = serial_conn.baudrate
    try:
        serial_conn.baudrate = new_baudrate
    except ValueError as e:
        add_text(f"[ERROR] Invalid baud rate {new_baudrate}: {e}")
        return False
    except serial.SerialException as e:
        add_text(f"[ERROR] Serial error changing baud rate: {e}")
        try:
            serial_conn.baudrate = old_baudrate
        except (serial.SerialException, ValueError, OSError):
            add_text("[WARNING] Failed to revert baud rate")
        return False

    config.BAUD_RATE = new_baudrate

    main_content = app_instance.query_one(".main_content", Vertical)
    log_stamp = functions.get_config_value("log_time")
    if log_stamp == 0:
        main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}"
    else:
        main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE} \\[{log_stamp}.log]"
    return True
-
# Row position of each setting shown in the status table.
_STATUS_ROWS = {"length": 0, "repeat": 1, "delay": 2, "elapsed": 3}


def update_status_box(app, variable_name, new_value):
    """Write *new_value* into the second column of a status-box row.

    Args:
        app: Object whose ``status_box`` exposes ``rows``, ``columns`` and
            ``update_cell``.
        variable_name: "length", "repeat", "delay" or "elapsed".  Any other
            name has no row and is ignored (it used to raise
            UnboundLocalError).
        new_value: Value to display; converted with ``str``.
    """
    row_index = _STATUS_ROWS.get(variable_name)
    if row_index is None:
        return

    status_box = app.status_box
    row_key = list(status_box.rows.keys())[row_index]
    value_column = list(status_box.columns.keys())[1]
    status_box.update_cell(row_key, value_column, str(new_value))
-
def run_custom_function(app, event):
    """Run the action of the condition selected by a custom-function button.

    The button name is expected to be ``custom_function-<index>``; field 3 of
    ``config.conditions[<index>]`` names the action, which is passed to
    execute_condition_action().  Every outcome is reported via log_message().

    Args:
        app: Application object (not used).
        event: The button-press event.
    """
    # Imported here because the module header does not import it; without it
    # the debug branch below raised NameError.
    import traceback

    button_name = event.button.name
    debug = DEBUG_MODE

    log_message(f"[CUSTOM] Button pressed: '{button_name}'")
    if not button_name:
        return

    try:
        index = int(button_name.replace("custom_function-", ""))
        log_message(f"[CUSTOM] Condition index: {index}")

        if 0 <= index < len(config.conditions):
            func_name = config.conditions[index][3]
            log_message(f"[CUSTOM] Executing: {func_name}")

            if not execute_condition_action(func_name, debug):
                log_message(f"[CUSTOM] Failed to execute {func_name}")
        else:
            log_message(f"[CUSTOM] Invalid index: {index}")

    except ValueError:
        log_message(f"[CUSTOM] Invalid button format: '{button_name}'")
    except Exception as e:
        log_message(f"[CUSTOM] Error: {str(e)}")
        if debug:
            log_message(f"[DEBUG] {traceback.format_exc()}")
-
def write_to_log(text: str, log_time: int):
    """Append *text* verbatim to ``logs/<log_time>.log``.

    The ``logs`` directory is created on demand.  ``exist_ok=True`` replaces
    the earlier check-then-create sequence, which could fail if the directory
    appeared between the check and the creation.

    Args:
        text: Text to append; no newline is added.
        log_time: Value used as the log file's base name.
    """
    logs_dir = "logs"
    os.makedirs(logs_dir, exist_ok=True)

    log_file = os.path.join(logs_dir, f"{log_time}.log")
    with open(log_file, "a") as f:
        f.write(f"{text}")
-
def add_text(text):
    """Write *text* plus a newline to the text widget and to the session log.

    The widget is ``functions.text_area``.  It is skipped while it is still
    unset: the attribute is initialised to None, so a ``hasattr`` check alone
    was always true and led to an AttributeError.  The log file is written
    only when the "log_time" setting is greater than zero.

    Args:
        text: Line to output, without trailing newline.
    """
    area = getattr(functions, "text_area", None)
    if area is not None:
        area.write(text + "\n")

    stamp = get_config_value("log_time")
    if stamp > 0:
        write_to_log(text + "\n", stamp)
-
def update_text(text):
    """Write *text* to the text widget exactly as given (no newline added).

    Nothing happens while ``functions.text_area`` is unset (None or missing);
    previously a None widget caused an AttributeError.
    """
    area = getattr(functions, "text_area", None)
    if area is not None:
        area.write(text)
-
def _read_preserved_sections(config_file):
    """Return (import lines, function sources) found in *config_file*.

    Both lists are empty when the file does not exist.
    """
    if not os.path.exists(config_file):
        return [], []

    with open(config_file, 'r') as f:
        existing_content = f.read()

    import_pattern = re.compile(r'^import .+?$|^from .+? import .+?$', re.MULTILINE)
    func_pattern = re.compile(r'^(def \w+\(.*?\):.*?)(?=^(?:def \w+\(|\Z))', re.MULTILINE | re.DOTALL)

    imports = import_pattern.findall(existing_content)
    custom_functions = [fn.strip() for fn in func_pattern.findall(existing_content) if fn.strip()]
    return imports, custom_functions


def save_config(app):
    """Write the current settings to the file named in ``#config_file_input``.

    Import lines and top-level functions found in the currently active config
    file are carried over.  The new content is first written to
    ``<current config file>.tmp`` and then moved onto the destination with
    ``os.replace``, so an existing destination is overwritten in one step
    rather than being deleted before the rename.

    Args:
        app: Application object providing the ``#config_file_input`` widget.

    Raises:
        Exception: Any error raised while saving is logged, the temporary
            file is removed, and the error is re-raised.
    """
    config_file = get_config_value("conFile")
    temp_file = config_file + ".tmp"
    new_file = str(app.query_one("#config_file_input", Input).value)

    try:
        serial_port = get_config_value("serial_port")
        baud_rate = get_config_value("baud_rate")
        length = get_config_value("length")
        repeat = get_config_value("repeat")
        delay = get_config_value("delay")

        triggers = [
            [get_config_value(f"trigger_{i}_value"), get_config_value(f"trigger_{i}_state")]
            for i in range(8)
        ]

        imports, custom_functions = _read_preserved_sections(config_file)

        with open(temp_file, 'w') as f:
            if imports:
                f.write("######\n# LEAVE THESE IMPORTS!\n######\n")
                f.write("\n".join(imports) + "\n\n")

            f.write("######\n# config values\n######\n\n")
            f.write(f"SERIAL_PORT = {repr(serial_port)}\n")
            f.write(f"BAUD_RATE = {baud_rate}\n\n")
            f.write(f"LENGTH = {length}\n")
            f.write(f"REPEAT = {repeat}\n")
            f.write(f"DELAY = {delay}\n\n")

            f.write("###\n# ^ = pullup, v = pulldown\n###\n")
            f.write("triggers = [\n")
            for i, (value, state) in enumerate(triggers):
                f.write(f" [{repr(value)}, {state}], #{i}\n")
            f.write("]\n")

            if hasattr(config, 'conditions') and config.conditions:
                f.write("\n###\n# name, enabled, string to match\n###\n")
                f.write("conditions = [\n")
                for condition in config.conditions:
                    f.write(f" {condition},\n")
                f.write("]\n")

            if custom_functions:
                f.write("\n######\n# Custom functions\n######\n")
                f.write("\n\n".join(custom_functions))
                f.write("\n")

        # Overwrites an existing destination without a separate delete step.
        os.replace(temp_file, new_file)
        config.CONFILE = new_file
        add_text(f"[SAVED] config {new_file} saved")

    except Exception as e:
        log_message(f"Error saving config: {str(e)}")
        if os.path.exists(temp_file):
            os.remove(temp_file)
        raise
-
def start_serial():
    """Open the serial port described by config.SERIAL_PORT / config.BAUD_RATE.

    Returns:
        The opened ``serial.Serial`` object, or None when opening raised
        ``serial.SerialException`` (the error is shown through add_text()).
    """
    port_settings = dict(
        port=config.SERIAL_PORT,
        baudrate=config.BAUD_RATE,
        timeout=0.1,              # read timeout in seconds
        write_timeout=1.0,        # write timeout in seconds
        inter_byte_timeout=0.05,  # timeout between bytes
        exclusive=True,           # request exclusive access to the port
        rtscts=True,              # RTS/CTS hardware flow control
        dsrdtr=True,              # DSR/DTR hardware flow control
    )
    try:
        connection = serial.Serial(**port_settings)
        add_text("Connected to serial port.")
        return connection
    except serial.SerialException as e:
        add_text(f"[ERROR] Serial exception: {e}")
        return None
-
def send_uart_message(message):
    """Send *message* over the app's serial connection as UTF-8.

    A newline is appended to non-empty messages that do not already end in
    one.  A connection that is missing, None, or closed is reported as
    disconnected; a None connection used to raise AttributeError.

    Args:
        message: Text to transmit.

    Returns:
        True when the message was written, False otherwise.
    """
    if not app_instance:
        log_message("[UART] Not sent - No app instance")
        return False

    connection = getattr(app_instance, 'serial_connection', None)
    if connection is None or not connection.is_open:
        log_message("[UART] Not sent - UART disconnected")
        return False

    try:
        if message and not message.endswith('\n'):
            message += '\n'

        connection.write(message.encode('utf-8'))
        log_message(f"[UART] Sent: {message.strip()}")
        return True
    except Exception as e:
        # Any write or encode failure is reported as "not sent".
        log_message(f"[UART TX ERROR] {str(e)}")
        return False
-
def get_conditions_buffer_size(debug=False):
    """Return the length of the longest condition match string.

    The match string is field 2 of each entry in config.conditions.  256 is
    returned when there are no conditions or every match string is empty.

    Args:
        debug: When true, the decision is reported through log_message().
    """
    conditions = getattr(config, 'conditions', None)
    if not conditions:
        if debug:
            log_message("[DEBUG] No conditions defined, using default buffer size 256")
        return 256

    lengths = [len(entry[2]) for entry in conditions if entry[2]]
    if not lengths:
        if debug:
            log_message("[DEBUG] All condition strings are empty, using default buffer size 256")
        return 256

    longest = max(lengths)
    if debug:
        log_message(f"[DEBUG] Calculated buffer size: {longest} (from {len(conditions)} conditions)")
    return longest
-
def check_conditions(self, buffer, debug=False):
    """Return the action of the first enabled condition found in *buffer*.

    Conditions are examined in list order.  A condition matches when its
    match string (field 2) occurs anywhere in *buffer* and its enabled flag
    (field 1) is truthy; the action name (field 3) is then returned.  The
    search uses ``str.find`` instead of comparing a slice at every position.

    Args:
        self: Not used.
        buffer: Text received so far.
        debug: When true, each step is reported through log_message().

    Returns:
        The matching condition's action name, or None when nothing matches.
    """
    if debug:
        log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}")

    conditions = getattr(config, 'conditions', None)
    if not conditions:
        if debug:
            log_message("[DEBUG] No conditions to check against")
        return None

    for i, condition in enumerate(conditions):
        trigger_str = condition[2]
        if not trigger_str:
            # Conditions without a match string can never fire.
            continue

        if debug:
            log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {len(trigger_str)})")

        pos = buffer.find(trigger_str)
        if pos == -1:
            continue

        try:
            if not condition[1]:
                if debug:
                    log_message(f"[DEBUG] Condition {i} matched at position {pos} but switch is OFF")
                continue

            action = condition[3]
            if debug:
                log_message(f"[DEBUG] MATCHED condition {i} at position {pos}: {condition[0]} -> {action}")
            return action

        except (IndexError, KeyError, TypeError) as e:
            # A malformed entry (e.g. no action field) is skipped, not fatal.
            if debug:
                log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}")
            continue

    if debug:
        log_message("[DEBUG] No conditions matched")
    return None
-
def execute_condition_action(action_name, debug=False):
    """Look up the function called *action_name* and call it without arguments.

    The ``config`` module is searched first, then this module's own
    namespace.

    Args:
        action_name: Name of the function to run.
        debug: When true, progress and a full traceback on failure are
            reported through log_message().

    Returns:
        True when a function was found and ran without raising, False when it
        was not found or raised.
    """
    # Imported here because the module header does not import it; without it
    # the debug branch of the error handler raised NameError.
    import traceback

    if debug:
        log_message(f"[ACTION] Attempting to execute: {action_name}")

    try:
        module_name = 'config'
        module = importlib.import_module(module_name)

        if hasattr(module, action_name):
            if debug:
                log_message(f"[ACTION] Found {action_name} in {module_name}")
            getattr(module, action_name)()
            return True

        # This module's namespace; one lookup replaces the former pair of
        # equivalent checks (module attribute, then globals()).
        namespace = globals()
        if action_name in namespace:
            if debug:
                log_message(f"[ACTION] Found {action_name} in functions")
            namespace[action_name]()
            return True

        log_message(f"[ACTION] Function '{action_name}' not found in any module")
        return False

    except Exception as e:
        log_message(f"[ACTION] Error executing {action_name}: {str(e)}")
        if debug:
            log_message(f"[DEBUG] Full exception: {traceback.format_exc()}")
        return False
-
def get_glitch_elapsed():
    """Return the time since glitching started, formatted as ``HHH:MM:SS``.

    The start timestamp is the "glitch_time" setting; a value of zero or less
    yields "000:00:00".
    """
    started = get_config_value("glitch_time")
    if started <= 0:
        return "000:00:00"

    total_seconds = int(time.time() - started)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:03d}:{minutes:02d}:{seconds:02d}"
-
def start_glitch(glitch_len, trigger_repeats, delay):
    """Program the scope's glitch settings and arm or fire it.

    Every trigger whose state is exactly True is armed on its channel, with a
    rising edge for "^" and a falling edge for "v".  When no trigger has
    state True, the scope is fired manually *trigger_repeats* times instead.
    Without a scope (``s`` is None, i.e. simulation mode) the call does
    nothing; it used to raise AttributeError.

    Args:
        glitch_len: Value written to ``s.glitch.repeat``.
        trigger_repeats: Number of manual ``s.trigger()`` calls.
        delay: Value written to ``s.glitch.ext_offset``.
    """
    if s is None:
        return

    s.glitch.repeat = glitch_len
    s.glitch.ext_offset = delay

    # Read all eight triggers before arming anything, so a failing lookup
    # leaves the scope un-armed.
    triggers = [
        (get_config_value(f"trigger_{i}_value"), get_config_value(f"trigger_{i}_state"))
        for i in range(8)
    ]

    triggers_set = False
    for i, (value, state) in enumerate(triggers):
        if state is not True:
            continue
        triggers_set = True
        if value == "^":
            s.arm(i, Scope.RISING_EDGE)
        elif value == "v":
            s.arm(i, Scope.FALLING_EDGE)

    if not triggers_set:
        for _ in range(trigger_repeats):
            s.trigger()
-
def launch_glitch():
    """Run start_glitch() with the current length, repeat and delay settings."""
    start_glitch(
        functions.get_config_value("length"),
        functions.get_config_value("repeat"),
        functions.get_config_value("delay"),
    )
-
async def glitch(self):
    """Background task that drives glitching and the status-box display.

    About every 0.1 s the elapsed time is refreshed.  While the "glitch_time"
    setting is positive the box is labelled "running" and start_glitch() is
    called with the current settings; otherwise the box is labelled
    "stopped".  Errors in one pass are printed and the loop continues.

    Args:
        self: Application object owning ``status_box``.
    """
    functions.log_message("[GLITCHING] Starting glitch monitor")

    while True:
        try:
            gtime = get_config_value("glitch_time")
            functions.update_status_box(self, "elapsed", get_glitch_elapsed())

            if gtime > 0:
                self.status_box.border_subtitle = "running"
                self.status_box.styles.border_subtitle_color = "#5E99AE"
                self.status_box.styles.border_subtitle_style = "bold"

                start_glitch(
                    functions.get_config_value("length"),
                    functions.get_config_value("repeat"),
                    functions.get_config_value("delay"),
                )
            else:
                self.status_box.border_subtitle = "stopped"
                self.status_box.styles.border_subtitle_color = "#B13840"
                self.status_box.styles.border_subtitle_style = "none"

        except Exception as e:
            print(f"Update error: {e}")

        await asyncio.sleep(0.1)
-
def glitching_switch(value):
    """Set the ``#glitch-switch`` widget to *value*."""
    app_instance.query_one("#glitch-switch", Switch).value = value
-
def run_output_high(gpio, time):
    """Add a level-1 entry for *gpio* with delay *time*, upload it and trigger the scope."""
    io = s.io
    io.add(gpio, 1, delay=time)
    io.upload()
    s.trigger()
-
def run_output_low(gpio, time):
    """Add a level-0 entry for *gpio* with delay *time*, upload it and trigger the scope."""
    io = s.io
    io.add(gpio, 0, delay=time)
    io.upload()
    s.trigger()
-
async def monitor_buffer(self):
    """Background task that watches the serial buffer for condition matches.

    While ``self.run_serial`` is true, about every 0.1 s the buffer is
    trimmed to roughly three times the longest condition string (cutting just
    after a newline where one is found) and checked against the conditions.
    A matching action is executed, and the buffer is cleared when the action
    succeeds.  Nothing is checked until ``self._serial_connected`` is true.

    The debug flag follows DEBUG_MODE rather than being hard-coded to True.
    Logged output is unchanged because log_message() already drops everything
    when DEBUG_MODE is off, but the buffer is no longer formatted for the log
    on every pass.

    Args:
        self: Application object providing ``run_serial``, ``buffer_lock``
            and ``serial_buffer``.
    """
    buffer_size = functions.get_conditions_buffer_size(DEBUG_MODE)

    functions.log_message("[CONDITIONS] Starting monitor")

    while self.run_serial:
        if not getattr(self, '_serial_connected', False):
            await asyncio.sleep(1)
            continue

        debug = DEBUG_MODE

        async with self.buffer_lock:
            current_buffer = self.serial_buffer
            max_keep = buffer_size * 3  # room for matches split across reads

            if len(current_buffer) > max_keep:
                keep_from = len(current_buffer) - max_keep
                # Prefer cutting right after the last newline before that point.
                safe_cut = current_buffer.rfind('\n', 0, keep_from)
                if safe_cut != -1:
                    keep_from = safe_cut + 1
                self.serial_buffer = current_buffer[keep_from:]
                current_buffer = self.serial_buffer
                if debug:
                    log_message(f"[DEBUG] Truncated buffer from {len(current_buffer)+keep_from} to {len(current_buffer)} chars")

        # Checked outside the lock; the lock is re-taken only to clear.
        if current_buffer:
            action = functions.check_conditions(self, current_buffer, debug)
            if action:
                functions.log_message(f"[CONDITIONS] Triggering: {action}")
                success = functions.execute_condition_action(action, debug)

                if success:
                    async with self.buffer_lock:
                        self.serial_buffer = ""
                else:
                    functions.log_message("[CONDITIONS] Action failed")

        await asyncio.sleep(0.1)
-
def clear_text():
    """Clear the text widget; does nothing while no widget has been assigned."""
    if text_area is not None:
        text_area.clear()
-
def end_program():
    """Terminate the program by raising SystemExit.

    ``sys.exit`` is used because the bare ``exit`` name is provided by the
    ``site`` module and is not guaranteed to exist in every interpreter setup.
    """
    sys.exit()