Newer
Older
Hardware / FaultInjection / prereqs / FaultyCat / Modules / UART.py
0xRoM on 11 Feb 2 KB initial commit
  1. import platform
  2. import serial
  3. import time
  4. import serial.tools.list_ports
  5. import threading
  6.  
  7. from .ConfigBoard import BoardStatus
  8.  
  9. if platform.system() == "Windows":
  10. DEFAULT_COMPORT = "COM1"
  11. else:
  12. DEFAULT_COMPORT = "/dev/ttyACM0"
  13.  
  14. DEFAULT_SERIAL_BAUDRATE = 921600
  15.  
  16. class UART(threading.Thread):
  17. def __init__(self, serial_port: str = DEFAULT_COMPORT):
  18. self.serial_worker = serial.Serial()
  19. self.serial_worker.port = serial_port
  20. self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE
  21. self.recv_cancel = False
  22. #self.daemon = True
  23.  
  24. def __del__(self):
  25. self.serial_worker.close()
  26.  
  27. def __str__(self):
  28. return f"Serial port: {self.serial_worker.port}"
  29.  
  30. def set_serial_port(self, serial_port: str):
  31. self.serial_worker.port = serial_port
  32. def set_serial_baudrate(self, serial_baudrate: int):
  33. self.serial_worker.baudrate = serial_baudrate
  34.  
  35. def is_valid_connection(self) -> bool:
  36. try:
  37. self.open()
  38. self.close()
  39. return True
  40. except serial.SerialException as e:
  41. return False
  42.  
  43. def get_serial_ports(self):
  44. return serial.tools.list_ports.comports()
  45.  
  46. def reset_buffer(self):
  47. self.serial_worker.reset_input_buffer()
  48. self.serial_worker.reset_output_buffer()
  49.  
  50. def cancel_recv(self):
  51. self.recv_cancel = True
  52.  
  53. def open(self):
  54. self.serial_worker.open()
  55. self.reset_buffer()
  56. def close(self):
  57. self.reset_buffer()
  58. self.serial_worker.close()
  59.  
  60. def is_connected(self):
  61. return self.serial_worker.is_open
  62.  
  63. def send(self, data):
  64. self.serial_worker.write(data)
  65. self.serial_worker.write(b"\n\r")
  66. self.serial_worker.flush()
  67. def recv(self):
  68. if not self.is_connected():
  69. self.open()
  70. try:
  71. while not self.recv_cancel:
  72. time.sleep(0.1)
  73. bytestream = self.serial_worker.readline()
  74. if self.recv_cancel:
  75. self.recv_cancel = False
  76. return None
  77. return bytestream
  78. except serial.SerialException as e:
  79. print(e)
  80. return None
  81. except KeyboardInterrupt:
  82. self.recv_cancel = True
  83. return None
  84. def send_recv(self, data):
  85. self.send(data)
  86. return self.recv()
  87.  
  88. def stop_worker(self):
  89. self.recv_cancel = True
  90. self.reset_buffer()
  91. self.close()
  92. self.join()
Buy Me A Coffee