Newer
Older
glitch-o-bolt / glitch-o-bolt.py
  1. #!/usr/bin/env python3
  2. #
  3. # glitch-o-matic 2.0 - Optimized Version
  4. # Enhanced serial data performance while maintaining all existing features
  5. #
  6. # requirements: textual
  7. import os
  8. import sys
  9. import time
  10. import types
  11. import argparse
  12. import importlib.util
  13. import concurrent.futures
  14.  
  15. import asyncio
  16. import select
  17. import serial
  18. import functools
  19.  
  20. from scope import Scope
  21. from textual import events
  22. from textual.app import App, ComposeResult
  23. from textual.containers import Container, Vertical, Horizontal, Grid
  24. from textual.widgets import Static, DataTable, Input, Button, Switch, Log
  25. from textual.messages import Message
  26.  
  27. # Define specific names for each control row
  28. control_names = ["length", "repeat", "delay"]
  29.  
  30. def load_config(path):
  31. spec = importlib.util.spec_from_file_location("dynamic_config", path)
  32. module = importlib.util.module_from_spec(spec)
  33. spec.loader.exec_module(module)
  34. # Inject the loaded config as 'config'
  35. sys.modules['config'] = module
  36.  
  37. class PersistentInput(Input):
  38. PREFIX = "$> " # The permanent prefix
  39.  
  40. def __init__(self, **kwargs):
  41. super().__init__(**kwargs)
  42. self.value = self.PREFIX # Set initial value
  43.  
  44. def on_input_changed(self, event: Input.Changed) -> None:
  45. """Ensure the prefix is always present."""
  46. if not event.value.startswith(self.PREFIX):
  47. self.value = self.PREFIX # Restore the prefix
  48. elif len(event.value) < len(self.PREFIX):
  49. self.value = self.PREFIX # Prevent deleting
  50.  
  51. def set_app_instance(app):
  52. functions.app_instance = app
  53.  
  54. class SerialDataMessage(Message):
  55. def __init__(self, data: str):
  56. super().__init__() # Call the parent class constructor
  57. self.data = data # Store the serial data
  58.  
  59. class LayoutApp(App):
  60. CSS_PATH = "style.tcss"
  61.  
  62. async def on_ready(self) -> None:
  63. #set_app_instance(self)
  64. self.run_serial = True
  65. self.serial_buffer = "" # Add buffer storage
  66. self.buffer_lock = asyncio.Lock() # Add thread-safe lock
  67. try:
  68. functions.s = Scope()
  69. except IOError:
  70. s = None
  71. print("Warning: Scope not connected, running in simulation mode")
  72. # Start both serial tasks
  73. asyncio.create_task(self.connect_serial())
  74. asyncio.create_task(functions.monitor_buffer(self))
  75. asyncio.create_task(functions.glitch(self))
  76. functions.log_message("[DEBUG] Serial tasks created")
  77.  
  78. async def connect_serial(self):
  79. """Stable serial connection with proper error handling"""
  80. switch_uart = self.query_one("#uart_switch")
  81. while self.run_serial:
  82. if switch_uart.value:
  83. if not getattr(self, '_serial_connected', False):
  84. try:
  85. # Close existing connection if any
  86. if hasattr(self, 'serial_connection') and self.serial_connection:
  87. self.serial_connection.close()
  88. # Establish new connection
  89. self.serial_connection = functions.start_serial()
  90. if self.serial_connection and self.serial_connection.is_open:
  91. # Configure for reliable operation
  92. self.serial_connection.timeout = 0.5
  93. self.serial_connection.write_timeout = 1.0
  94. self._serial_connected = True
  95. functions.log_message("[SERIAL] Connected successfully")
  96. asyncio.create_task(self.read_serial_loop())
  97. else:
  98. raise serial.SerialException("Connection failed")
  99. except Exception as e:
  100. self._serial_connected = False
  101. functions.log_message(f"[SERIAL] Connection error: {str(e)}")
  102. switch_uart.value = False
  103. await asyncio.sleep(2) # Wait before retrying
  104. else:
  105. if getattr(self, '_serial_connected', False):
  106. if hasattr(self, 'serial_connection') and self.serial_connection:
  107. self.serial_connection.close()
  108. self._serial_connected = False
  109. functions.log_message("[SERIAL] Disconnected")
  110. await asyncio.sleep(1) # Check connection status periodically
  111.  
  112. async def read_serial_loop(self):
  113. """Serial reading that perfectly preserves original line endings"""
  114. buffer = ""
  115. while self.run_serial and getattr(self, '_serial_connected', False):
  116. try:
  117. # Read available data (minimum 1 byte)
  118. data = await asyncio.get_event_loop().run_in_executor(
  119. None,
  120. lambda: self.serial_connection.read(max(1, self.serial_connection.in_waiting))
  121. )
  122. if data:
  123. decoded = data.decode('utf-8', errors='ignore')
  124. # Store raw data in condition monitoring buffer
  125. async with self.buffer_lock:
  126. self.serial_buffer += decoded
  127. # Original character processing
  128. for char in decoded:
  129. if char == '\r':
  130. continue
  131. buffer += char
  132. if char == '\n':
  133. self.post_message(SerialDataMessage(buffer))
  134. buffer = ""
  135. if buffer:
  136. self.post_message(SerialDataMessage(buffer))
  137. buffer = ""
  138. await asyncio.sleep(0.01)
  139. except serial.SerialException as e:
  140. functions.log_message(f"[SERIAL] Read error: {str(e)}")
  141. self._serial_connected = False
  142. break
  143. except Exception as e:
  144. functions.log_message(f"[SERIAL] Unexpected error: {str(e)}")
  145. await asyncio.sleep(0.1)
  146.  
  147. async def monitor_conditions(self):
  148. """Background task to monitor serial buffer for conditions with debug"""
  149. debug = functions.DEBUG_MODE # Set to False to disable debug logging after testing
  150. buffer_size = functions.get_conditions_buffer_size(debug)
  151. if debug:
  152. functions.log_message("[DEBUG] Starting condition monitor")
  153. functions.log_message(f"[DEBUG] Initial buffer size: {buffer_size}")
  154. functions.log_message(f"[DEBUG] Current conditions: {config.conditions}")
  155. while self.run_serial:
  156. if hasattr(self, '_serial_connected') and self._serial_connected:
  157. # Get a snapshot of the buffer contents
  158. async with self.buffer_lock:
  159. current_buffer = self.serial_buffer
  160. if debug and current_buffer:
  161. functions.log_message(f"[DEBUG] Current buffer length: {len(current_buffer)}")
  162. # Keep reasonable buffer size
  163. if len(current_buffer) > buffer_size * 2:
  164. self.serial_buffer = current_buffer = current_buffer[-buffer_size*2:]
  165. if debug:
  166. functions.log_message(f"[DEBUG] Trimmed buffer to {len(current_buffer)} chars")
  167. # Check for conditions
  168. action = functions.check_conditions(self, current_buffer, debug)
  169. if action:
  170. if debug:
  171. functions.log_message(f"[DEBUG] Executing action: {action}")
  172. functions.execute_condition_action(action, debug)
  173. elif debug and current_buffer:
  174. functions.log_message("[DEBUG] No action triggered")
  175. await asyncio.sleep(0.1) # Check 10 times per secon
  176.  
  177. async def on_key(self, event: events.Key) -> None:
  178. """Handles input with proper newline preservation"""
  179. if event.key == "enter" and self.input_field.has_focus:
  180. text_to_send = self.input_field.value
  181. # Preserve exact input (don't strip) but ensure newline
  182. if not text_to_send.endswith('\n'):
  183. text_to_send += '\n'
  184. # Check if serial_connection exists and is open
  185. serial_connection = getattr(self, 'serial_connection', None)
  186. if serial_connection is not None and serial_connection.is_open:
  187. try:
  188. # Send raw bytes exactly as entered
  189. await asyncio.get_event_loop().run_in_executor(
  190. None,
  191. lambda: serial_connection.write(text_to_send.encode('utf-8'))
  192. )
  193. # Echo to console with prefix
  194. display_text = f"> {text_to_send.rstrip()}"
  195. functions.add_text(display_text)
  196. except Exception as e:
  197. functions.log_message(f"[UART TX ERROR] {str(e)}")
  198. functions.add_text(">> Failed to send")
  199. else:
  200. functions.add_text(">> Not sent - UART disconnected")
  201. self.input_field.value = ""
  202. event.prevent_default()
  203.  
  204. async def on_serial_data_message(self, message: SerialDataMessage) -> None:
  205. """Display serial data exactly as received"""
  206. if hasattr(functions, 'text_area'):
  207. # Write the data exactly as it should appear
  208. functions.text_area.write(message.data)
  209.  
  210. log_time = functions.get_config_value("log_time")
  211. if log_time > 0:
  212. functions.write_to_log(message.data, log_time)
  213. #functions.add_text(message.data)
  214. functions.text_area.scroll_end()
  215.  
  216. def read_from_serial_sync(self):
  217. """Synchronous line reading with proper timeout handling"""
  218. if not self.serial_connection:
  219. return b""
  220. try:
  221. # Read with timeout
  222. ready, _, _ = select.select([self.serial_connection], [], [], 0.01)
  223. if ready:
  224. # Read until newline or buffer limit
  225. return self.serial_connection.read_until(b'\n', size=4096)
  226. return b""
  227. except Exception as e:
  228. functions.log_message(f"[ERROR] Serial read error: {str(e)}")
  229. return b""
  230.  
  231. def on_button_pressed(self, event: Button.Pressed) -> None:
  232. button_id_parts = event.button.name
  233. parts = button_id_parts.split("-")
  234. button_id = parts[0]
  235. if(button_id == "exit_button"):
  236. functions.end_program()
  237. if(button_id == "clear_button"):
  238. functions.clear_text()
  239. if(button_id == "btn_glitch"):
  240. functions.launch_glitch()
  241. if(button_id == "save_config"):
  242. functions.save_config(self)
  243. if(button_id == "toggle_trigger"):
  244. functions.toggle_trigger(self, int(parts[1]))
  245. if(button_id == "change_val"):
  246. functions.on_button_pressed(self, event)
  247. if(button_id == "save_val"):
  248. functions.on_save_button_pressed(self, event)
  249. if(button_id == "custom_function"):
  250. functions.run_custom_function(self, event)
  251. if(button_id == "save_uart"):
  252. functions.save_uart_settings(self, event)
  253.  
  254. def on_switch_changed(self, event: Switch.Changed) -> None:
  255. """Handle switch toggle events"""
  256. switch = event.switch
  257. # Only handle switches with our specific class
  258. if "trigger-switch" in switch.classes:
  259. try:
  260. # Extract index from switch ID
  261. index = int(switch.id.split("_")[-1])
  262. new_state = bool(event.value) # Ensure boolean
  263. config.triggers[index][1] = new_state # Update config
  264. functions.set_triggers()
  265. except (ValueError, IndexError, AttributeError) as e:
  266. if functions.DEBUG_MODE:
  267. functions.log_message(f"[ERROR] Failed to process trigger switch: {str(e)}")
  268.  
  269. if "condition-switch" in switch.classes:
  270. try:
  271. # Extract index from switch ID
  272. index = int(switch.id.split("_")[-1])
  273. new_state = bool(event.value) # Ensure boolean
  274. # Update config
  275. config.conditions[index][1] = new_state
  276. if functions.DEBUG_MODE:
  277. functions.log_message(f"[CONDITION] Updated switch {index} to {new_state}")
  278. functions.log_message(f"[CONDITION] Current states: {[cond[1] for cond in config.conditions]}")
  279. except (ValueError, IndexError, AttributeError) as e:
  280. if functions.DEBUG_MODE:
  281. functions.log_message(f"[ERROR] Failed to process condition switch: {str(e)}")
  282.  
  283. if "logging-switch" in switch.classes:
  284. if functions.DEBUG_MODE:
  285. curr_time = functions.get_config_value("log_time")
  286. functions.log_message(f"[FUNCTION] logging toggled: {curr_time}")
  287. if bool(event.value) is True:
  288. functions.set_log_time(int(time.time())) # Uses the 'time' module
  289. else:
  290. functions.set_log_time(0)
  291. main_content = self.query_one("#main_content")
  292. log_time = functions.get_config_value("log_time") # Renamed to avoid conflict
  293. port = str(functions.get_config_value("serial_port"))
  294. baud = str(functions.get_config_value("baud_rate"))
  295.  
  296. if log_time == 0: # Now using 'log_time' instead of 'time'
  297. main_content.border_title = f"{port} {baud}"
  298. else:
  299. main_content.border_title = f"{port} {baud} \\[{log_time}.log]"
  300.  
  301. if "glitch-switch" in switch.classes:
  302. if functions.DEBUG_MODE:
  303. curr_time = functions.get_config_value("glitch_time")
  304. functions.log_message(f"[FUNCTION] glitching toggled: {curr_time}")
  305. if bool(event.value) is True:
  306. functions.set_glitch_time(int(time.time())) # Uses the 'time' module
  307. else:
  308. functions.set_glitch_time(0)
  309.  
  310. def compose(self) -> ComposeResult:
