import platform
import serial
import time
import serial.tools.list_ports
import threading
from .ConfigBoard import BoardStatus
if platform.system() == "Windows":
DEFAULT_COMPORT = "COM1"
else:
DEFAULT_COMPORT = "/dev/ttyACM0"
DEFAULT_SERIAL_BAUDRATE = 921600
class UART(threading.Thread):
def __init__(self, serial_port: str = DEFAULT_COMPORT):
self.serial_worker = serial.Serial()
self.serial_worker.port = serial_port
self.serial_worker.baudrate = DEFAULT_SERIAL_BAUDRATE
self.recv_cancel = False
#self.daemon = True
def __del__(self):
self.serial_worker.close()
def __str__(self):
return f"Serial port: {self.serial_worker.port}"
def set_serial_port(self, serial_port: str):
self.serial_worker.port = serial_port
def set_serial_baudrate(self, serial_baudrate: int):
self.serial_worker.baudrate = serial_baudrate
def is_valid_connection(self) -> bool:
try:
self.open()
self.close()
return True
except serial.SerialException as e:
return False
def get_serial_ports(self):
return serial.tools.list_ports.comports()
def reset_buffer(self):
self.serial_worker.reset_input_buffer()
self.serial_worker.reset_output_buffer()
def cancel_recv(self):
self.recv_cancel = True
def open(self):
self.serial_worker.open()
self.reset_buffer()
def close(self):
self.reset_buffer()
self.serial_worker.close()
def is_connected(self):
return self.serial_worker.is_open
def send(self, data):
self.serial_worker.write(data)
self.serial_worker.write(b"\n\r")
self.serial_worker.flush()
def recv(self):
if not self.is_connected():
self.open()
try:
while not self.recv_cancel:
time.sleep(0.1)
bytestream = self.serial_worker.readline()
if self.recv_cancel:
self.recv_cancel = False
return None
return bytestream
except serial.SerialException as e:
print(e)
return None
except KeyboardInterrupt:
self.recv_cancel = True
return None
def send_recv(self, data):
self.send(data)
return self.recv()
def stop_worker(self):
self.recv_cancel = True
self.reset_buffer()
self.close()
self.join()