# Hardware / FaultInjection / prereqs / CuriousBolt / scope.py
import logging
from typing import List, Optional, Union

import serial
from serial.tools.list_ports import comports

class ADCSettings():
    """
    ADC sampling settings.

    Every setter sends the matching ``:ADC:...`` command through ``dev`` and
    caches the value locally; the getters return the cached value and never
    query the device.
    """

    def __init__(self, dev:"serial.Serial") -> None:
        self._dev = dev
        # Cached values returned by the getters until a setter is called.
        # NOTE(review): presumably these mirror the firmware's power-on
        # defaults - confirm, nothing is sent to the device here.
        self._clk_freq = 25000000
        self._delay = 0

    @property
    def clk_freq(self) -> float:
        """
        Get ADC CLK Frequency

        After the setter has been used this is the frequency that results
        from the divider actually sent, which may differ from the request.
        """
        return self._clk_freq
    
    @clk_freq.setter
    def clk_freq(self, freq:int) -> None:
        """
        Set ADC CLK Frequency, valid between 0.5kHz and 31.25MHz

        The request is converted into an integer part plus a fractional part
        in 1/256 steps. The fraction is truncated, so the frequency that is
        configured (and cached) is equal to or slightly above the request.

        Raises
        ------
        ValueError
            If freq is not positive, or so high that the divider becomes zero
        """
        if freq <= 0:
            raise ValueError("ADC clock frequency must be positive")

        BASE_CLK = 250000000
        pio_freq = freq*4
        divider = BASE_CLK/pio_freq
        integer = int(divider)
        # (divider-integer) is in [0, 1), so after truncation frac is in
        # [0, 255] and never needs to carry into the integer part.
        frac = int((divider-integer)*256)
        if integer == 0 and frac == 0:
            # Reject before writing: a zero divider cannot be converted back
            # into a frequency and would otherwise fail after the command
            # had already been sent.
            raise ValueError("ADC clock frequency is too high")
        self._dev.write(f":ADC:PLL {integer},{frac}\n".encode("ascii"))
        self._clk_freq = BASE_CLK/(integer+frac/256)/4
    
    @property
    def delay(self) -> int:
        """
        Get delay between trigger and start of sampling in cycles (8.3ns)
        """
        return self._delay
    
    @delay.setter
    def delay(self, delay) -> None:
        """
        Set delay between trigger and start of sampling in cycles (8.3ns)

        The value is converted with int() before it is sent; the converted
        value is what the getter returns afterwards.
        """
        cycles = int(delay)
        self._dev.write(f":ADC:DELAY {cycles}\n".encode("ascii"))
        # Cache only after the write succeeded so the getter matches what
        # was actually sent.
        self._delay = cycles

class GlitchSettings():
    """
    Glitch timing settings.

    Every setter sends the matching ``:GLITCH:...`` command through ``dev``
    and caches the value that was sent; the getters return the cached value
    and never query the device.
    """

    def __init__(self, dev:"serial.Serial") -> None:
        self._dev = dev
        # Cached values returned by the getters until a setter is called.
        # NOTE(review): presumably the firmware defaults - confirm, nothing
        # is sent to the device here.
        self._offset = 10
        self._repeat = 10

    @property
    def ext_offset(self) -> int:
        """
        Delay between trigger and start of glitch in cycles (8.3ns)
        """
        return self._offset
    
    @ext_offset.setter
    def ext_offset(self, offset:int) -> None:
        """
        Set delay between trigger and start of glitch in cycles (8.3ns)

        The value is converted with int() before it is sent; the converted
        value is what the getter returns afterwards.
        """
        cycles = int(offset)
        self._dev.write(f":GLITCH:DELAY {cycles}\n".encode("ascii"))
        self._offset = cycles
    
    @property
    def repeat(self) -> int:
        """Width of glitch in cycles (approx = 8.3 ns * width)"""
        return self._repeat

    @repeat.setter
    def repeat(self, width:int) -> None:
        """
        Set width of glitch in cycles (8.3ns)

        The value is converted with int() before it is sent; the converted
        value is what the getter returns afterwards.
        """
        cycles = int(width)
        self._dev.write(f":GLITCH:LEN {cycles}\n".encode("ascii"))
        self._repeat = cycles

