import time
import numpy as np
import pathlib
from . import base
# TODO Check trips?
# TODO Set limits?
# NOTE Weird manufacturer bug:
# When running "HVOF" just after to_defaultstate(), any future "HVON"
# command returns error "unrecognized command, syntax error".
# This can be only remedied by running a sequence "*RST" + "HVON".
#
# Hopefully, this will be prevented by explicitely checking whether
# HV is on/off before running HVON/HVOF in output_on/off().
[docs]
class PowerSupply_KI2290(base.VisaHardwareBase):
"""Digital power supply 5kV (Paschen experiment)
Model name: KEITHLEY KI2290
Communication:
- GPIB (USB adapter), using VISA protocol and SCPI commands
"""
SAVE_DIR = pathlib.Path.home() / "saved_data"
def __init__(self, **kwargs):
super().__init__("GPIB::14::INSTR", **kwargs)
self._last = {
"vset": 0,
"vout": 0,
"iout": 0,
"ouptut": False,
"sweep": np.full((3, 0), np.nan),
}
self._progress = 0
self._sweeping = False
# ==== Inherited abstract methods ====
def _safestate(self):
"""Safety kill-switch. Turns off anything dangerous after interrupt."""
self.raw_scpi("HVOF")
self._sweeping = False
def _readerror(self):
"""Read new error from device."""
errormsg = {
4: "Query error, output queue overflow",
8: "Recall error, stored config is corrupt",
16: "Execution, out-of-range parameter or operation interrupted",
32: "Command error, unrecognized command or syntax error",
}
esr = int(self.raw_scpi("*ESR?")) & 0b00111100
if esr in errormsg:
return errormsg[esr]
elif esr > 0:
return "Multiple errors, combined code {}".format(esr)
else:
return None
def _defaultstate(self):
"""Init device to a well defined state. (Auto called after connect.)"""
self.raw_scpi("*RST")
self.raw_scpi("*CLS")
self.raw_scpi("*RCL 0")
# Set voltage & current limits?
# def _clear_defaultstate(self):
# """Clean-up device settings (i.e. keys lock). (Auto called before disconnect.)"""
# pass
# ==== Commands ====
[docs]
@base.base_command
def set_voltage(self, voltage):
"""Set requested voltage value [V]."""
voltage = int(voltage)
if not (0 <= voltage <= 5000):
raise base.CommandError("Voltage is out of bounds 0-5kV")
self.raw_scpi("VSET {:d}".format(voltage))
self._last["vset"] = voltage
[docs]
@base.base_command
def voltage_req(self):
"""Return requested voltage (not output voltage) [V]."""
volts = float(self.raw_scpi("VSET?"))
self._last["vset"] = volts
return volts
[docs]
@base.base_command
def voltage_out(self):
"""Return meassured output voltage [V]."""
volts = float(self.raw_scpi("VOUT?"))
self._last["vout"] = volts
return volts
[docs]
@base.base_command
def current_out(self):
"""Return meassured output current [mA].
Note: The value takes approx. 1s to stabilize after a change in current.
"""
amps = float(self.raw_scpi("IOUT?"))
self._last["iout"] = amps
return amps
[docs]
@base.base_command
def output_on(self):
"""Turn on voltage output."""
if not self.is_output_on():
self.raw_scpi("HVON")
self._last["output"] = True
[docs]
@base.base_command
def output_off(self):
"""Turn off voltage output."""
if self.is_output_on():
self.raw_scpi("HVOF")
self._last["output"] = False
[docs]
@base.base_command
def is_output_on(self):
self._last["output"] = (int(self.raw_scpi("*STB?")) & 128) != 0
return self._last["output"]
[docs]
@base.compound_command
def output_on_for(self, duration):
"""Turn on voltage for fixed time duration [s]."""
duration = max(0, float(duration))
if duration > 60:
raise base.CommandError("Max duration is 60s")
self.output_on()
self.log.info("Voltage is ON for {:.2f} s".format(duration))
t0 = time.time()
while(time.time() - t0 < duration):
# Update current values in self._last
self.voltage_out()
self.current_out()
self._progress = (time.time() - t0) / duration
time.sleep(0.1)
self._progress = 1
self.output_off()
self.log.info("Task finished")
[docs]
@base.compound_command
def sweep(self, device_client, start, stop, steps, stabilize=0.5):
"""Execute a voltage sweep in range (start[V], stop[V]) with specified
number of steps. Data is stored in property ``sweep_data`` and also
saved to a file. Filename is returned.
Parameter stabilize[s] determines how long we wait in order to let
stabilize the output current.
"""
start = float(start)
stop = float(stop)
steps = int(steps)
stabilize = float(stabilize)
if not (0 <= start <= 5000 and 0 <= stop <= 5000):
raise base.CommandError("Start or stop is out of bounds 0-5kV")
if not (2 <= steps <= 100):
raise base.CommandError("Step count is out of bounds 2-100")
self.log.info("Sweep %.1f-%.1fV, %d steps, stabilizing delay %.1fs",
start, stop, steps, stabilize)
self.output_on()
self._last["sweep"] = np.full((3, steps), np.nan)
self._sweeping = True
t0 = time.time()
for i, volts in enumerate(np.linspace(start, stop, steps)):
self.set_voltage(np.round(volts))
time.sleep(stabilize)
self._last["sweep"][0, i] = time.time() - t0
self._last["sweep"][1, i] = self.voltage_out()
self._last["sweep"][2, i] = self.current_out()
self._progress = (i + 1) / steps
self.output_off()
self.log.info("Task finished")
self._sweeping = False
# Save the sweep data and emit to server
filepath = self._save_sweep_data()
if device_client is not None:
with filepath.open("r") as file:
device_client.emit_datafile(file, filepath.name)
return str(filepath)
@property
def sweep_data(self):
"""Data from last voltage sweep in format (voltage[V], current[mA])."""
return self._last["sweep"]
@base.base_command # Make sure sweep() is not modifying the data at the moment
def _save_sweep_data(self):
"""Save data from last sweep to file. Returns the filepath.
"""
if self._last["sweep"].size == 0:
raise base.CommandError("No data to save")
elif np.any(np.isnan(self._last["sweep"])):
self.log.warning("Saving incomplete data - sweep interrupted?")
self.SAVE_DIR.mkdir(exist_ok=True)
filepath = "{}_sweep_{}.txt".format(type(self).__name__, time.asctime())
filepath = self.SAVE_DIR / filepath.replace(" ", "_").replace(":", "-")
h = "Time[s] Voltage[V] Current[mA]"
np.savetxt(filepath, self._last["sweep"].T, fmt="%.6e", header=h)
self.log.info("Saved sweep to '{}'".format(filepath))
return filepath
# ==== Methods/commands related to DeviceClient ====
@base.idle_command
def _idle_update_all(self):
# Call to each of these methods updates self._last
self.voltage_req()
self.voltage_out()
self.current_out()
self.is_output_on()
[docs]
def update_frontend(self, device_client):
self._idle_update_all()
vout = self._last["vout"]
device_client.emit("value", {
"value": vout,
"formatted": "{:.0f} V".format(vout),
"label": "V out",
"min": 0,
"max": 5000,
"id": "vout"
})
iout = self._last["iout"]
device_client.emit("value", {
"value": iout,
"formatted": "{:.2f} mA".format(iout),
"label": "I out",
"min": 0,
"max": 100,
"id": "iout"
})
output = self._last["output"]
device_client.emit("value", {
"value": 1 if output else 0,
"formatted": "ON" if output else "OFF",
"label": "Output",
"min": 0,
"max": 1,
"id": "output"
})
device_client.emit("value", {
"value": self._progress,
"formatted": "",
"label": "Task progress",
"min": 0,
"max": 1,
"id": "progress"
})
if self._sweeping:
data = self._last["sweep"][:, np.isfinite(self._last["sweep"][1])]
if data.size > 0:
device_client.emit("graph", {
"title": "Sweeped voltage",
"x": data[0].tolist(),
"y": data[1].tolist(),
"xlabel": "U_out [V]",
"ylabel": "I_out [mA]",
"id": "graph_sweep"
})