# Build the widget tree: yields the uart/config input boxes, one control row
# per entry in control_names, the glitch button and switch, the status
# DataTable, the trigger/condition/misc sidebars, and the main Log view with
# the command input.
# NOTE(review): this listing has lost its indentation, so which `with`
# container each widget is nested in cannot be confirmed from here - verify
# against the original file before restructuring this method.
  311. with Vertical(classes="top_section"):
  312. # Use Vertical here instead of Horizontal
  313. with Vertical(classes="top_left"):
  314. # UART Box - appears second (below)
  315. with Vertical(classes="uart_box") as uart_box:
  316. uart_box.border_title = "uart settings"
  317.  
  318. with Horizontal(classes="onerow"):
  319. yield Static("port:", classes="uart_label")
  320. yield Input(
  321. classes="control_input",
  322. id="uart_port_input",
  323. name="uart_port_input",
  324. value=str(functions.get_config_value("serial_port"))
  325. )
  326.  
  327. with Horizontal(classes="onerow"):
  328. yield Static("baud:", classes="uart_label")
  329. yield Input(
  330. classes="control_input",
  331. id="baud_rate_input",
  332. name="baud_rate_input",
  333. value=str(functions.get_config_value("baud_rate"))
  334. )
  335. yield Button("save", classes="btn_save", id="save_uart", name="save_uart")
  336.  
  337. # Config Box - appears first (on top)
  338. with Vertical(classes="config_box") as config_box:
  339. config_box.border_title = "config"
  340.  
  341. with Horizontal(classes="onerow"):
  342. yield Static("file:", classes="uart_label")
  343. yield Input(
  344. classes="control_input",
  345. id="config_file_input",
  346. name="config_file_input",
  347. value=str(functions.get_config_value("conFile"))
  348. )
  349. with Horizontal(classes="onerow"):
  350. yield Button("save", classes="btn_save", id="save_config", name="save_config")
  351.  
  352. yield Static("glitch-o-bolt v2.0", classes="program_name")
  353. yield Static(" ") # Show blank space
