import logging
import serial
from serial.tools.list_ports import comports
from typing import Union, List
class ADCSettings():
def __init__(self, dev:serial) -> None:
self._dev = dev
self._clk_freq = 25000000
self._delay = 0
@property
def clk_freq(self) -> int:
"""
Get ADC CLK Frequency
"""
return self._clk_freq
@clk_freq.setter
def clk_freq(self, freq:int) -> None:
"""
Set ADC CLK Frequency, valid between 0.5kHz and 31.25MHz
"""
BASE_CLK = 250000000
pio_freq = freq*4
divider = BASE_CLK/pio_freq
integer = int(divider)
frac = int((divider-integer)*256)
if frac == 256:
frac = 0
integer += 1
self._dev.write(f":ADC:PLL {integer},{frac}\n".encode("ascii"))
self._clk_freq = BASE_CLK/(integer+frac/256)/4
@property
def delay(self) -> int:
"""
Get delay between trigger and start of sampling in cycles (8.3ns)
"""
return self._delay
@delay.setter
def delay(self, delay) -> None:
"""
Set delay between trigger and start of sampling in cycles (8.3ns)
"""
self._delay = delay
self._dev.write(f":ADC:DELAY {int(delay)}\n".encode("ascii"))
class GlitchSettings():
def __init__(self, dev:serial) -> None:
self._dev = dev
self._offset = 10
self._repeat = 10
@property
def ext_offset(self) -> int:
"""
Delay between trigger and start of glitch in cycles (8.3ns)
"""
return self._offset
@ext_offset.setter
def ext_offset(self, offset:int) -> None:
"""
Set delay between trigger and start of glitch in cycles (8.3ns)
"""
self._dev.write(f":GLITCH:DELAY {int(offset)}\n".encode("ascii"))
self._offset = offset
@property
def repeat(self) -> int:
"""Width of glitch in cycles (approx = 8.3 ns * width)"""
return self._repeat
@repeat.setter
def repeat(self, width:int) -> None:
"""
Set width of glitch in cycles (8.3ns)
"""
self._dev.write(f":GLITCH:LEN {int(width)}\n".encode("ascii"))
self._repeat = width
class GPIOSettings():
def __init__(self, dev:serial) -> None:
self.gpio = []
for i in range(0, 4):
self.gpio.append(list())
self.dev = dev
self.MAX_CHANGES = 255
self.MAX_DELAY = 2147483647
def add(self, pin:int, state:bool, delay:int=None, seconds:float=None) -> None:
"""
Add state change to gpio
Arguments
---------
pin : int
Which pin to add state change to, [0,3]
state : bool
What the state of the pin should be
delay : int
Number of cycles delay after state change, each cycle is 8.3ns
seconds : float
Seconds of delay after state change if delay is not provided
Returns
-------
None
"""
if pin < 0 or pin > 3:
raise ValueError("Pin must be between 0 and 3")
if len(self.gpio[pin]) >= self.MAX_CHANGES:
raise ValueError("Pin reached max state changes")
if delay is None:
if seconds is None:
raise ValueError("delay or seconds must be provided")
delay = int(seconds*100000000)
if delay > self.MAX_DELAY:
raise ValueError("delay exceeds maximum")
self.gpio[pin].append((delay << 1) | state)
def reset(self) -> None:
"""
Reset all GPIO state changes
Arguments
---------
None
Returns
-------
None
"""
self.dev.write(b":GPIO:RESET\n")
for i in range(0, 4):
self.gpio[i].clear()
def upload(self) -> None:
"""
Upload GPIO changes to device
Arguments
---------
None
Returns
-------
None
"""
self.dev.write(b":GPIO:RESET\n")
for i in range(0, 4):
for item in self.gpio[i]:
self.dev.write(f":GPIO:ADD {i},{item}\n".encode("ascii"))
class Scope():
RISING_EDGE = 0
FALLING_EDGE = 1
def __init__(self, port=None) -> None:
if port is None:
ports = comports()
matches = [p.device for p in ports if p.interface == "Curious Bolt API"]
if len(matches) != 1:
matches = [p.device for p in ports if p.product == "Curious Bolt"]
matches.sort()
matches.reverse()
if len(matches) != 2:
raise IOError('Curious Bolt device not found. Please check if it\'s connected, and pass its port explicitly if it is.')
port = matches[0]
self._port = port
self._dev = serial.Serial(port, 115200*10, timeout=1.0)
self._dev.reset_input_buffer()
self._dev.write(b":VERSION?\n")
data = self._dev.readline().strip()
if data is None or data == b"":
raise ValueError("Unable to connect")
print(f"Connected to version: {data.decode('ascii')}")
self.adc = ADCSettings(self._dev)
self.glitch = GlitchSettings(self._dev)
self.io = GPIOSettings(self._dev)
def arm(self, pin:int=0, edge:int=RISING_EDGE) -> None:
"""
Arms the glitch/gpio/adc based on trigger pin
Arguments
---------
pin : int
Which pin to use for trigger [0:7]
edge : int
On what edge to trigger can be RISING_EDGE or FALLING_EDGE
Returns
-------
None
"""
if pin < 0 or pin > 7:
raise ValueError("Pin invalid")
if edge != self.RISING_EDGE and edge != self.FALLING_EDGE:
raise ValueError("Edge invalid")
self._dev.write(f":TRIGGER:PIN {pin},{edge}\n".encode("ascii"))
def trigger(self) -> None:
"""
Immediately trigger the glitch/gpio/adc
Arguments
---------
None
Returns
-------
None
"""
self._dev.write(b":TRIGGER:NOW\n")
def default_setup(self) -> None:
"""
Load some safe defaults into settings
"""
self.glitch.repeat = 10
self.glitch.ext_offset = 0
self.adc.delay = 0
self.adc.clk_freq = 10000000
self.io.reset()
def con(self) -> None:
"""
Connect to device if serial port is not open
"""
if not self._dev.is_open:
self._dev.open()
def dis(self) -> None:
"""
Disconnect from serial port
"""
self._dev.close()
def get_last_trace(self, as_int:bool=False) -> Union[List[int], List[float]]:
"""
Returns the latest captured data from ADC
Arguments
---------
as_int : bool
Returns the data as raw 10bit value from the adc
Returns
-------
data : list<int>
"""
self._dev.reset_input_buffer() #Clear any data
self._dev.write(b":ADC:DATA?\n")
data = self._dev.readline()
if data is None:
return []
data = data.decode("ascii").strip()
if "ERR" in data:
logging.warning(f"Received: {data}")
return []
data = data.split(",")
data = [x for x in data if x != '']
if as_int:
return [int(x) for x in data]
volt_per_step = 2 / 10 / 1024 # 2V pk-pk, 10x amplified from source, in 10-bit ADC
return [(float(x)-512)*volt_per_step for x in data]
def plot_last_trace(self, continuous=False):
try:
import matplotlib.pyplot as plt
except ImportError:
print("Dependencies missing, please install python package matplotlib")
return
plt.ion()
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel("Time since trigger (us)")
ax.set_ylabel("Voltage difference (mV)")
us_per_measurement = 1e6 / self.adc.clk_freq
line, = ax.plot([float(x) * us_per_measurement for x in range(50000)], [0] * 50000, 'b-')
while True:
try:
res = self.get_last_trace()
if len(res) != 50000:
print(f"Got {len(res)} entries, skipping")
if continuous:
continue
else:
break
trace = [x*1000 for x in res]
line.set_ydata(trace)
ax.relim()
ax.autoscale_view()
fig.canvas.draw()
fig.canvas.flush_events()
if continuous:
self.trigger()
continue
else:
plt.show()
break
except KeyboardInterrupt:
break
def update(self):
self._dev.write(b":BOOTLOADER\n")
if __name__ == "__main__":
s = Scope()
s.default_setup()
s.trigger()
s.plot_last_trace(continuous=True)