Source code for device.power_supply_ki2290

import time
import numpy as np
import pathlib

from . import base


# TODO Check trips?
# TODO Set limits?

# NOTE Weird manufacturer bug:
#      When running "HVOF" just after to_defaultstate(), any future "HVON"
#      command returns error "unrecognized command, syntax error".
#      This can be only remedied by running a sequence "*RST" + "HVON".
#
#      Hopefully, this will be prevented by explicitely checking whether
#      HV is on/off before running HVON/HVOF in output_on/off().

[docs] class PowerSupply_KI2290(base.VisaHardwareBase): """Digital power supply 5kV (Paschen experiment) Model name: KEITHLEY KI2290 Communication: - GPIB (USB adapter), using VISA protocol and SCPI commands """ SAVE_DIR = pathlib.Path.home() / "saved_data" def __init__(self, **kwargs): super().__init__("GPIB::14::INSTR", **kwargs) self._last = { "vset": 0, "vout": 0, "iout": 0, "ouptut": False, "sweep": np.full((3, 0), np.nan), } self._progress = 0 self._sweeping = False # ==== Inherited abstract methods ==== def _safestate(self): """Safety kill-switch. Turns off anything dangerous after interrupt.""" self.raw_scpi("HVOF") self._sweeping = False def _readerror(self): """Read new error from device.""" errormsg = { 4: "Query error, output queue overflow", 8: "Recall error, stored config is corrupt", 16: "Execution, out-of-range parameter or operation interrupted", 32: "Command error, unrecognized command or syntax error", } esr = int(self.raw_scpi("*ESR?")) & 0b00111100 if esr in errormsg: return errormsg[esr] elif esr > 0: return "Multiple errors, combined code {}".format(esr) else: return None def _defaultstate(self): """Init device to a well defined state. (Auto called after connect.)""" self.raw_scpi("*RST") self.raw_scpi("*CLS") self.raw_scpi("*RCL 0") # Set voltage & current limits? # def _clear_defaultstate(self): # """Clean-up device settings (i.e. keys lock). (Auto called before disconnect.)""" # pass # ==== Commands ====
[docs] @base.base_command def set_voltage(self, voltage): """Set requested voltage value [V].""" voltage = int(voltage) if not (0 <= voltage <= 5000): raise base.CommandError("Voltage is out of bounds 0-5kV") self.raw_scpi("VSET {:d}".format(voltage)) self._last["vset"] = voltage
[docs] @base.base_command def voltage_req(self): """Return requested voltage (not output voltage) [V].""" volts = float(self.raw_scpi("VSET?")) self._last["vset"] = volts return volts
[docs] @base.base_command def voltage_out(self): """Return meassured output voltage [V].""" volts = float(self.raw_scpi("VOUT?")) self._last["vout"] = volts return volts
[docs] @base.base_command def current_out(self): """Return meassured output current [mA]. Note: The value takes approx. 1s to stabilize after a change in current. """ amps = float(self.raw_scpi("IOUT?")) self._last["iout"] = amps return amps
[docs] @base.base_command def output_on(self): """Turn on voltage output.""" if not self.is_output_on(): self.raw_scpi("HVON") self._last["output"] = True
[docs] @base.base_command def output_off(self): """Turn off voltage output.""" if self.is_output_on(): self.raw_scpi("HVOF") self._last["output"] = False
[docs] @base.base_command def is_output_on(self): self._last["output"] = (int(self.raw_scpi("*STB?")) & 128) != 0 return self._last["output"]
[docs] @base.compound_command def output_on_for(self, duration): """Turn on voltage for fixed time duration [s].""" duration = max(0, float(duration)) if duration > 60: raise base.CommandError("Max duration is 60s") self.output_on() self.log.info("Voltage is ON for {:.2f} s".format(duration)) t0 = time.time() while(time.time() - t0 < duration): # Update current values in self._last self.voltage_out() self.current_out() self._progress = (time.time() - t0) / duration time.sleep(0.1) self._progress = 1 self.output_off() self.log.info("Task finished")
[docs] @base.compound_command def sweep(self, device_client, start, stop, steps, stabilize=0.5): """Execute a voltage sweep in range (start[V], stop[V]) with specified number of steps. Data is stored in property ``sweep_data`` and also saved to a file. Filename is returned. Parameter stabilize[s] determines how long we wait in order to let stabilize the output current. """ start = float(start) stop = float(stop) steps = int(steps) stabilize = float(stabilize) if not (0 <= start <= 5000 and 0 <= stop <= 5000): raise base.CommandError("Start or stop is out of bounds 0-5kV") if not (2 <= steps <= 100): raise base.CommandError("Step count is out of bounds 2-100") self.log.info("Sweep %.1f-%.1fV, %d steps, stabilizing delay %.1fs", start, stop, steps, stabilize) self.output_on() self._last["sweep"] = np.full((3, steps), np.nan) self._sweeping = True t0 = time.time() for i, volts in enumerate(np.linspace(start, stop, steps)): self.set_voltage(np.round(volts)) time.sleep(stabilize) self._last["sweep"][0, i] = time.time() - t0 self._last["sweep"][1, i] = self.voltage_out() self._last["sweep"][2, i] = self.current_out() self._progress = (i + 1) / steps self.output_off() self.log.info("Task finished") self._sweeping = False # Save the sweep data and emit to server filepath = self._save_sweep_data() if device_client is not None: with filepath.open("r") as file: device_client.emit_datafile(file, filepath.name) return str(filepath)
@property def sweep_data(self): """Data from last voltage sweep in format (voltage[V], current[mA]).""" return self._last["sweep"] @base.base_command # Make sure sweep() is not modifying the data at the moment def _save_sweep_data(self): """Save data from last sweep to file. Returns the filepath. """ if self._last["sweep"].size == 0: raise base.CommandError("No data to save") elif np.any(np.isnan(self._last["sweep"])): self.log.warning("Saving incomplete data - sweep interrupted?") self.SAVE_DIR.mkdir(exist_ok=True) filepath = "{}_sweep_{}.txt".format(type(self).__name__, time.asctime()) filepath = self.SAVE_DIR / filepath.replace(" ", "_").replace(":", "-") h = "Time[s] Voltage[V] Current[mA]" np.savetxt(filepath, self._last["sweep"].T, fmt="%.6e", header=h) self.log.info("Saved sweep to '{}'".format(filepath)) return filepath # ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idle_update_all(self): # Call to each of these methods updates self._last self.voltage_req() self.voltage_out() self.current_out() self.is_output_on()
[docs] def update_frontend(self, device_client): self._idle_update_all() vout = self._last["vout"] device_client.emit("value", { "value": vout, "formatted": "{:.0f} V".format(vout), "label": "V out", "min": 0, "max": 5000, "id": "vout" }) iout = self._last["iout"] device_client.emit("value", { "value": iout, "formatted": "{:.2f} mA".format(iout), "label": "I out", "min": 0, "max": 100, "id": "iout" }) output = self._last["output"] device_client.emit("value", { "value": 1 if output else 0, "formatted": "ON" if output else "OFF", "label": "Output", "min": 0, "max": 1, "id": "output" }) device_client.emit("value", { "value": self._progress, "formatted": "", "label": "Task progress", "min": 0, "max": 1, "id": "progress" }) if self._sweeping: data = self._last["sweep"][:, np.isfinite(self._last["sweep"][1])] if data.size > 0: device_client.emit("graph", { "title": "Sweeped voltage", "x": data[0].tolist(), "y": data[1].tolist(), "xlabel": "U_out [V]", "ylabel": "I_out [mA]", "id": "graph_sweep" })