class GPIOSettings():
    """
    Queues of GPIO state changes for pins 0-3.

    Changes are collected locally with add() and only sent to the device
    when upload() is called.
    """

    def __init__(self, dev:"serial.Serial") -> None:
        # One list of encoded state changes per pin.
        self.gpio = [[] for _ in range(4)]
        self.dev = dev
        self.MAX_CHANGES = 255
        self.MAX_DELAY = 2147483647
        
    def add(self, pin:int, state:bool, delay:Optional[int]=None, seconds:Optional[float]=None) -> None:
        """
        Add state change to gpio

        Arguments
        ---------
        pin : int
            Which pin to add state change to, [0,3]
        state : bool
            What the state of the pin should be, any truthy value means high
        delay : int
            Number of cycles delay after state change, each cycle is 8.3ns
        seconds : float
            Seconds of delay after state change if delay is not provided

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If pin is out of range, the pin already holds MAX_CHANGES
            entries, neither delay nor seconds is given, or the delay is
            negative or above MAX_DELAY
        """
        if pin < 0 or pin > 3:
            raise ValueError("Pin must be between 0 and 3")
        
        if len(self.gpio[pin]) >= self.MAX_CHANGES:
            raise ValueError("Pin reached max state changes")

        if delay is None:
            if seconds is None:
                raise ValueError("delay or seconds must be provided")
            # NOTE(review): this conversion uses 100,000,000 cycles per
            # second (10ns per cycle) while the documentation states 8.3ns
            # per cycle - confirm which one matches the hardware.
            delay = seconds*100000000

        if delay < 0:
            # A negative value would produce a negative encoded word below.
            raise ValueError("delay must not be negative")
        delay = int(delay)

        if delay > self.MAX_DELAY:
            raise ValueError("delay exceeds maximum")
        
        # Encoded word: delay in the upper bits, pin state in bit 0. The
        # state is reduced to 0/1 so it can never spill into the delay bits.
        self.gpio[pin].append((delay << 1) | int(bool(state)))
    
    def reset(self) -> None:
        """
        Reset all GPIO state changes

        Sends the reset command to the device and empties the local queues.

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self.dev.write(b":GPIO:RESET\n")
        for changes in self.gpio:
            changes.clear()

    def upload(self) -> None:
        """
        Upload GPIO changes to device

        Sends a reset first, then one add command per queued change, pin by
        pin. The local queues are left untouched.

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self.dev.write(b":GPIO:RESET\n")
        for pin, changes in enumerate(self.gpio):
            for item in changes:
                self.dev.write(f":GPIO:ADD {pin},{item}\n".encode("ascii"))
    

