"""
=====================================================================
ARDUINO SERIAL CONTROLLER CLASS (PYTHON)
=====================================================================
Version: 1.3.0
Author: [Your Name]
DESCRIPTION
---------------------------------------------------------------------
This module provides a class-based interface to communicate with
the Arduino Serial Pin Control and Monitoring Firmware v1.3.0.
It supports all implemented serial commands including:
- Version retrieval
- Dynamic pin mode configuration
- Pin map querying
- Output control (default state, timed pulse)
- Input monitoring and state reading
- Duration measurement and blocking wait
=====================================================================
DEPENDENCIES
---------------------------------------------------------------------
- pyserial (install with: pip install pyserial)
=====================================================================
EXAMPLE USAGE
---------------------------------------------------------------------
from arduino_controller import ArduinoController
import time
# Create controller instance
arduino = ArduinoController(port="COM3", baudrate=115200)
# Connect and initialise
arduino.connect()
print("Version:", arduino.get_version())
# Configure pins
arduino.set_mode(2, "INPUT")
arduino.set_mode(8, "OUTPUT")
arduino.set_mode(9, "OUTPUT")
arduino.set_mode(10, "OUTPUT")
arduino.set_mode(11, "OUTPUT")
print("Pin map:", arduino.get_pinmap())
# Set outputs low, pulse one pin, monitor input
arduino.set_default(8, "LOW")
arduino.set_default(9, "LOW")
arduino.set_default(10, "LOW")
arduino.set_default(11, "LOW")
arduino.set_for(8, "HIGH", 300)
arduino.watch(2)
# Read input state and duration
state = arduino.get_state(2)
duration = arduino.get_duration(2)
print(f"Pin 2 state: {state}, duration since change: {duration} ms")
# Wait for a state change to HIGH
arduino.wait_for(2, "HIGH")
# Disconnect
arduino.disconnect()
=====================================================================
"""
import serial
import time
class ArduinoController:
"""Provides a structured interface for Arduino serial control."""
def __init__(self, port, baudrate=115200, timeout=1.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.ser = None
# -------------------------------------------------------------
# Connection management
# -------------------------------------------------------------
def connect(self):
"""Establish serial connection and wait for READY signal."""
self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
time.sleep(2) # Allow Arduino reset
while True:
line = self._read_line()
if line == "READY":
break
def disconnect(self):
"""Close serial connection cleanly."""
if self.ser and self.ser.is_open:
self.ser.close()
# -------------------------------------------------------------
# Internal communication utilities
# -------------------------------------------------------------
def _send_command(self, command, expect_response=True):
"""Send a command string to the Arduino."""
if not self.ser or not self.ser.is_open:
raise ConnectionError("Serial connection not established.")
self.ser.write((command + "\n").encode("utf-8"))
if expect_response:
return self._read_line()
return None
def _read_line(self):
"""Read a single line from serial and strip whitespace."""
line = self.ser.readline().decode("utf-8").strip()
return line
# -------------------------------------------------------------
# Core command wrappers
# -------------------------------------------------------------
def get_version(self):
"""Return firmware version."""
return self._send_command("GET_VERSION")
def set_mode(self, pin, mode):
"""Configure a pin as INPUT or OUTPUT."""
return self._send_command(f"SET_MODE:{pin}:{mode}")
def get_pinmap(self):
"""Return current pin assignments."""
return self._send_command("GET_PINMAP")
def set_default(self, pin, state):
"""Set an output pin to a default HIGH or LOW state."""
return self._send_command(f"SET_DEFAULT:{pin}:{state}")
def set_for(self, pin, state, duration_ms):
"""Set an output pin state for a specific duration (ms)."""
return self._send_command(f"SET_FOR:{pin}:{state}:{duration_ms}")
def watch(self, pin):
"""Start watching an input pin for state changes."""
return self._send_command(f"WATCH:{pin}")
def watch_pin(self, pin, duration_ms=2000):
"""
Activates WATCH mode for a pin and collects CHANGE events for a period.
Args:
pin (int): Input pin number to watch.
duration_ms (int): How long to listen for events.
Returns:
list of dict: [{'pin': int, 'state': str, 'duration_ms': int}, ...]
"""
# Use the existing _send_command() method
self._send_command(f"WATCH:{pin}", expect_response=False)
messages = []
start = time.time()
while (time.time() - start) * 1000 < duration_ms:
if self.ser.in_waiting:
line = self._read_line()
if line.startswith("CHANGE:"):
# Example: CHANGE:2:HIGH:1421
parts = line.split(":")
if len(parts) == 4:
try:
messages.append({
'pin': int(parts[1]),
'state': parts[2],
'duration_ms': int(parts[3])
})
except ValueError:
pass
time.sleep(0.005)
return messages
def get_state(self, pin):
"""Return current state (HIGH or LOW) of a pin."""
response = self._send_command(f"GET_STATE:{pin}")
if response.startswith("STATE:"):
return response.split(":")[2]
return response
def get_duration(self, pin):
"""Return duration since last state change."""
response = self._send_command(f"GET_DURATION:{pin}")
if response.startswith("DURATION:"):
return int(response.split(":")[2])
return response
def wait_for(self, pin, state):
"""Block until specified pin reaches given state."""
response = self._send_command(f"WAIT_FOR:{pin}:{state}")
if response.startswith("WAIT_RESULT:"):
_, pin_str, final_state, duration = response.split(":")
return {"pin": int(pin_str), "state": final_state, "duration_ms": int(duration)}
return response
# -------------------------------------------------------------
# Continuous monitoring
# -------------------------------------------------------------
def monitor(self, callback=None):
"""
Continuously read from serial and process change notifications.
Optionally pass a callback(line) for custom handling.
"""
print("[INFO] Monitoring started. Press Ctrl+C to stop.")
try:
while True:
line = self._read_line()
if line:
if callback:
callback(line)
else:
print(line)
except KeyboardInterrupt:
print("\n[INFO] Monitoring stopped.")
# -------------------------------------------------------------
# Utility
# -------------------------------------------------------------
def flush(self):
"""Flush serial input buffer."""
if self.ser:
self.ser.reset_input_buffer()