- import platform
- import serial
- import time
- import serial.tools.list_ports
- import threading
-
- from .ConfigBoard import BoardStatus
-
- if platform.system() == "Windows":
- DEFAULT_COMPORT = "COM1"
- else:
- DEFAULT_COMPORT = "/dev/ttyACM0"
-
- DEFAULT_SERIAL_BAUDRATE = 921600
-
- class UART(threading.Thread):
- def __init__(self, serial_port: str = DEFAULT_COMPORT):
- self.serial_worker = serial.Serial()
- self.serial_worker.port = serial_port
- self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE
- self.recv_cancel = False
- #self.daemon = True
-
- def __del__(self):
- self.serial_worker.close()
-
- def __str__(self):
- return f"Serial port: {self.serial_worker.port}"
-
- def set_serial_port(self, serial_port: str):
- self.serial_worker.port = serial_port
-
- def set_serial_baudrate(self, serial_baudrate: int):
- self.serial_worker.baudrate = serial_baudrate
-
- def is_valid_connection(self) -> bool:
- try:
- self.open()
- self.close()
- return True
- except serial.SerialException as e:
- return False
-
- def get_serial_ports(self):
- return serial.tools.list_ports.comports()
-
- def reset_buffer(self):
- self.serial_worker.reset_input_buffer()
- self.serial_worker.reset_output_buffer()
-
- def cancel_recv(self):
- self.recv_cancel = True
-
- def open(self):
- self.serial_worker.open()
- self.reset_buffer()
-
- def close(self):
- self.reset_buffer()
- self.serial_worker.close()
-
- def is_connected(self):
- return self.serial_worker.is_open
-
- def send(self, data):
- self.serial_worker.write(data)
- self.serial_worker.write(b"\n\r")
- self.serial_worker.flush()
-
- def recv(self):
- if not self.is_connected():
- self.open()
- try:
- while not self.recv_cancel:
- time.sleep(0.1)
-
- bytestream = self.serial_worker.readline()
- if self.recv_cancel:
- self.recv_cancel = False
- return None
- return bytestream
- except serial.SerialException as e:
- print(e)
- return None
- except KeyboardInterrupt:
- self.recv_cancel = True
- return None
-
- def send_recv(self, data):
- self.send(data)
- return self.recv()
-
- def stop_worker(self):
- self.recv_cancel = True
- self.reset_buffer()
- self.close()
- self.join()