- import logging
- import serial
- from serial.tools.list_ports import comports
- from typing import Union, List
-
class ADCSettings():
    """
    ADC settings (sample clock and trigger-to-sample delay) of a device.

    Each setter sends the matching ":ADC:..." command through ``dev`` and
    caches the value that was sent. The getters return the cached value;
    they never query the device.
    """

    def __init__(self, dev: "serial.Serial") -> None:
        """
        Arguments
        ---------
        dev : serial.Serial
            Open serial connection; only its write() method is used here
        """
        self._dev = dev
        # Cached values only: nothing is written to the device on construction
        self._clk_freq = 25000000
        self._delay = 0

    @property
    def clk_freq(self) -> float:
        """
        Get ADC CLK Frequency

        Returns the initial default, or the frequency that results from the
        divider sent by the most recent assignment (which can differ slightly
        from the requested one because the divider has 1/256 resolution).
        """
        return self._clk_freq

    @clk_freq.setter
    def clk_freq(self, freq: float) -> None:
        """
        Set ADC CLK Frequency, valid between 0.5kHz and 31.25MHz

        The documented range is not enforced; only requests that cannot be
        turned into a divider at all are rejected.

        Raises
        ------
        ValueError
            If freq is not greater than 0, or so high that the divider
            truncates to zero
        """
        if freq <= 0:
            raise ValueError("ADC clock frequency must be greater than 0")

        BASE_CLK = 250000000
        pio_freq = freq*4
        divider = BASE_CLK/pio_freq
        integer = int(divider)
        # Truncation keeps frac within 0..255, so it never carries into integer
        frac = int((divider-integer)*256)
        effective_divider = integer+frac/256
        if effective_divider == 0:
            # Reject before writing so the device never receives a 0,0 divider
            raise ValueError("ADC clock frequency is too high")

        self._dev.write(f":ADC:PLL {integer},{frac}\n".encode("ascii"))
        # Cache the frequency actually achieved by the truncated divider
        self._clk_freq = BASE_CLK/effective_divider/4

    @property
    def delay(self) -> int:
        """
        Get delay between trigger and start of sampling in cycles (8.3ns)
        """
        return self._delay

    @delay.setter
    def delay(self, delay: int) -> None:
        """
        Set delay between trigger and start of sampling in cycles (8.3ns)

        The value is converted with int() before it is sent, and the converted
        value is what gets cached, so the getter reports what the device got.
        """
        cycles = int(delay)
        self._dev.write(f":ADC:DELAY {cycles}\n".encode("ascii"))
        # Cache only after the write succeeded
        self._delay = cycles
-
class GlitchSettings():
    """
    Glitch settings (offset from trigger and glitch width) of a device.

    Each setter sends the matching ":GLITCH:..." command through ``dev`` and
    caches the integer that was sent. The getters return the cached value;
    they never query the device.
    """

    def __init__(self, dev: "serial.Serial") -> None:
        """
        Arguments
        ---------
        dev : serial.Serial
            Open serial connection; only its write() method is used here
        """
        self._dev = dev
        # Cached values only: nothing is written to the device on construction
        self._offset = 10
        self._repeat = 10

    @property
    def ext_offset(self) -> int:
        """
        Delay between trigger and start of glitch in cycles (8.3ns)
        """
        return self._offset

    @ext_offset.setter
    def ext_offset(self, offset: int) -> None:
        """
        Set delay between trigger and start of glitch in cycles (8.3ns)

        The value is converted with int() before it is sent, and the converted
        value is what gets cached, so the getter reports what the device got.
        """
        cycles = int(offset)
        self._dev.write(f":GLITCH:DELAY {cycles}\n".encode("ascii"))
        self._offset = cycles

    @property
    def repeat(self) -> int:
        """Width of glitch in cycles (approx = 8.3 ns * width)"""
        return self._repeat

    @repeat.setter
    def repeat(self, width: int) -> None:
        """
        Set width of glitch in cycles (8.3ns)

        The value is converted with int() before it is sent, and the converted
        value is what gets cached, so the getter reports what the device got.
        """
        cycles = int(width)
        self._dev.write(f":GLITCH:LEN {cycles}\n".encode("ascii"))
        self._repeat = cycles