# One row per control name: three decrement buttons, an integer Input, a save
# button, then three increment buttons. Button names have the form
# "<action>-<name>_<amount>"; on_button_pressed dispatches on the part before
# the first "-".
  354. for name in control_names:
  355. with Horizontal(classes="control_row"):
  356. yield Static(f"{name}:", classes="control_label")
  357. for amount in [-100, -10, -1]:
  358. yield Button(str(amount), classes=f"btn btn{amount}", name=f"change_val-{name}_{amount}")
  359.  
  360. yield Input(
  361. classes="control_input",
  362. value=str(functions.get_config_value(name)),
  363. type="integer",
  364. id=f"{name}_input" # Use `id` instead of `name`
  365. )
  366. yield Button("save", classes="btn_save", name=f"save_val-{name}_save")
  367.  
  368. for amount in [1, 10, 100]:
  369. yield Button(f"+{amount}", classes=f"btn btn-{amount}", name=f"change_val-{name}_{amount}")
  370.  
  371. with Horizontal(classes="top_right"):
  372. with Vertical(classes="switch_box") as switch_box:
  373. #yield Static("glitch", classes="switch_title")
  374. yield Button("glitch", classes="btn_glitch", name=f"btn_glitch")
# on_switch_changed recognises this switch by its glitch-switch class.
  375. yield Switch(classes="glitch-switch", id="glitch-switch", animate=False)
  376.  
  377. # Create and store DataTable for later updates
  378. self.status_box = DataTable(classes="top_box", name="status_box")
  379. self.status_box.border_title = "status"
  380. self.status_box.border_subtitle = "stopped"
  381. self.status_box.styles.border_subtitle_color = "#B13840"
  382. self.status_box.show_header = False
  383. self.status_box.show_cursor = False
  384.  
  385. self.status_box.add_columns("Attribute", "Value")
  386.  
  387. # Add rows for config values
