Newer
Older
glitch-o-bolt / functions.py
  1. import sys
  2. import os
  3. import re
  4. import time
  5. import serial
  6. import importlib
  7. from scope import Scope
  8. from textual.widgets import Button, Input, Switch
  9. from textual.containers import Vertical
  10.  
  11. import asyncio
  12. import functions
  13.  
  14. DEBUG_MODE = False
  15.  
  16. app_instance = None # Global variable to store the app instance
  17. text_area = None # Store global reference to scrollable text area
  18. config = None # dynamic loading of config file
  19. log_time = 0 # timestamp for logfile
  20. glitch_time = 0 # timestamp for when glitching started
  21.  
  22. try:
  23. s = Scope()
  24. except IOError:
  25. s = None
  26. print("Warning: Scope not connected, running in simulation mode")
  27.  
  28. def set_config(cfg):
  29. global config
  30. config = cfg
  31.  
  32. def set_app_instance(app):
  33. """Store the app instance for UART access"""
  34. global app_instance
  35. app_instance = app
  36.  
  37. def log_message(message):
  38. if DEBUG_MODE:
  39. with open("debug.log", "a") as log_file:
  40. log_file.write(message + "\n")
  41.  
  42. def set_log_time(value):
  43. global log_time
  44. log_time = value
  45.  
  46. def set_glitch_time(value):
  47. global glitch_time
  48. glitch_time = value
  49.  
  50. def get_config_value(name: str) -> int:
  51. """Return the latest value of the given config variable, and create them if they don't exist."""
  52. if name == "length":
  53. if not hasattr(config, "LENGTH"):
  54. config.LENGTH = 0 # Default value if not set
  55. return config.LENGTH
  56. elif name == "repeat":
  57. if not hasattr(config, "REPEAT"):
  58. config.REPEAT = 0 # Default value if not set
  59. return config.REPEAT
  60. elif name == "serial_port":
  61. if not hasattr(config, "SERIAL_PORT"):
  62. config.SERIAL_PORT = "/dev/ttyUSB0" # Default value if not set
  63. return config.SERIAL_PORT
  64. elif name == "baud_rate":
  65. if not hasattr(config, "BAUD_RATE"):
  66. config.BAUD_RATE = 115200 # Default value if not set
  67. return config.BAUD_RATE
  68. elif name == "delay":
  69. if not hasattr(config, "DELAY"):
  70. config.DELAY = 0 # Default value if not set
  71. return config.DELAY
  72. elif name == "log_time":
  73. return log_time # Return the module variable directly
  74. elif name == "glitch_time":
  75. return glitch_time # Return the module variable directly
  76. elif name == "conFile":
  77. if not hasattr(config, "CONFILE"):
  78. config.CONFILE = "config.py" # Or any suitable default
  79. return config.CONFILE
  80. elif name.startswith("trigger_"):
  81. if "_value" in name:
  82. index = int(name.split('_')[1])
  83. return config.triggers[index][0]
  84. elif "_state" in name:
  85. index = int(name.split('_')[1])
  86. return config.triggers[index][1]
  87. else:
  88. return 0 # Default fallback for unknown names
  89.  
  90. def set_config_value(name: str, value: int):
  91.  
  92. if hasattr(config, name.upper()):
  93. current_value = getattr(config, name.upper())
  94. setattr(config, name.upper(), value)
  95.  
  96. # Update corresponding Input field
  97. input_field = app_instance.query_one(f"#{name}_input")
  98. input_field.value = str(value)
  99.  
  100. # Update the status box row
  101. update_status_box(app_instance, name, value)
  102.  
  103. # Refresh UI to reflect changes
  104. app_instance.refresh()
  105.  
  106. def get_condition_string(index):
  107. """Returns the string from the triggers list at the given index."""
  108. if 0 <= index < len(config.conditions):
  109. return config.conditions[index][0] # Return the string value
  110. else:
  111. raise IndexError("Index out of range")
  112.  
  113. def get_condition_value(index):
  114. """Returns the value from the triggers list at the given index."""
  115. if 0 <= index < len(config.conditions):
  116. return config.conditions[index][1] # Return the boolean value
  117. else:
  118. raise IndexError("Index out of range")
  119.  
  120. def set_condition_value(index: int, value: bool) -> None:
  121. """Update switch state in config"""
  122. if 0 <= index < len(config.conditions):
  123. if app_instance.query(f"#condition_switch_{index}"):
  124. switch = app_instance.query_one(f"#condition_switch_{index}", Switch)
  125. switch.value = value # Force turn off
  126. else:
  127. raise IndexError("Index out of range")
  128.  
  129. def ensure_triggers_exist():
  130. if not hasattr(config, "triggers") or not config.triggers or len(config.triggers) < 8:
  131. config.triggers = [["-", False] for _ in range(8)]
  132.  
  133. def get_trigger_string(index):
  134. """Returns the string from the triggers list at the given index."""
  135. if 0 <= index < len(config.triggers):
  136. return config.triggers[index][0] # Return the string value
  137. else:
  138. raise IndexError("Index out of range")
  139.  
  140. def get_trigger_value(index):
  141. """Returns the value from the triggers list at the given index."""
  142. if 0 <= index < len(config.triggers):
  143. return config.triggers[index][1] # Return the boolean value
  144. else:
  145. raise IndexError("Index out of range")
  146.  
  147. def set_trigger_value(index, value):
  148. if 0 <= index < len(config.triggers):
  149. switch = app_instance.query_one(f"#trigger_switch_{index}", Switch)
  150. switch.value = value # Force turn off
  151. else:
  152. raise IndexError("Index out of range")
  153.  
  154. def set_trigger_string(index: int, value: str):
  155. # Validate the input value
  156. valid_values = ["^", "v", "-"]
  157. if value not in valid_values:
  158. raise ValueError(f"Invalid trigger value. Must be one of {valid_values}")
  159.  
  160. # Update config
  161. config.triggers[index][0] = value
  162. config.triggers[index][1] = False
  163.  
  164. # Update the symbol display in the UI
  165. symbol_widget = app_instance.query_one(f"#trigger_symbol_{index}")
  166. symbol_widget.update(value)
  167.  
  168. # Update the switch in the UI
  169. switch_widget = app_instance.query_one(f"#trigger_switch_{index}")
  170. switch_widget.value = False
  171.  
  172. def toggle_trigger(self, index: int):
  173. current_symbol = config.triggers[index][0]
  174. cycle = ["^", "v", "-"]
  175. next_symbol = cycle[(cycle.index(current_symbol) + 1) % len(cycle)]
  176.  
  177. # Update config
  178. config.triggers[index][0] = next_symbol
  179. config.triggers[index][1] = False
  180.  
  181. # Update the symbol display in the UI
  182. symbol_widget = self.query_one(f"#trigger_symbol_{index}")
  183. symbol_widget.update(next_symbol)
  184.  
  185. # Update the switch in the UI
  186. switch_widget = self.query_one(f"#trigger_switch_{index}")
  187. switch_widget.value = False
  188. log_message("next symbol: "+next_symbol)
  189.  
  190. def set_uart_switch(state: bool | None = None) -> None:
  191. switch_uart = app_instance.query_one("#uart_switch")
  192. if state is None:
  193. switch_uart.value = not switch_uart.value # Toggle
  194. else:
  195. switch_uart.value = state # Set to specific state
  196.  
  197. def modify_value(variable_name: str, amount: int) -> int:
  198. """
  199. Modify a global variable by a given amount.
  200. Args:
  201. variable_name (str): The name of the variable to modify.
  202. amount (int): The amount to increment or decrement.
  203.  
  204. Returns:
  205. int: The updated value.
  206. """
  207. global config # Ensure we modify the variables from config.py
  208.  
  209. if variable_name == "length":
  210. config.LENGTH += amount
  211. return config.LENGTH
  212. elif variable_name == "repeat":
  213. config.REPEAT += amount
  214. return config.REPEAT
  215. elif variable_name == "delay":
  216. config.DELAY += amount
  217. return config.DELAY
  218. else:
  219. raise ValueError(f"Unknown variable: {variable_name}")
  220.  
  221. def on_button_pressed(app, event: Button.Pressed) -> None:
  222. """Handle button presses and update values dynamically."""
  223. button = event.button
  224. button_name = button.name
  225.  
  226. if button_name:
  227. # Strip everything before the first hyphen, including the hyphen itself
  228. button_name = button_name.split("-", 1)[-1] # Get the part after the first hyphen
  229. parts = button_name.split("_")
  230. if len(parts) == 2:
  231. variable_name, amount = parts[0], int(parts[1])
  232.  
  233. # Update the variable value in config.py
  234. if hasattr(config, variable_name.upper()):
  235. current_value = getattr(config, variable_name.upper())
  236. new_value = current_value + amount
  237. setattr(config, variable_name.upper(), new_value)
  238.  
  239. # Update corresponding Input field
  240. input_field = app.query_one(f"#{variable_name}_input")
  241. input_field.value = str(new_value)
  242.  
  243. # Update the status box row
  244. update_status_box(app, variable_name, new_value)
  245.  
  246. # Refresh UI to reflect changes
  247. app.refresh()
  248.  
  249. def on_save_button_pressed(app, event: Button.Pressed) -> None:
  250. """Handle the Save button press to save the values."""
  251. button = event.button
  252. button_name = button.name
  253.  
  254. if button_name:
  255. variable_name = button_name.replace("save_val-", "")
  256. variable_name = variable_name.replace("_save", "") # Extract the variable name from button
  257. input_field = app.query_one(f"#{variable_name}_input", Input)
  258.  
  259. new_value = int(input_field.value)
  260. setattr(config, variable_name.upper(), new_value)
  261. update_status_box(app, variable_name, new_value)
  262. app.refresh()
  263.  
  264. def save_uart_settings(app, event: Button.Pressed) -> None:
  265.  
  266. cur_uart_port = str(app.query_one(f"#uart_port_input", Input).value)
  267. cur_baud_rate = int(app.query_one(f"#baud_rate_input", Input).value)
  268.  
  269. config.SERIAL_PORT = cur_uart_port
  270. config.BAUD_RATE = cur_baud_rate
  271.  
  272. main_content = app.query_one(".main_content", Vertical)
  273. main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}"
  274. app.refresh()
  275.  
  276. def change_baudrate(new_baudrate):
  277. """Change the baud rate using the app_instance's serial connection"""
  278. if app_instance is None:
  279. add_text("[ERROR] App instance not available")
  280. return False
  281. if not hasattr(app_instance, 'serial_connection'):
  282. add_text("[ERROR] No serial connection in app instance")
  283. return False
  284.  
  285. input_field = app_instance.query_one(f"#baud_rate_input")
  286. input_field.value = str(new_baudrate)
  287. serial_conn = app_instance.serial_connection
  288. if serial_conn is None or not serial_conn.is_open:
  289. add_text("[ERROR] Serial port not initialized or closed")
  290. return False
  291. try:
  292. old_baudrate = serial_conn.baudrate
  293. serial_conn.baudrate = new_baudrate
  294. config.BAUD_RATE = new_baudrate
  295.  
  296. main_content = app_instance.query_one(".main_content", Vertical)
  297. if functions.get_config_value("log_time") == 0:
  298. main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE}"
  299. else:
  300. time = str(functions.get_config_value("log_time"))
  301. main_content.border_title = f"{config.SERIAL_PORT} {config.BAUD_RATE} \\[{time}.log]"
  302. return True
  303. except ValueError as e:
  304. add_text(f"[ERROR] Invalid baud rate {new_baudrate}: {e}")
  305. except serial.SerialException as e:
  306. add_text(f"[ERROR] Serial error changing baud rate: {e}")
  307. # Attempt to revert
  308. try:
  309. serial_conn.baudrate = old_baudrate
  310. except:
  311. add_text("[WARNING] Failed to revert baud rate")
  312. return False
  313.  
  314. def update_status_box(app, variable_name, new_value):
  315. column_keys = list(app.status_box.columns.keys())
  316.  
  317. # We only have two columns: "Attribute" and "Value"
  318. if variable_name == "length":
  319. row_key = list(app.status_box.rows.keys())[0] # The first row
  320. column_key = column_keys[1] # The Value column for 'length'
  321. elif variable_name == "repeat":
  322. row_key = list(app.status_box.rows.keys())[1] # The first row
  323. column_key = column_keys[1] # The Value column for 'repeat'
  324. elif variable_name == "delay":
  325. row_key = list(app.status_box.rows.keys())[2] # The first row
  326. column_key = column_keys[1] # The Value column for 'delay'
  327. elif variable_name == "elapsed":
  328. row_key = list(app.status_box.rows.keys())[3] # The first row
  329. column_key = column_keys[1] # The Value column for 'delay'
  330.  
  331. app.status_box.update_cell(row_key, column_key, str(new_value))
  332.  
  333. def run_custom_function(app, event):
  334. """Handle custom function buttons with enhanced logging"""
  335. button = event.button
  336. button_name = button.name
  337. debug = DEBUG_MODE # Set to False after testing
  338.  
  339. log_message(f"[CUSTOM] Button pressed: '{button_name}'")
  340.  
  341. if button_name:
  342. try:
  343. variable_name = int(button_name.replace("custom_function-", ""))
  344. log_message(f"[CUSTOM] Condition index: {variable_name}")
  345.  
  346. if 0 <= variable_name < len(config.conditions):
  347. func_name = config.conditions[variable_name][3]
  348. log_message(f"[CUSTOM] Executing: {func_name}")
  349. # Use the centralized execution function
  350. success = execute_condition_action(func_name, debug)
  351. if not success:
  352. log_message(f"[CUSTOM] Failed to execute {func_name}")
  353. else:
  354. log_message(f"[CUSTOM] Invalid index: {variable_name}")
  355.  
  356. except ValueError:
  357. log_message(f"[CUSTOM] Invalid button format: '{button_name}'")
  358. except Exception as e:
  359. log_message(f"[CUSTOM] Error: {str(e)}")
  360. if debug:
  361. log_message(f"[DEBUG] {traceback.format_exc()}")
  362.  
  363. def write_to_log(text: str, log_time: int):
  364. """Write text to a log file named {log_time}.log in the logs directory"""
  365. # Create logs directory if it doesn't exist
  366. logs_dir = "logs"
  367. if not os.path.exists(logs_dir):
  368. os.makedirs(logs_dir)
  369. # Create filename using log_time value
  370. log_file = os.path.join(logs_dir, f"{log_time}.log")
  371. # Append text to log file
  372. with open(log_file, "a") as f:
  373. f.write(f"{text}")
  374.  
  375. def add_text(text):
  376. """Add text to the log widget and optionally to a log file"""
  377. if hasattr(functions, 'text_area'):
  378. functions.text_area.write(text + "\n")
  379. log_time = get_config_value("log_time")
  380. if log_time > 0:
  381. write_to_log(text+"\n", log_time)
  382.  
  383. def update_text(text):
  384. """Update text without adding newlines"""
  385. if hasattr(functions, 'text_area'):
  386. functions.text_area.write(text)
  387.  
  388. def save_config(app):
  389. config_file = get_config_value("conFile")
  390. temp_file = config_file + ".tmp"
  391. new_file = str(app.query_one(f"#config_file_input", Input).value)
  392. try:
  393. # Get current values
  394. serial_port = get_config_value("serial_port")
  395. baud_rate = get_config_value("baud_rate")
  396. length = get_config_value("length")
  397. repeat = get_config_value("repeat")
  398. delay = get_config_value("delay")
  399. # Get triggers
  400. triggers = []
  401. for i in range(8):
  402. triggers.append([
  403. get_config_value(f"trigger_{i}_value"),
  404. get_config_value(f"trigger_{i}_state")
  405. ])
  406. # Read existing config
  407. existing_content = ""
  408. custom_functions = []
  409. imports = []
  410. if os.path.exists(config_file):
  411. with open(config_file, 'r') as f:
  412. existing_content = f.read()
  413. # Extract imports and functions
  414. import_pattern = re.compile(r'^import .+?$|^from .+? import .+?$', re.MULTILINE)
  415. imports = import_pattern.findall(existing_content)
  416. func_pattern = re.compile(r'^(def \w+\(.*?\):.*?)(?=^(?:def \w+\(|\Z))', re.MULTILINE | re.DOTALL)
  417. custom_functions = [fn.strip() for fn in func_pattern.findall(existing_content) if fn.strip()]
  418. # Write new config file
  419. with open(temp_file, 'w') as f:
  420. # Write imports
  421. if imports:
  422. f.write("######\n# LEAVE THESE IMPORTS!\n######\n")
  423. f.write("\n".join(imports) + "\n\n")
  424. # Write config values
  425. f.write("######\n# config values\n######\n\n")
  426. f.write(f"SERIAL_PORT = {repr(serial_port)}\n")
  427. f.write(f"BAUD_RATE = {baud_rate}\n\n")
  428. f.write(f"LENGTH = {length}\n")
  429. f.write(f"REPEAT = {repeat}\n")
  430. f.write(f"DELAY = {delay}\n\n")
  431. # Write triggers
  432. f.write("###\n# ^ = pullup, v = pulldown\n###\n")
  433. f.write("triggers = [\n")
  434. for i, (value, state) in enumerate(triggers):
  435. f.write(f" [{repr(value)}, {state}], #{i}\n")
  436. f.write("]\n")
  437. # Write conditions if they exist
  438. if hasattr(config, 'conditions') and config.conditions:
  439. f.write("\n###\n# name, enabled, string to match\n###\n")
  440. f.write("conditions = [\n")
  441. for condition in config.conditions:
  442. f.write(f" {condition},\n")
  443. f.write("]\n")
  444. # Write custom functions with proper spacing
  445. if custom_functions:
  446. f.write("\n######\n# Custom functions\n######\n")
  447. f.write("\n\n".join(custom_functions))
  448. f.write("\n") # Single newline at end
  449. # Finalize file
  450. if os.path.exists(new_file):
  451. os.remove(new_file)
  452. os.rename(temp_file, new_file)
  453. config.CONFILE = new_file
  454. add_text(f"[SAVED] config {new_file} saved")
  455. except Exception as e:
  456. log_message(f"Error saving config: {str(e)}")
  457. if os.path.exists(temp_file):
  458. os.remove(temp_file)
  459. raise
  460.  
  461. def start_serial():
  462. try:
  463. ser = serial.Serial(
  464. port=config.SERIAL_PORT,
  465. baudrate=config.BAUD_RATE,
  466. timeout=0.1, # Read timeout (seconds)
  467. write_timeout=1.0, # Write timeout
  468. inter_byte_timeout=0.05, # Between bytes
  469. exclusive=True, # Prevent multiple access
  470. rtscts=True, # Enable hardware flow control
  471. dsrdtr=True # Additional flow control
  472. )
  473. add_text("Connected to serial port.")
  474. return ser
  475. except serial.SerialException as e:
  476. add_text(f"[ERROR] Serial exception: {e}")
  477. return None
  478.  
  479. def send_uart_message(message):
  480. """Send a message via UART from anywhere in the application"""
  481. if not app_instance:
  482. log_message("[UART] Not sent - No app instance")
  483. return False
  484. if not hasattr(app_instance, 'serial_connection') or not app_instance.serial_connection.is_open:
  485. log_message("[UART] Not sent - UART disconnected")
  486. return False
  487. try:
  488. # Ensure message ends with newline if it's not empty
  489. if message and not message.endswith('\n'):
  490. message += '\n'
  491. # Send the message
  492. app_instance.serial_connection.write(message.encode('utf-8'))
  493. log_message(f"[UART] Sent: {message.strip()}")
  494. return True
  495. except Exception as e:
  496. log_message(f"[UART TX ERROR] {str(e)}")
  497. return False
  498.  
  499. def get_conditions_buffer_size(debug=False):
  500. """Return the maximum length of condition strings with debug option"""
  501. if not hasattr(config, 'conditions') or not config.conditions:
  502. if debug:
  503. log_message("[DEBUG] No conditions defined, using default buffer size 256")
  504. return 256
  505. valid_lengths = [len(cond[2]) for cond in config.conditions if cond[2]]
  506. if not valid_lengths:
  507. if debug:
  508. log_message("[DEBUG] All condition strings are empty, using default buffer size 256")
  509. return 256
  510. max_size = max(valid_lengths)
  511. if debug:
  512. log_message(f"[DEBUG] Calculated buffer size: {max_size} (from {len(config.conditions)} conditions)")
  513. return max_size
  514.  
  515. def check_conditions(self, buffer, debug=False):
  516. """Check buffer against all conditions by examining every position"""
  517. if debug:
  518. log_message(f"[DEBUG] Checking buffer ({len(buffer)} chars): {repr(buffer)}")
  519. if not hasattr(config, 'conditions') or not config.conditions:
  520. if debug:
  521. log_message("[DEBUG] No conditions to check against")
  522. return None
  523. for i, condition in enumerate(config.conditions):
  524. trigger_str = condition[2]
  525. if not trigger_str: # Skip empty trigger strings
  526. continue
  527. trigger_len = len(trigger_str)
  528. buffer_len = len(buffer)
  529. if debug:
  530. log_message(f"[DEBUG] Checking condition {i} for '{trigger_str}' (length: {trigger_len})")
  531. # Check every possible starting position in the buffer
  532. for pos in range(buffer_len - trigger_len + 1):
  533. # Compare slice of buffer with trigger string
  534. if buffer[pos:pos+trigger_len] == trigger_str:
  535. try:
  536. condition_active = config.conditions[i][1] # Get state from config
  537. if not condition_active:
  538. if debug:
  539. log_message(f"[DEBUG] Condition {i} matched at position {pos} but switch is OFF")
  540. continue
  541. if debug:
  542. log_message(f"[DEBUG] MATCHED condition {i} at position {pos}: {condition[0]} -> {condition[3]}")
  543. return condition[3]
  544. except Exception as e:
  545. if debug:
  546. log_message(f"[DEBUG] Condition check failed for {i}: {str(e)}")
  547. continue
  548. if debug:
  549. log_message("[DEBUG] No conditions matched")
  550. return None
  551.  
  552. def execute_condition_action(action_name, debug=False):
  553. """Execute the named action function using run_custom_function logic"""
  554. if debug:
  555. log_message(f"[ACTION] Attempting to execute: {action_name}")
  556. try:
  557. # Check if action exists in config module
  558. module_name = 'config'
  559. module = importlib.import_module(module_name)
  560. if hasattr(module, action_name):
  561. if debug:
  562. log_message(f"[ACTION] Found {action_name} in {module_name}")
  563. getattr(module, action_name)()
  564. return True
  565. # Check if action exists in functions module
  566. if hasattr(sys.modules[__name__], action_name):
  567. if debug:
  568. log_message(f"[ACTION] Found {action_name} in functions")
  569. getattr(sys.modules[__name__], action_name)()
  570. return True
  571. # Check if action exists in globals
  572. if action_name in globals():
  573. if debug:
  574. log_message(f"[ACTION] Found {action_name} in globals")
  575. globals()[action_name]()
  576. return True
  577. log_message(f"[ACTION] Function '{action_name}' not found in any module")
  578. return False
  579. except Exception as e:
  580. log_message(f"[ACTION] Error executing {action_name}: {str(e)}")
  581. if debug:
  582. log_message(f"[DEBUG] Full exception: {traceback.format_exc()}")
  583. return False
  584.  
  585. def get_glitch_elapsed():
  586. gtime = get_config_value("glitch_time")
  587. if gtime <= 0:
  588. return "000:00:00"
  589. # Assuming gtime contains the start timestamp
  590. elapsed = int(time.time() - gtime)
  591. return f"{elapsed//3600:03d}:{(elapsed%3600)//60:02d}:{elapsed%60:02d}"
  592.  
  593. def start_glitch(glitch_len, trigger_repeats, delay):
  594. s.glitch.repeat = glitch_len
  595. s.glitch.ext_offset = delay
  596. #add_text(f"[GLITCHING]: length:{glitch_len}, offset:{delay}, repeat:{trigger_repeats}")
  597. triggers = [] # Get triggers
  598. triggers_set = False
  599. for i in range(8):
  600. triggers.append([
  601. get_config_value(f"trigger_{i}_value"),
  602. get_config_value(f"trigger_{i}_state")
  603. ])
  604. for i, (value, state) in enumerate(triggers):
  605. if state is True:
  606. triggers_set = True
  607. if value == "^":
  608. #add_text(f"[GLITCHING]: armed: {i} ^")
  609. s.arm(i, Scope.RISING_EDGE)
  610. elif value == "v":
  611. #add_text(f"[GLITCHING]: armed: {i} v")
  612. s.arm(i, Scope.FALLING_EDGE)
  613.  
  614. if triggers_set is False:
  615. #add_text(f"[GLITCHING]: repeat:{trigger_repeats}")
  616. for _ in range(trigger_repeats):
  617. s.trigger()
  618.  
  619. def launch_glitch():
  620. length = functions.get_config_value("length")
  621. repeat = functions.get_config_value("repeat")
  622. delay = functions.get_config_value("delay")
  623. start_glitch(length, repeat, delay)
  624.  
  625. async def glitch(self):
  626. functions.log_message("[GLITCHING] Starting glitch monitor")
  627. previous_gtime = None # Track the previous state
  628. while True:
  629. try:
  630. gtime = get_config_value("glitch_time")
  631. elapsed_time = get_glitch_elapsed()
  632. functions.update_status_box(self, "elapsed", elapsed_time)
  633. # Only update if the state has changed
  634. #if gtime != previous_gtime:
  635. if gtime > 0:
  636. self.status_box.border_subtitle = "running"
  637. self.status_box.styles.border_subtitle_color = "#5E99AE"
  638. self.status_box.styles.border_subtitle_style = "bold"
  639.  
  640. length = functions.get_config_value("length")
  641. repeat = functions.get_config_value("repeat")
  642. delay = functions.get_config_value("delay")
  643. start_glitch(length, repeat, delay)
  644. else:
  645. self.status_box.border_subtitle = "stopped"
  646. self.status_box.styles.border_subtitle_color = "#B13840"
  647. self.status_box.styles.border_subtitle_style = "none"
  648. #previous_gtime = gtime # Update the previous state
  649.  
  650. except Exception as e:
  651. print(f"Update error: {e}")
  652. await asyncio.sleep(0.1)
  653.  
  654. def glitching_switch(value):
  655. switch = app_instance.query_one("#glitch-switch", Switch)
  656. switch.value = value # Force turn off
  657.  
  658. def run_output_high(gpio, time):
  659. s.io.add(gpio, 1, delay=time)
  660. s.io.upload()
  661. s.trigger()
  662.  
  663. def run_output_low(gpio, time):
  664. s.io.add(gpio, 0, delay=time)
  665. s.io.upload()
  666. s.trigger()
  667.  
  668. async def monitor_buffer(self):
  669. """Background task to monitor serial buffer for conditions"""
  670. debug = True
  671. buffer_size = functions.get_conditions_buffer_size(debug)
  672. functions.log_message("[CONDITIONS] Starting monitor")
  673. while self.run_serial:
  674. if not getattr(self, '_serial_connected', False):
  675. await asyncio.sleep(1)
  676. continue
  677. async with self.buffer_lock:
  678. current_buffer = self.serial_buffer
  679. max_keep = buffer_size * 3 # Keep enough buffer to catch split matches
  680. if len(current_buffer) > max_keep:
  681. # Keep last max_keep characters, but ensure we don't cut a potential match
  682. keep_from = len(current_buffer) - max_keep
  683. # Find the last newline before this position to avoid breaking lines
  684. safe_cut = current_buffer.rfind('\n', 0, keep_from)
  685. if safe_cut != -1:
  686. keep_from = safe_cut + 1
  687. self.serial_buffer = current_buffer[keep_from:]
  688. current_buffer = self.serial_buffer
  689. if debug:
  690. log_message(f"[DEBUG] Truncated buffer from {len(current_buffer)+keep_from} to {len(current_buffer)} chars")
  691. if current_buffer:
  692. action = functions.check_conditions(self, current_buffer, debug)
  693. if action:
  694. functions.log_message(f"[CONDITIONS] Triggering: {action}")
  695. success = functions.execute_condition_action(action, debug)
  696. if success:
  697. async with self.buffer_lock:
  698. # Clear the buffer after successful match
  699. self.serial_buffer = ""
  700. else:
  701. functions.log_message("[CONDITIONS] Action failed")
  702. await asyncio.sleep(0.1)
  703.  
  704. def clear_text():
  705. text_area.clear()
  706.  
  707. def end_program():
  708. exit()
Buy Me A Coffee