Newer
Older
Hardware / FaultInjection / prereqs / FaultyCat / Modules / Worker.py
0xRoM on 11 Feb 2 KB initial commit
  1. import threading
  2. import time
  3. import typer
  4. from rich.console import Console
  5. from rich.table import Table
  6.  
  7. from .UART import UART
  8. from .ConfigBoard import ConfigBoard
  9.  
  10. class FaultyWorker(threading.Thread):
  11. def __init__(self):
  12. super().__init__()
  13. #self.daemon = True
  14. self.workers = []
  15. self.board_uart = UART()
  16. self.board_configurator = ConfigBoard()
  17. self.pulse_count = self.board_configurator.BOARD_CONFIG["pulse_count"]
  18. self.pulse_time = self.board_configurator.BOARD_CONFIG["pulse_time"]
  19. def add_worker(self, worker):
  20. self.workers.append(worker)
  21. def stop_workers(self):
  22. for worker in self.workers:
  23. worker.join()
  24. def run_workers(self):
  25. for worker in self.workers:
  26. worker.start()
  27. def set_serial_port(self, serial_port):
  28. self.board_uart.set_serial_port(serial_port)
  29. def validate_serial_connection(self):
  30. return self.board_uart.is_valid_connection()
  31. def set_pulse_count(self, pulse_count):
  32. self.pulse_count = pulse_count
  33. self.board_configurator.BOARD_CONFIG["pulse_count"] = pulse_count
  34. def set_pulse_time(self, pulse_time):
  35. self.pulse_time = pulse_time
  36. self.board_configurator.BOARD_CONFIG["pulse_time"] = pulse_time
  37. def start_faulty_attack(self):
  38. try:
  39. self.board_uart.open()
  40. time.sleep(0.1)
  41. typer.secho("Board connected.", fg=typer.colors.GREEN)
  42. typer.secho("[*] ARMING BOARD, BE CAREFULL!", fg=typer.colors.BRIGHT_YELLOW)
  43. self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
  44. time.sleep(1)
  45. self.board_uart.send(self.board_configurator.board_commands.COMMAND_ARM.value.encode("utf-8"))
  46. typer.secho("[*] ARMED BOARD.", fg=typer.colors.BRIGHT_GREEN)
  47. time.sleep(1)
  48. typer.secho(f"[*] SENDING {self.pulse_count} PULSES.", fg=typer.colors.BRIGHT_GREEN)
  49. for i in range(self.pulse_count):
  50. typer.secho(f"\t- SENDING PULSE {i+1} OF {self.pulse_count}.", fg=typer.colors.BRIGHT_GREEN)
  51. self.board_uart.send(self.board_configurator.board_commands.COMMAND_PULSE.value.encode("utf-8"))
  52. time.sleep(self.pulse_time)
  53. typer.secho("DISARMING BOARD.", fg=typer.colors.BRIGHT_YELLOW)
  54. self.board_uart.send(self.board_configurator.board_commands.COMMAND_DISARM.value.encode("utf-8"))
  55. self.board_uart.close()
  56. typer.secho("BOARD DISARMING.", fg=typer.colors.BRIGHT_YELLOW)
  57. except Exception as e:
  58. typer.secho(f"Error: {e}", fg=typer.colors.BRIGHT_RED)
Buy Me A Coffee