# Rows are keyed row1..row4 (length, repeat, delay, elapsed).
  388. self.status_box.add_row(" length: ", str(functions.get_config_value("length")), key="row1")
  389. self.status_box.add_row(" repeat: ", str(functions.get_config_value("repeat")), key="row2")
  390. self.status_box.add_row(" delay: ", str(functions.get_config_value("delay")), key="row3")
  391. self.status_box.add_row("elapsed: ", str(functions.get_glitch_elapsed()), key="row4")
  392.  
  393. yield self.status_box # Yield the stored DataTable
  394. with Horizontal(classes="main_section"):
  395. with Vertical(classes="left_sidebar"):
  396. sidebar_content = Vertical(classes="sidebar_triggers_content")
  397. sidebar_content.border_title = "triggers"
  398. with sidebar_content:
  399. with Grid(classes="sidebar_triggers"):
  400. # Add rows with switches
  401. functions.ensure_triggers_exist()
# Eight trigger rows: index label, trigger string, an enable Switch with id
# trigger_switch_<i> (on_switch_changed recovers <i> from the id suffix) and a
# toggle Button named toggle_trigger-<i>.
  402. for i in range(8):
  403. yield Static(f"{i} -")
  404. yield Static(f"{functions.get_trigger_string(i)}", id=f"trigger_symbol_{i}", classes="sidebar_trigger_string")
  405. yield Switch(
  406. classes="trigger-switch sidebar_trigger_switch",
  407. value=functions.get_trigger_value(i),
  408. animate=False,
  409. id=f"trigger_switch_{i}"
  410. )
  411. yield Button("^v-", classes="btn_toggle_1", name=f"toggle_trigger-{i}")
  412.  