-
class GPIOSettings():
    """
    Queue of GPIO state changes for pins 0..3.

    State changes are collected locally with add() and only reach the device
    when upload() is called. Each queued entry is a single integer that packs
    the delay in the upper bits and the pin state in bit 0.
    """

    def __init__(self, dev: "serial.Serial") -> None:
        """
        Arguments
        ---------
        dev : serial.Serial
            Open serial connection; only its write() method is used here
        """
        # One list of encoded state changes per pin
        self.gpio = [[] for _ in range(4)]
        self.dev = dev
        self.MAX_CHANGES = 255
        self.MAX_DELAY = 2147483647

    def add(self, pin: int, state: bool, delay: Union[int, None] = None, seconds: Union[float, None] = None) -> None:
        """
        Add state change to gpio

        Arguments
        ---------
        pin : int
            Which pin to add state change to, [0,3]
        state : bool
            What the state of the pin should be; any truthy value counts as
            True
        delay : int
            Number of cycles delay after state change, each cycle is 8.3ns
        seconds : float
            Seconds of delay after state change if delay is not provided

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If pin is out of range, the pin already holds MAX_CHANGES entries,
            neither delay nor seconds is given, or the delay is negative or
            larger than MAX_DELAY
        """
        if pin < 0 or pin > 3:
            raise ValueError("Pin must be between 0 and 3")

        if len(self.gpio[pin]) >= self.MAX_CHANGES:
            raise ValueError("Pin reached max state changes")

        if delay is None:
            if seconds is None:
                raise ValueError("delay or seconds must be provided")
            # NOTE(review): this converts at 100000000 cycles per second
            # (10 ns per cycle), which does not match the 8.3ns cycle quoted
            # above - confirm which figure is right for the device.
            delay = int(seconds*100000000)

        if delay < 0:
            # A negative delay would encode to a negative word
            raise ValueError("delay must not be negative")

        if delay > self.MAX_DELAY:
            raise ValueError("delay exceeds maximum")

        # Normalise state to 0/1 so it can only ever occupy bit 0
        self.gpio[pin].append((delay << 1) | int(bool(state)))

    def reset(self) -> None:
        """
        Reset all GPIO state changes

        Sends the reset command to the device and empties the local queues.

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self.dev.write(b":GPIO:RESET\n")
        for changes in self.gpio:
            changes.clear()

    def upload(self) -> None:
        """
        Upload GPIO changes to device

        Sends a reset command first, then one add command per queued entry,
        pin by pin. The local queues are left untouched.

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self.dev.write(b":GPIO:RESET\n")
        for pin, changes in enumerate(self.gpio):
            for item in changes:
                self.dev.write(f":GPIO:ADD {pin},{item}\n".encode("ascii"))
