Newer
Older
12Sec_CTF_v1 / docs / arduinIO.py
root 13 days ago 7 KB solution writeups
"""
=====================================================================
ARDUINO SERIAL CONTROLLER CLASS (PYTHON)
=====================================================================
Version: 1.3.0
Author: [Your Name]

DESCRIPTION
---------------------------------------------------------------------
This module provides a class-based interface to communicate with
the Arduino Serial Pin Control and Monitoring Firmware v1.3.0.

It supports all implemented serial commands including:
- Version retrieval
- Dynamic pin mode configuration
- Pin map querying
- Output control (default state, timed pulse)
- Input monitoring and state reading
- Duration measurement and blocking wait

=====================================================================
DEPENDENCIES
---------------------------------------------------------------------
- pyserial (install with: pip install pyserial)

=====================================================================
EXAMPLE USAGE
---------------------------------------------------------------------
from arduino_controller import ArduinoController
import time

# Create controller instance
arduino = ArduinoController(port="COM3", baudrate=115200)

# Connect and initialise
arduino.connect()
print("Version:", arduino.get_version())

# Configure pins
arduino.set_mode(2, "INPUT")
arduino.set_mode(8, "OUTPUT")
arduino.set_mode(9, "OUTPUT")
arduino.set_mode(10, "OUTPUT")
arduino.set_mode(11, "OUTPUT")

print("Pin map:", arduino.get_pinmap())

# Set outputs low, pulse one pin, monitor input
arduino.set_default(8, "LOW")
arduino.set_default(9, "LOW")
arduino.set_default(10, "LOW")
arduino.set_default(11, "LOW")

arduino.set_for(8, "HIGH", 300)
arduino.watch(2)

# Read input state and duration
state = arduino.get_state(2)
duration = arduino.get_duration(2)
print(f"Pin 2 state: {state}, duration since change: {duration} ms")

# Wait for a state change to HIGH
arduino.wait_for(2, "HIGH")

# Disconnect
arduino.disconnect()
=====================================================================
"""

import serial
import time


class ArduinoController:
    """Provides a structured interface for Arduino serial control."""

    def __init__(self, port, baudrate=115200, timeout=1.0):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.ser = None

    # -------------------------------------------------------------
    # Connection management
    # -------------------------------------------------------------
    def connect(self):
        """Establish serial connection and wait for READY signal."""
        self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
        time.sleep(2)  # Allow Arduino reset
        while True:
            line = self._read_line()
            if line == "READY":
                break

    def disconnect(self):
        """Close serial connection cleanly."""
        if self.ser and self.ser.is_open:
            self.ser.close()

    # -------------------------------------------------------------
    # Internal communication utilities
    # -------------------------------------------------------------
    def _send_command(self, command, expect_response=True):
        """Send a command string to the Arduino."""
        if not self.ser or not self.ser.is_open:
            raise ConnectionError("Serial connection not established.")
        self.ser.write((command + "\n").encode("utf-8"))
        if expect_response:
            return self._read_line()
        return None

    def _read_line(self):
        """Read a single line from serial and strip whitespace."""
        line = self.ser.readline().decode("utf-8").strip()
        return line

    # -------------------------------------------------------------
    # Core command wrappers
    # -------------------------------------------------------------
    def get_version(self):
        """Return firmware version."""
        return self._send_command("GET_VERSION")

    def set_mode(self, pin, mode):
        """Configure a pin as INPUT or OUTPUT."""
        return self._send_command(f"SET_MODE:{pin}:{mode}")

    def get_pinmap(self):
        """Return current pin assignments."""
        return self._send_command("GET_PINMAP")

    def set_default(self, pin, state):
        """Set an output pin to a default HIGH or LOW state."""
        return self._send_command(f"SET_DEFAULT:{pin}:{state}")

    def set_for(self, pin, state, duration_ms):
        """Set an output pin state for a specific duration (ms)."""
        return self._send_command(f"SET_FOR:{pin}:{state}:{duration_ms}")

    def watch(self, pin):
        """Start watching an input pin for state changes."""
        return self._send_command(f"WATCH:{pin}")

    def watch_pin(self, pin, duration_ms=2000):
    """
    Activates WATCH mode for a pin and collects CHANGE events for a period.

    Args:
        pin (int): Input pin number to watch.
        duration_ms (int): How long to listen for events.

    Returns:
        list of dict: [{'pin': int, 'state': str, 'duration_ms': int}, ...]
    """
    # Use the existing _send_command() method
    self._send_command(f"WATCH:{pin}", expect_response=False)
    messages = []
    start = time.time()

    while (time.time() - start) * 1000 < duration_ms:
        if self.ser.in_waiting:
            line = self._read_line()
            if line.startswith("CHANGE:"):
                # Example: CHANGE:2:HIGH:1421
                parts = line.split(":")
                if len(parts) == 4:
                    try:
                        messages.append({
                            'pin': int(parts[1]),
                            'state': parts[2],
                            'duration_ms': int(parts[3])
                        })
                    except ValueError:
                        pass
        time.sleep(0.005)

    return messages


    def get_state(self, pin):
        """Return current state (HIGH or LOW) of a pin."""
        response = self._send_command(f"GET_STATE:{pin}")
        if response.startswith("STATE:"):
            return response.split(":")[2]
        return response

    def get_duration(self, pin):
        """Return duration since last state change."""
        response = self._send_command(f"GET_DURATION:{pin}")
        if response.startswith("DURATION:"):
            return int(response.split(":")[2])
        return response

    def wait_for(self, pin, state):
        """Block until specified pin reaches given state."""
        response = self._send_command(f"WAIT_FOR:{pin}:{state}")
        if response.startswith("WAIT_RESULT:"):
            _, pin_str, final_state, duration = response.split(":")
            return {"pin": int(pin_str), "state": final_state, "duration_ms": int(duration)}
        return response

    # -------------------------------------------------------------
    # Continuous monitoring
    # -------------------------------------------------------------
    def monitor(self, callback=None):
        """
        Continuously read from serial and process change notifications.
        Optionally pass a callback(line) for custom handling.
        """
        print("[INFO] Monitoring started. Press Ctrl+C to stop.")
        try:
            while True:
                line = self._read_line()
                if line:
                    if callback:
                        callback(line)
                    else:
                        print(line)
        except KeyboardInterrupt:
            print("\n[INFO] Monitoring stopped.")

    # -------------------------------------------------------------
    # Utility
    # -------------------------------------------------------------
    def flush(self):
        """Flush serial input buffer."""
        if self.ser:
            self.ser.reset_input_buffer()