# The conditions panel is only built when config has a non-empty `conditions`;
# `config` is the module global bound by `import config` in the __main__ block.
  413. if hasattr(config, "conditions") and config.conditions:
  414. sidebar_content2 = Vertical(classes="sidebar_conditions_content")
  415. sidebar_content2.border_title = "conditions"
  416. sidebar_content2.styles.height = len(config.conditions) + 1
  417.  
  418. with sidebar_content2:
  419. with Grid(classes="sidebar_conditions"):
  420. for i in range(len(config.conditions)):
  421. yield Static(f"{functions.get_condition_string(i)[:5]} ")
# A Switch is yielded only when element [2] of the condition is non-empty;
# otherwise a blank Static takes its place.
# NOTE(review): whether the "run" button below belongs to the else branch or
# to every row cannot be told from this listing.
  422. if config.conditions[i][2] != "":
  423. yield Switch(
  424. id=f"condition_switch_{i}",
  425. classes="condition-switch sidebar_trigger_switch", # Added specific class
  426. value=functions.get_condition_value(i),
  427. animate=False
  428. )
  429. else:
  430. yield Static(" ")
  431. yield Button("run", classes="btn_toggle_1", name=f"custom_function-{i}")
  432. sidebar_content3 = Vertical(classes="sidebar_settings_content")
  433. sidebar_content3.border_title = "misc"
  434. with sidebar_content3:
  435. with Grid(classes="sidebar_settings_switches"):
  436. # Add rows with switches
  437. yield Static(f"uart")