class Scope():
    """
    Serial front-end for a Curious Bolt device.

    Opens the serial port, checks that the device answers a version query
    and exposes the ADC, glitch and GPIO settings as ``adc``, ``glitch`` and
    ``io``. All commands are newline-terminated ASCII strings.
    """
    RISING_EDGE = 0
    FALLING_EDGE = 1
    # Number of samples plot_last_trace expects in one complete capture.
    TRACE_SAMPLES = 50000

    def __init__(self, port=None) -> None:
        """
        Open the device and verify that it responds

        Arguments
        ---------
        port : str
            Serial port to open. When omitted the port is looked up among
            the serial ports reported by the system.

        Raises
        ------
        IOError
            If no port is given and the device cannot be identified
        ValueError
            If the device does not answer the version query; the serial
            port is closed again before the error is raised
        """
        if port is None:
            ports = comports()
            matches = [p.device for p in ports if p.interface == "Curious Bolt API"]
            if len(matches) != 1:
                # Fallback: match on the product string instead. Exactly two
                # ports are expected and the one that sorts last is used.
                matches = [p.device for p in ports if p.product == "Curious Bolt"]
                matches.sort(reverse=True)
                if len(matches) != 2:
                    raise IOError('Curious Bolt device not found. Please check if it\'s connected, and pass its port explicitly if it is.')
            port = matches[0]

        self._port = port
        self._dev = serial.Serial(port, 115200*10, timeout=1.0)
        try:
            self._dev.reset_input_buffer()
            self._dev.write(b":VERSION?\n")
            data = self._dev.readline().strip()
            if not data:
                raise ValueError("Unable to connect")
            version = data.decode('ascii')
        except Exception:
            # Do not leave the port open when the handshake fails.
            self._dev.close()
            raise
        print(f"Connected to version: {version}")
        self.adc = ADCSettings(self._dev)
        self.glitch = GlitchSettings(self._dev)
        self.io = GPIOSettings(self._dev)

    def arm(self, pin:int=0, edge:int=RISING_EDGE) -> None:
        """
        Arms the glitch/gpio/adc based on trigger pin

        Arguments
        ---------
        pin : int
            Which pin to use for trigger [0:7]
        edge : int
            On what edge to trigger can be RISING_EDGE or FALLING_EDGE

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If pin or edge is outside the allowed values
        """
        if pin < 0 or pin > 7:
            raise ValueError("Pin invalid")
        
        if edge not in (self.RISING_EDGE, self.FALLING_EDGE):
            raise ValueError("Edge invalid")

        self._dev.write(f":TRIGGER:PIN {pin},{edge}\n".encode("ascii"))

    def trigger(self) -> None:
        """
        Immediately trigger the glitch/gpio/adc

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self._dev.write(b":TRIGGER:NOW\n")
    
    def default_setup(self) -> None:
        """
        Load some safe defaults into settings
        """
        self.glitch.repeat = 10
        self.glitch.ext_offset = 0
        self.adc.delay = 0
        self.adc.clk_freq = 10000000
        self.io.reset()

    def con(self) -> None:
        """
        Connect to device if serial port is not open
        """
        if not self._dev.is_open:
            self._dev.open()

    def dis(self) -> None:
        """
        Disconnect from serial port
        """
        self._dev.close()

    def get_last_trace(self, as_int:bool=False) -> Union[List[int], List[float]]:
        """
        Returns the latest captured data from ADC

        Arguments
        ---------
        as_int : bool
            Returns the data as raw 10bit value from the adc
        
        Returns
        -------
        data : list<int> or list<float>
            Raw ADC values when as_int is set, otherwise volts relative to
            mid-scale. An empty list is returned when nothing was received,
            the device reported an error or the reply could not be parsed.
        
        """
        self._dev.reset_input_buffer() #Clear any data
        self._dev.write(b":ADC:DATA?\n")

        raw = self._dev.readline()
        if not raw:
            return []
        try:
            text = raw.decode("ascii").strip()
        except UnicodeDecodeError:
            logging.warning("Received non-ASCII trace data")
            return []
        if "ERR" in text:
            logging.warning("Received: %s", text)
            return []
        # Drop empty fields, e.g. the one produced by a trailing comma.
        samples = [x for x in text.split(",") if x != '']
        volt_per_step = 2 / 10 / 1024  # 2V pk-pk, 10x amplified from source, in 10-bit ADC
        try:
            if as_int:
                return [int(x) for x in samples]
            return [(float(x)-512)*volt_per_step for x in samples]
        except ValueError:
            # Treated like the other failure paths: log and report no data.
            logging.warning("Received malformed trace data")
            return []

    def plot_last_trace(self, continuous=False):
        """
        Plot the latest captured trace with matplotlib

        Arguments
        ---------
        continuous : bool
            When set, re-trigger after every plotted trace and keep
            updating the plot until interrupted with Ctrl-C. Otherwise plot
            a single trace, or nothing if the capture is incomplete.

        Returns
        -------
        None
        """
        try:
            import matplotlib.pyplot as plt 
        except ImportError:
            print("Dependencies missing, please install python package matplotlib")
            return
        expected = self.TRACE_SAMPLES
        plt.ion()
        fig = plt.figure()
        ax = fig.add_subplot(111)
        ax.set_xlabel("Time since trigger (us)")
        ax.set_ylabel("Voltage difference (mV)")
        us_per_measurement = 1e6 / self.adc.clk_freq
        line, = ax.plot([float(x) * us_per_measurement for x in range(expected)], [0] * expected, 'b-')

        while True:
            try:
                res = self.get_last_trace()
                if len(res) != expected:
                    print(f"Got {len(res)} entries, skipping")
                    if continuous:
                        # Ask again without re-triggering.
                        continue
                    break
                # Volts to millivolts, matching the axis label.
                trace = [x*1000 for x in res]
                line.set_ydata(trace)
                ax.relim()
                ax.autoscale_view()
                fig.canvas.draw()
                fig.canvas.flush_events()
                if not continuous:
                    plt.show()
                    break
                self.trigger()
            except KeyboardInterrupt:
                break

    def update(self):
        """
        Send the bootloader command to the device

        NOTE(review): presumably this restarts the device into its
        bootloader for a firmware update - confirm against the firmware.
        """
        self._dev.write(b":BOOTLOADER\n")
    


if __name__ == "__main__":
    # Demo: connect, load the default settings, trigger once and keep
    # plotting traces until interrupted.
    s = Scope()
    try:
        s.default_setup()
        s.trigger()
        s.plot_last_trace(continuous=True)
    finally:
        # Release the serial port even if setup or plotting fails.
        s.dis()