-
-
class Scope():
    """
    Serial connection to a Curious Bolt device.

    Opens the port, checks that the device answers a version query, and
    exposes the ADC, glitch and GPIO settings as the ``adc``, ``glitch`` and
    ``io`` attributes.
    """
    RISING_EDGE = 0
    FALLING_EDGE = 1

    # Number of samples plot_last_trace expects in a complete trace
    _TRACE_LENGTH = 50000

    def __init__(self, port=None) -> None:
        """
        Open the serial port and verify the device responds

        Arguments
        ---------
        port : str
            Serial port to open. If None, the port is looked up among the
            ports reported by comports()

        Raises
        ------
        IOError
            If no port is given and the device cannot be identified
        ValueError
            If the device does not answer the version query
        """
        if port is None:
            ports = comports()
            matches = [p.device for p in ports if p.interface == "Curious Bolt API"]
            if len(matches) != 1:
                # Fall back to the product name; exactly two ports are
                # expected then, and the one that sorts last is used
                matches = [p.device for p in ports if p.product == "Curious Bolt"]
                matches.sort()
                matches.reverse()
                if len(matches) != 2:
                    raise IOError("Curious Bolt device not found. Please check if it's connected, and pass its port explicitly if it is.")
            port = matches[0]

        self._port = port
        self._dev = serial.Serial(port, 115200*10, timeout=1.0)
        try:
            self._dev.reset_input_buffer()
            self._dev.write(b":VERSION?\n")
            data = self._dev.readline().strip()
            if not data:
                raise ValueError("Unable to connect")
            print(f"Connected to version: {data.decode('ascii')}")
        except Exception:
            # Do not leave the port open when the handshake fails
            self._dev.close()
            raise
        self.adc = ADCSettings(self._dev)
        self.glitch = GlitchSettings(self._dev)
        self.io = GPIOSettings(self._dev)

    def arm(self, pin:int=0, edge:int=RISING_EDGE) -> None:
        """
        Arms the glitch/gpio/adc based on trigger pin

        Arguments
        ---------
        pin : int
            Which pin to use for trigger [0:7]
        edge : int
            On what edge to trigger can be RISING_EDGE or FALLING_EDGE

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If pin or edge is not one of the accepted values
        """
        if pin < 0 or pin > 7:
            raise ValueError("Pin invalid")

        if edge not in (self.RISING_EDGE, self.FALLING_EDGE):
            raise ValueError("Edge invalid")

        self._dev.write(f":TRIGGER:PIN {pin},{edge}\n".encode("ascii"))

    def trigger(self) -> None:
        """
        Immediately trigger the glitch/gpio/adc

        Arguments
        ---------
        None

        Returns
        -------
        None
        """
        self._dev.write(b":TRIGGER:NOW\n")

    def default_setup(self) -> None:
        """
        Load some safe defaults into settings
        """
        self.glitch.repeat = 10
        self.glitch.ext_offset = 0
        self.adc.delay = 0
        self.adc.clk_freq = 10000000
        self.io.reset()

    def con(self) -> None:
        """
        Connect to device if serial port is not open
        """
        if not self._dev.is_open:
            self._dev.open()

    def dis(self) -> None:
        """
        Disconnect from serial port
        """
        self._dev.close()

    def get_last_trace(self, as_int:bool=False) -> Union[List[int], List[float]]:
        """
        Returns the latest captured data from ADC

        Arguments
        ---------
        as_int : bool
            Returns the data as raw 10bit value from the adc

        Returns
        -------
        data : list<int> if as_int is set, otherwise list<float> in volts.
            Empty if nothing was received or the device reported an error

        """
        self._dev.reset_input_buffer() #Clear any data
        self._dev.write(b":ADC:DATA?\n")

        data = self._dev.readline()
        if not data:
            # Nothing arrived before the read timeout
            return []
        data = data.decode("ascii").strip()
        if "ERR" in data:
            logging.warning("Received: %s", data)
            return []
        # Drop empty fields, e.g. the one produced by a trailing comma
        samples = [x for x in data.split(",") if x != '']
        if as_int:
            return [int(x) for x in samples]
        volt_per_step = 2 / 10 / 1024 # 2V pk-pk, 10x amplified from source, in 10-bit ADC
        return [(float(x)-512)*volt_per_step for x in samples]

    def plot_last_trace(self, continuous=False):
        """
        Plot the latest captured trace with matplotlib

        Traces that do not contain exactly the expected number of samples are
        reported and skipped. Ctrl-C ends the loop.

        Arguments
        ---------
        continuous : bool
            If set, trigger again after every plotted trace and keep
            updating the plot; otherwise plot a single trace

        Returns
        -------
        None
        """
        try:
            import matplotlib.pyplot as plt
        except ImportError:
            print("Dependencies missing, please install python package matplotlib")
            return
        n_samples = self._TRACE_LENGTH
        plt.ion()
        fig = plt.figure()
        ax = fig.add_subplot(111)
        ax.set_xlabel("Time since trigger (us)")
        ax.set_ylabel("Voltage difference (mV)")
        us_per_measurement = 1e6 / self.adc.clk_freq
        line, = ax.plot([float(x) * us_per_measurement for x in range(n_samples)], [0] * n_samples, 'b-')

        while True:
            try:
                res = self.get_last_trace()
                if len(res) != n_samples:
                    print(f"Got {len(res)} entries, skipping")
                    if continuous:
                        continue
                    break
                # Volts to millivolts, matching the axis label
                trace = [x*1000 for x in res]
                line.set_ydata(trace)
                ax.relim()
                ax.autoscale_view()
                fig.canvas.draw()
                fig.canvas.flush_events()
                if not continuous:
                    plt.show()
                    break
                self.trigger()
            except KeyboardInterrupt:
                break

    def update(self):
        """
        Send the ":BOOTLOADER" command to the device

        NOTE(review): presumably this reboots the device into its bootloader
        for a firmware update - confirm against the firmware documentation.
        """
        self._dev.write(b":BOOTLOADER\n")
-
-
-
if __name__ == "__main__":
    # Demo when run as a script: connect to an auto-detected device, load the
    # default settings, fire one trigger and keep plotting the captured trace.
    scope = Scope()
    scope.default_setup()
    scope.trigger()
    scope.plot_last_trace(continuous=True)