# connect_serial looks this switch up by its id (uart_switch) to decide
# whether the serial port should be open.
  438. yield Switch(classes="sidebar_trigger_switch", value=False, animate=False, id="uart_switch")
  439.  
  440. yield Static(f"logging")
# No id here; on_switch_changed recognises it by its logging-switch class.
  441. yield Switch(classes="logging-switch sidebar_trigger_switch", value=False, animate=False)
  442.  
  443. # Centre the exit button
  444. with Vertical(classes="centre_settings_buttons"):
  445. yield Button("clear main", classes="btn_settings", name="clear_button")
  446. with Vertical(classes="centre_settings_buttons"):
  447. yield Button("exit", classes="btn_settings", name="exit_button")
  448.  
  449.  
# NOTE(review): nothing in this method assigns a bare `text_area`; the Log
# widget is stored on functions.text_area below, so this declaration appears
# to have no effect.
  450. global text_area # Use global reference
# on_switch_changed queries this container by id (#main_content) to update
# its border title when logging is toggled.
  451. with Vertical(id="main_content", classes="main_content") as main_content:
  452. port = str(functions.get_config_value("serial_port"))
  453. baud = str(functions.get_config_value("baud_rate"))
  454.  
  455. if functions.get_config_value("log_time") == 0:
  456. main_content.border_title = f"{port} {baud}"
  457. else:
# NOTE(review): assigning to `time` makes it a local name throughout
# compose(), hiding the imported `time` module inside this method.
  458. time = str(functions.get_config_value("log_time"))
  459. main_content.border_title = f"{port} {baud} \\[{time}.log]"
  460.  
  461. # Use Log() widget instead of TextArea for scrollable content
  462. functions.text_area = Log(classes="scrollable_log")
  463. yield functions.text_area # Make it accessible later
  464. with Horizontal(classes="input_container") as input_row:
  465. yield Static("$> ", classes="input_prompt")
# Kept on self so on_key can read and clear the field.
  466. self.input_field = Input(classes="input_area", placeholder="send to uart", id="command_input" ) # Store reference
  467. yield self.input_field
  468.  
  469. if __name__ == "__main__":
  470. parser = argparse.ArgumentParser()
  471. parser.add_argument("-c", "--config", default="config.py", help="Path to config file")
  472. args = parser.parse_args()
  473.  
  474. if not os.path.exists(args.config):
  475. print(f"Config file '{args.config}' not found. Creating an empty one...")
  476. with open(args.config, "w") as f:
  477. pass # Creates a blank file
  478.  
  479. load_config(args.config)
  480. import config
  481. import functions
  482. config.CONFILE = args.config
  483. functions.set_config(config)
  484.  
  485. app = LayoutApp()
  486. set_app_instance(app) # Pass the app instance to config
  487. app.run()
Buy Me A Coffee