| | import logging |
---|
| | import serial |
---|
| | from serial.tools.list_ports import comports |
---|
| | from typing import Union, List |
---|
| | |
---|
| | class ADCSettings(): |
---|
| | def __init__(self, dev:serial) -> None: |
---|
| | self._dev = dev |
---|
| | self._clk_freq = 25000000 |
---|
| | self._delay = 0 |
---|
| | |
---|
| | @property |
---|
| | def clk_freq(self) -> int: |
---|
| | """ |
---|
| | Get ADC CLK Frequency |
---|
| | """ |
---|
| | return self._clk_freq |
---|
| | |
---|
| | @clk_freq.setter |
---|
| | def clk_freq(self, freq:int) -> None: |
---|
| | """ |
---|
| | Set ADC CLK Frequency, valid between 0.5kHz and 31.25MHz |
---|
| | """ |
---|
| | BASE_CLK = 250000000 |
---|
| | pio_freq = freq*4 |
---|
| | divider = BASE_CLK/pio_freq |
---|
| | integer = int(divider) |
---|
| | frac = int((divider-integer)*256) |
---|
| | if frac == 256: |
---|
| | frac = 0 |
---|
| | integer += 1 |
---|
| | self._dev.write(f":ADC:PLL {integer},{frac}\n".encode("ascii")) |
---|
| | self._clk_freq = BASE_CLK/(integer+frac/256)/4 |
---|
| | |
---|
| | @property |
---|
| | def delay(self) -> int: |
---|
| | """ |
---|
| | Get delay between trigger and start of sampling in cycles (8.3ns) |
---|
| | """ |
---|
| | return self._delay |
---|
| | |
---|
| | @delay.setter |
---|
| | def delay(self, delay) -> None: |
---|
| | """ |
---|
| | Set delay between trigger and start of sampling in cycles (8.3ns) |
---|
| | """ |
---|
| | self._delay = delay |
---|
| | self._dev.write(f":ADC:DELAY {int(delay)}\n".encode("ascii")) |
---|
| | |
---|
| | class GlitchSettings(): |
---|
| | def __init__(self, dev:serial) -> None: |
---|
| | self._dev = dev |
---|
| | self._offset = 10 |
---|
| | self._repeat = 10 |
---|
| | |
---|
| | @property |
---|
| | def ext_offset(self) -> int: |
---|
| | """ |
---|
| | Delay between trigger and start of glitch in cycles (8.3ns) |
---|
| | """ |
---|
| | return self._offset |
---|
| | |
---|
| | @ext_offset.setter |
---|
| | def ext_offset(self, offset:int) -> None: |
---|
| | """ |
---|
| | Set delay between trigger and start of glitch in cycles (8.3ns) |
---|
| | """ |
---|
| | self._dev.write(f":GLITCH:DELAY {int(offset)}\n".encode("ascii")) |
---|
| | self._offset = offset |
---|
| | |
---|
| | @property |
---|
| | def repeat(self) -> int: |
---|
| | """Width of glitch in cycles (approx = 8.3 ns * width)""" |
---|
| | return self._repeat |
---|
| | |
---|
| | @repeat.setter |
---|
| | def repeat(self, width:int) -> None: |
---|
| | """ |
---|
| | Set width of glitch in cycles (8.3ns) |
---|
| | """ |
---|
| | self._dev.write(f":GLITCH:LEN {int(width)}\n".encode("ascii")) |
---|
| | self._repeat = width |
---|
| | |
---|
| | class GPIOSettings(): |
---|
| | def __init__(self, dev:serial) -> None: |
---|
| | self.gpio = [] |
---|
| | for i in range(0, 4): |
---|
| | self.gpio.append(list()) |
---|
| | self.dev = dev |
---|
| | self.MAX_CHANGES = 255 |
---|
| | self.MAX_DELAY = 2147483647 |
---|
| | |
---|
| | def add(self, pin:int, state:bool, delay:int=None, seconds:float=None) -> None: |
---|
| | """ |
---|
| | Add state change to gpio |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | pin : int |
---|
| | Which pin to add state change to, [0,3] |
---|
| | state : bool |
---|
| | What the state of the pin should be |
---|
| | delay : int |
---|
| | Number of cycles delay after state change, each cycle is 8.3ns |
---|
| | seconds : float |
---|
| | Seconds of delay after state change if delay is not provided |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | None |
---|
| | """ |
---|
| | if pin < 0 or pin > 3: |
---|
| | raise ValueError("Pin must be between 0 and 3") |
---|
| | |
---|
| | if len(self.gpio[pin]) >= self.MAX_CHANGES: |
---|
| | raise ValueError("Pin reached max state changes") |
---|
| | |
---|
| | if delay is None: |
---|
| | if seconds is None: |
---|
| | raise ValueError("delay or seconds must be provided") |
---|
| | delay = int(seconds*100000000) |
---|
| | |
---|
| | if delay > self.MAX_DELAY: |
---|
| | raise ValueError("delay exceeds maximum") |
---|
| | |
---|
| | self.gpio[pin].append((delay << 1) | state) |
---|
| | |
---|
| | def reset(self) -> None: |
---|
| | """ |
---|
| | Reset all GPIO state changes |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | None |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | None |
---|
| | """ |
---|
| | self.dev.write(b":GPIO:RESET\n") |
---|
| | for i in range(0, 4): |
---|
| | self.gpio[i].clear() |
---|
| | |
---|
| | def upload(self) -> None: |
---|
| | """ |
---|
| | Upload GPIO changes to device |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | None |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | None |
---|
| | """ |
---|
| | self.dev.write(b":GPIO:RESET\n") |
---|
| | for i in range(0, 4): |
---|
| | for item in self.gpio[i]: |
---|
| | self.dev.write(f":GPIO:ADD {i},{item}\n".encode("ascii")) |
---|
| | |
---|
| | |
---|
| | class Scope(): |
---|
| | RISING_EDGE = 0 |
---|
| | FALLING_EDGE = 1 |
---|
| | |
---|
| | def __init__(self, port=None) -> None: |
---|
| | if port is None: |
---|
| | ports = comports() |
---|
| | matches = [p.device for p in ports if p.interface == "Curious Bolt API"] |
---|
| | if len(matches) != 1: |
---|
| | matches = [p.device for p in ports if p.product == "Curious Bolt"] |
---|
| | matches.sort() |
---|
| | matches.reverse() |
---|
| | if len(matches) != 2: |
---|
| | raise IOError('Curious Bolt device not found. Please check if it\'s connected, and pass its port explicitly if it is.') |
---|
| | port = matches[0] |
---|
| | |
---|
| | self._port = port |
---|
| | self._dev = serial.Serial(port, 115200*10, timeout=1.0) |
---|
| | self._dev.reset_input_buffer() |
---|
| | self._dev.write(b":VERSION?\n") |
---|
| | data = self._dev.readline().strip() |
---|
| | if data is None or data == b"": |
---|
| | raise ValueError("Unable to connect") |
---|
| | print(f"Connected to version: {data.decode('ascii')}") |
---|
| | self.adc = ADCSettings(self._dev) |
---|
| | self.glitch = GlitchSettings(self._dev) |
---|
| | self.io = GPIOSettings(self._dev) |
---|
| | |
---|
| | def arm(self, pin:int=0, edge:int=RISING_EDGE) -> None: |
---|
| | """ |
---|
| | Arms the glitch/gpio/adc based on trigger pin |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | pin : int |
---|
| | Which pin to use for trigger [0:7] |
---|
| | edge : int |
---|
| | On what edge to trigger can be RISING_EDGE or FALLING_EDGE |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | None |
---|
| | """ |
---|
| | if pin < 0 or pin > 7: |
---|
| | raise ValueError("Pin invalid") |
---|
| | |
---|
| | if edge != self.RISING_EDGE and edge != self.FALLING_EDGE: |
---|
| | raise ValueError("Edge invalid") |
---|
| | |
---|
| | self._dev.write(f":TRIGGER:PIN {pin},{edge}\n".encode("ascii")) |
---|
| | |
---|
| | def trigger(self) -> None: |
---|
| | """ |
---|
| | Immediately trigger the glitch/gpio/adc |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | None |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | None |
---|
| | """ |
---|
| | self._dev.write(b":TRIGGER:NOW\n") |
---|
| | |
---|
| | def default_setup(self) -> None: |
---|
| | """ |
---|
| | Load some safe defaults into settings |
---|
| | """ |
---|
| | self.glitch.repeat = 10 |
---|
| | self.glitch.ext_offset = 0 |
---|
| | self.adc.delay = 0 |
---|
| | self.adc.clk_freq = 10000000 |
---|
| | self.io.reset() |
---|
| | |
---|
| | def con(self) -> None: |
---|
| | """ |
---|
| | Connect to device if serial port is not open |
---|
| | """ |
---|
| | if not self._dev.is_open: |
---|
| | self._dev.open() |
---|
| | |
---|
| | def dis(self) -> None: |
---|
| | """ |
---|
| | Disconnect from serial port |
---|
| | """ |
---|
| | self._dev.close() |
---|
| | |
---|
| | def get_last_trace(self, as_int:bool=False) -> Union[List[int], List[float]]: |
---|
| | """ |
---|
| | Returns the latest captured data from ADC |
---|
| | |
---|
| | Arguments |
---|
| | --------- |
---|
| | as_int : bool |
---|
| | Returns the data as raw 10bit value from the adc |
---|
| | |
---|
| | Returns |
---|
| | ------- |
---|
| | data : list<int> |
---|
| | |
---|
| | """ |
---|
| | self._dev.reset_input_buffer() #Clear any data |
---|
| | self._dev.write(b":ADC:DATA?\n") |
---|
| | |
---|
| | data = self._dev.readline() |
---|
| | if data is None: |
---|
| | return [] |
---|
| | data = data.decode("ascii").strip() |
---|
| | if "ERR" in data: |
---|
| | logging.warning(f"Received: {data}") |
---|
| | return [] |
---|
| | data = data.split(",") |
---|
| | data = [x for x in data if x != ''] |
---|
| | if as_int: |
---|
| | return [int(x) for x in data] |
---|
| | volt_per_step = 2 / 10 / 1024 # 2V pk-pk, 10x amplified from source, in 10-bit ADC |
---|
| | return [(float(x)-512)*volt_per_step for x in data] |
---|
| | |
---|
| | def plot_last_trace(self, continuous=False): |
---|
| | try: |
---|
| | import matplotlib.pyplot as plt |
---|
| | except ImportError: |
---|
| | print("Dependencies missing, please install python package matplotlib") |
---|
| | return |
---|
| | plt.ion() |
---|
| | fig = plt.figure() |
---|
| | ax = fig.add_subplot(111) |
---|
| | ax.set_xlabel("Time since trigger (us)") |
---|
| | ax.set_ylabel("Voltage difference (mV)") |
---|
| | us_per_measurement = 1e6 / self.adc.clk_freq |
---|
| | line, = ax.plot([float(x) * us_per_measurement for x in range(50000)], [0] * 50000, 'b-') |
---|
| | |
---|
| | while True: |
---|
| | try: |
---|
| | res = self.get_last_trace() |
---|
| | if len(res) != 50000: |
---|
| | print(f"Got {len(res)} entries, skipping") |
---|
| | if continuous: |
---|
| | continue |
---|
| | else: |
---|
| | break |
---|
| | trace = [x*1000 for x in res] |
---|
| | line.set_ydata(trace) |
---|
| | ax.relim() |
---|
| | ax.autoscale_view() |
---|
| | fig.canvas.draw() |
---|
| | fig.canvas.flush_events() |
---|
| | if continuous: |
---|
| | self.trigger() |
---|
| | continue |
---|
| | else: |
---|
| | plt.show() |
---|
| | break |
---|
| | except KeyboardInterrupt: |
---|
| | break |
---|
| | |
---|
| | def update(self): |
---|
| | self._dev.write(b":BOOTLOADER\n") |
---|
| | |
---|
| | |
---|
| | |
---|
| | if __name__ == "__main__": |
---|
| | s = Scope() |
---|
| | s.default_setup() |
---|
| | s.trigger() |
---|
| | s.plot_last_trace(continuous=True) |
---|
| | |
---|
| | |