Source code for device.valve_board

try:
    import RPi.GPIO as gpio
    import smbus  # I2C interface
    import spidev  # SPI interface
except ImportError:
    gpio = None
    smbus = None
    spidev = None
# I2C bus example https://www.instructables.com/id/Raspberry-Pi-I2C-Python/
#
# SPI https://github.com/doceme/py-spidev

import os
import json
import time

from . import base


# TODO Resolve all NOTEs in this file. (needs real life testing)
# TODO Is there a case, when closing a valve is bad? Or is closing a valve
#      always safe? (It would be a useful assumption if true)

# NOTE Which DAC chip is it - MCP4802 (8-bit) or MCP4922 (12-bit) ???

class ValveBoard(base.HardwareBase):
    """A board that connects vacuum chambers and gas sources.

    Communication:

    - RaspberryPi + I2C (``smbus``) and SPI (``spidev``) interfaces
    - I2C -> 3x expander, together they control 35 open/closed valves and
      1 three-way valve
    - SPI -> 4x DA converter, together they control 5 needle valves

    .. note::
        Expander is a MCP23017 microchip (16-bit == two 8-bit branches A, B)
        DA converter is a MCP4802 microchip (8-bit) and four MCP4911 (10-bit)
    """

    # Largest value accepted by the needle interface (12-bit range).
    _DAC_FULL_SCALE = 4095

    def __init__(self, testrun=False, **kwargs):
        """Load the valve mapping and build the I2C/SPI helper objects.

        :param bool testrun: If true, console-printing stand-ins are used
            instead of the RaspberryPi hardware modules.
        :raises ImportError: Hardware modules are missing and ``testrun``
            is false.
        """
        global gpio
        super().__init__(**kwargs)
        if not (gpio and smbus and spidev) and not testrun:
            raise ImportError("RaspberryPi with modules 'RPi.GPIO', 'smbus'"
                              " and 'spidev' is required")
        if testrun:
            # Replaces the module-level ``gpio`` for the whole module.
            gpio = _TestGPIO()

        self._mapping = self._load_mapping("./configurations/valve_mapping.json")
        self._rules = Rules(valves=self._mapping["valve"])
        self._i2c = InterfaceI2C(self._rules, self._mapping["valve"], testrun)
        self._spi = InterfaceSPI(self._mapping["needle"], testrun)
        self._activepath = set()

    def _load_mapping(self, filename):
        """Load name-address mapping for valves from json file.

        :param str filename: Path relative to the directory of this module.
        :return dict: ``{"valve": {name: (address, branch, bitmask)},
            "needle": <raw "spi_needle" data>}``
        """
        path = os.path.join(os.path.dirname(__file__), filename)
        # Context manager guarantees the file handle is released.
        with open(path, "r") as mapping_file:
            json_data = json.load(mapping_file)

        i2c_valve = {
            key: (int(a[0], 16), a[1], int(a[2]))
            for key, a in json_data["i2c_valve"]["data"].items()
        }
        spi_needle = json_data["spi_needle"]["data"]
        return {"valve": i2c_valve, "needle": spi_needle}

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Open both buses; always returns True."""
        self._i2c.open()
        self._spi.open()
        return True

    def _disconnect(self):
        """Close both buses; SPI is closed even if closing I2C fails."""
        try:
            self._i2c.close()
        finally:
            self._spi.close()

    def _is_ready(self):
        """Always reports ready."""
        return True

    # ==== Commands ====

    @property
    def valves(self):
        """List of all valve names found in the mapping."""
        return list(self._mapping["valve"])

    @property
    def needles(self):
        """List of needle identifiers as strings ('0', '1', ...)."""
        return [str(i) for i in range(len(self._mapping["needle"]))]

    def _check_valve_name(self, name):
        """Raise CommandError unless ``name`` is a known binary valve."""
        if name not in self._mapping["valve"] or name == "gas01_switch":
            raise base.CommandError("Invalid valve name '{}'".format(name))

    @base.base_command
    def open_valve(self, name):
        """Open one valve.

        :param str name: Valve name, see file 'valve_mapping.json'.
        :raises base.CommandError: Unknown name or 'gas01_switch'.
        """
        self._check_valve_name(name)
        self._i2c.set_valve(name, True)

    @base.base_command
    def close_valve(self, name):
        """Close one valve.

        :param str name: Valve name, see file 'valve_mapping.json'.
        :raises base.CommandError: Unknown name or 'gas01_switch'.
        """
        self._check_valve_name(name)
        self._i2c.set_valve(name, False)

    @base.base_command
    def close_all(self):
        """Close all binary valves and needles. Not including gas01_switch.

        Note: This sends close command to all valves regardless of their
              assumed state. Thus it can be used as a reliable fail-safe.
        """
        # DevNote: Closing one valve at a time is necessary. We cannot set each
        #          expander to 0x00 because there may be other devices on the
        #          same branch (pumps?). We only want to close valves.
        valves = set(self._mapping["valve"].keys()) - {"gas01_switch"}
        for name in valves:
            # Not using self.close_valve() to avoid unnecessary name-checking
            # and potential collision with some future features.
            self._i2c.set_valve(name, False)
        for i in range(5):
            self.set_needle(i, 0)
        self._activepath.clear()

    @base.base_command
    def switch_gas01(self, which):
        """Change state of three-way valve that switches between gas 0 and 1.

        :param int which: Pick gas, either 0 or 1.
        :raises base.CommandError: ``which`` is neither 0 nor 1.
        """
        which = int(which)
        if which not in [0, 1]:
            raise base.CommandError("Use which=0 or 1")
        # NOTE Check if this is correct or inverted
        self._i2c.set_valve("gas01_switch", which == 0)

    @base.base_command
    def set_needle(self, which, percent):
        """Change state of needle valve connected to one of the chambers.

        :param int which: Pick chamber number 0, 1, 2, 3, 4.
        :param float percent: Percentage in range 0-100, where 0 is closed.
        :raises base.CommandError: ``which`` or ``percent`` out of range.

        Note: Percent values below 1.0% are rounded to "real" 0
              (i.e. without empirical offset).
        """
        which = int(which)
        percent = float(percent)
        if which not in range(5):
            raise base.CommandError("Use which=0, ..., 4")
        if not (0.0 <= percent <= 100):
            raise base.CommandError("Value percent must be between 0-100")

        # Version A: with offset
        if percent >= 1.0:
            # Third mapping entry is the per-needle offset.
            offs = self._mapping["needle"]["needle" + str(which)][2]
            value = offs + int(0.01*percent * (self._DAC_FULL_SCALE-offs))
        else:
            value = 0
        # Version B: without offset
        # value = int(0.01*float(percent) * 4095)
        self._spi.set_needle("needle" + str(which), value)

    @base.compound_command
    def set_needle_for(self, which, percent, duration):
        """Open needle valve for a limited duration ("gas puff").

        :param int which: Pick chamber number 0, 1, 2, 3, 4.
        :param float percent: Percentage in range 0-100, where 0 is closed.
        :param float duration: Time in seconds (negative values mean 0).
        """
        self.set_needle(which, percent)
        time.sleep(max(0, float(duration)))
        self.set_needle(which, 0)

    @base.base_command
    def get_needle(self, which):
        """Read state of needle valve connected to one of the chambers.

        The value comes from the software state-tracker of the SPI helper.

        :param which: Pick chamber number '0', '1', '2', '3', '4'
            (anything accepted by ``int()``).
        :return float percent: Percentage in range 0-100, where 0 is closed.
        :raises base.CommandError: ``which`` out of range.
        """
        which = int(which)
        if which not in range(5):
            raise base.CommandError("Use which=0, ..., 4")
        value = self._spi.state_all()["needle" + str(which)]
        # Version A: with offset
        offs = self._mapping["needle"]["needle" + str(which)][2]
        return max(value-offs, 0) / (self._DAC_FULL_SCALE-offs) * 100
        # Version B: without offset
        # return value / 4095 * 100

    # ==== Higher level commands ====

    @base.base_command
    def open_path(self, path):
        """Open a custom path (a custom set of binary valves).
        Any previous path will be closed.

        :param str path: A comma separated string containing the valves'
            names. (Whitespace characters are ignored.)
        :raises base.CommandError: Path contains an unknown name or
            'gas01_switch'.
        :raises ValveRuleError: Opening one of the valves breaks a rule; the
            partially opened path is closed before re-raising.

        Note: Active path is saved to memory and can be closed without
              closing any other valves using method ``close_path()``.
        """
        # NOTE: Reimplement with predefined paths, valve-locking and such.
        valves = set(a.strip() for a in path.split(","))
        all_valves = self._mapping["valve"].keys()
        if "gas01_switch" in valves or not valves.issubset(all_valves):
            raise base.CommandError("Path contains invalid name(s) or not"
                                    " separated by ','")
        # Close previous path
        if self._activepath:
            self.close_path()

        # This approach (open path one valve at a time and revert back if a
        # rule error was raised) produces clear feedback to the user, since
        # ValveRuleError carries the name of the valve, that raised it.
        try:
            for v in valves:
                self._i2c.set_valve(v, True)
                self._activepath.add(v)
        except ValveRuleError:
            self.close_path()
            raise

    @base.base_command
    def close_path(self):
        """Close active path."""
        while self._activepath:
            self.close_valve(self._activepath.pop())

    # ==== Methods/commands related to DeviceClient ====

    @base.idle_command
    def update_frontend(self, device_client):
        """Emit needle percentages and open-valve count to ``device_client``.

        :param device_client: Object providing ``emit(event, payload)``.
        """
        for needle in [str(i) for i in range(5)]:
            value = self.get_needle(needle)  # 0-100%
            device_client.emit("value", {
                "value": value,
                "formatted": "{:.1f} %".format(value),
                "label": "Needle " + needle,
                "min": 0,
                "max": 100,
                "id": "needle" + needle
            })

        all_valves = self._i2c.state_all()
        # The three-way switch is not an open/closed valve; tolerate a
        # mapping that does not define it.
        all_valves.pop("gas01_switch", None)
        value = sum(all_valves.values())
        device_client.emit("value", {
            "value": value,
            "formatted": "{} / {}".format(value, len(all_valves)),
            "label": "Open valves",
            "min": 0,
            "max": len(all_valves),
            "id": "all_open_valves"
        })
class InterfaceI2C:
    """Internal helper class that controls the I2C interface and enforces
    safety rules on the state of its slaves (binary-state valves).
    """

    # Expander register addresses per branch.
    _OLAT = {"A": 0x14, "B": 0x15}  # OLAT=OutputLatch register
    _IODIR = {"A": 0x00, "B": 0x01}  # IODIR register

    def __init__(self, rules, mapping, testrun=False):
        """
        :param rules: Object with ``check(valves=...)`` returning bool.
        :param dict mapping: ``{name: (address, branch, bitmask)}``
        :param bool testrun: Use a console-printing bus instead of ``smbus``.
        :raises ImportError: ``smbus`` is missing and ``testrun`` is false.
        """
        if not smbus and not testrun:
            raise ImportError("RaspberryPi with module 'smbus' is required")
        self._rules = rules
        self._mapping = mapping
        self._testrun = testrun
        # Unique (address, branch) pairs used by the mapped valves.
        self._expanders = {(addr, br) for addr, br, _ in mapping.values()}
        self._bus = None

    def _state_expander(self, address, branch):
        """Read current state of OLATA or OLATB register of expander."""
        return self._bus.read_byte_data(address, self._OLAT[branch])

    def _set_expander(self, address, branch, state):
        """Set current state of OLATA or OLATB register of expander."""
        self._bus.write_byte_data(address, self._OLAT[branch], state)
        # NOTE Consider forced time delay in cycle

    def open(self):
        """Open the bus and execute all necessary initializations."""
        if self._testrun:
            self._bus = _TestI2C(self._mapping)
        else:
            self._bus = smbus.SMBus(1)
            # NOTE: Consider when (if) this can fail (add exception handling)

        # Set i/o direction of all pins to output
        for addr, branch in self._expanders:
            self._bus.write_byte_data(addr, self._IODIR[branch], 0x00)
            # NOTE Consider forced time delay in cycle

    def close(self):
        """Close the bus and clean up.

        Does nothing if the bus is not open, so it is safe to call before
        ``open()`` or more than once.
        """
        if self._bus is None:
            return
        self._bus.close()
        # NOTE: Consider when (if) this can fail (add exception handling)
        self._bus = None

    def set_valve(self, name, state):
        """Set state of a single valve. Safety rules are enforced.

        :param str name: valve name as given by ``mapping``
        :param bool state: open/close or 'which way' in case of 3-way valve
        :raises ValveRuleError: Opening the valve would break a rule.
        """
        addr, branch, bitmask = self._mapping[name]

        # Rules only restrict opening, so the (bus-heavy) read of all
        # expanders is skipped when a valve is being closed.
        if state:
            valves = self.state_all()
            valves[name] = state
            if not self._rules.check(valves=valves):
                raise ValveRuleError(
                    "Cannot open valve '{}' in current state".format(name))

        # Read-modify-write so other bits on the same branch are untouched.
        exp = self._state_expander(addr, branch)
        if state:
            self._set_expander(addr, branch, exp | bitmask)
        else:
            self._set_expander(addr, branch, exp & (~bitmask))

    def state_all(self):
        """Read current state of all valves.

        :return dict: ``{name: bool}`` built from one read per expander
            branch.
        """
        state_exps = {a: self._state_expander(*a) for a in self._expanders}
        state_valves = {name: bool(state_exps[(addr, br)] & bitmask)
                        for name, (addr, br, bitmask) in self._mapping.items()}
        return state_valves
class InterfaceSPI:
    """Internal helper class that controls the SPI interface (needle valves).
    """

    def __init__(self, mapping, testrun=False):
        """
        :param dict mapping: ``{name: [cs_pin, channel, ...]}``
        :param bool testrun: Use a console-printing bus instead of ``spidev``.
        :raises ImportError: Hardware modules are missing and ``testrun`` is
            false.
        """
        if not (gpio and spidev) and not testrun:
            raise ImportError("RaspberryPi with modules 'RPi.GPIO' and"
                              " 'spidev' is required")
        self._mapping = mapping
        # Software state-tracker: last value written to each needle.
        self._needles = dict.fromkeys(mapping, 0)
        self._bus = spidev.SpiDev() if not testrun else _TestSPI()
        # self._bus.max_speed_hz = 1000000  # 1Mbps NOTE Is this slowdown necessary?
        # self._bus.no_cs = True  # NOTE Is this doing anything?

    def open(self):
        """Open the bus and execute all necessary initializations.

        Every chip-select pin is configured as output and driven high, then
        all needles are set to 0.
        """
        gpio.setmode(gpio.BOARD)
        for cspin in {a[0] for a in self._mapping.values()}:
            gpio.setup(cspin, gpio.OUT)
            gpio.output(cspin, True)
        self._bus.open(bus=0, device=1)
        # NOTE Consider a time delay here?
        for name in self._needles:
            self.set_needle(name, 0)
        # NOTE All needles close at the start - write this to docs

    def close(self):
        """Close the bus and clean up."""
        self._bus.close()

    def set_needle(self, name, value):
        """Set state of a single needle valve.

        :param str name: needle valve name as given by ``mapping``
        :param int value: value in range(0, 4096); values outside are clamped
        """
        cspin, channel = self._mapping[name][0:2]
        value = max(min(value, 4095), 0)
        # Header bits: <channel><--><gain><powersafe_shutdown>
        data = {"A": 0b0011, "B": 0b1011}[channel] << 12
        # Data bits: Least significant 0, 2 or 4 bits are ignored in case of
        #            12-bit, 10-bit or 8-bit DAC respectively. Thus the range
        #            of values (0, 4095) is the same, only the precision
        #            changes.
        data |= value

        gpio.output(cspin, False)
        try:
            self._bus.writebytes([(data >> 8) & 0xff, data & 0xff])
        finally:
            # Always release chip-select, otherwise a failed write would
            # leave this chip selected during later transfers.
            gpio.output(cspin, True)
        # NOTE Maybe add some time delay?
        # Tracker is updated only after a successful write.
        self._needles[name] = value

    def state_all(self):
        """State of all needle valves according to software state-tracker.
        It is not possible to read state directly from the DAC.

        :return dict: Copy of ``{name: value}``; mutating it does not affect
            the tracker.
        """
        return dict(self._needles)
class ValveRuleError(base.CommandError):
    """Raised when a valve cannot be opened, because opening it would break
    one of the valve-combination rules defined in ``class Rules``.
    """
class Rules:
    """Internal helper class to define rules about valve combinations.

    Valves are identified by their user-friendly names (file
    'valve_mapping.json'). The purpose of rules is to avoid unsafe pipe
    connections. It is assumed that closing a valve is always safe.
    """

    def __init__(self, valves):
        """Build the rule sets and verify that they only use known names.

        :param valves: Container of known valve names (supports ``in``).
        :raises AssertionError: A rule refers to a name not in ``valves``.
        """
        # Define rules
        self.disjunctive = self.rule_disjunctive()
        self.disabled = self.rule_disabled()

        # Look for typos in rule definitions. Raised explicitly (instead of
        # using ``assert``) so the check also runs under ``python -O``.
        for v in sorted(frozenset().union(*self.disjunctive)):
            if v not in valves:
                raise AssertionError(
                    "Unknown name {} in disjunctive rule".format(v))
        for v in sorted(self.disabled):
            if v not in valves:
                raise AssertionError(
                    "Unknown name {} in disabled rule".format(v))

    def rule_disjunctive(self):
        """Returns tuple of disjunctive rules (frozensets).

        Rule: *Up to ONE valve from each set can be open at the same time.*
        """
        disjunctive = []
        for n in range(5):
            # 1. Do not mix gases in each bus
            disjunctive.append(
                {"gas{}_bus{}".format(g, n) for g in ["01", "2", "3", "4"]})
        for n in range(5):
            # 2. Do not connect bus to chamber-venting outlet
            disjunctive.append(
                {"bus{}_chamber".format(n), "chamber{}_vent".format(n)})
        return tuple(frozenset(s) for s in disjunctive)  # Immutable

    def rule_disabled(self):
        """Returns set of disabled valves.

        Rule: *Valves that are elements of this set cannot be in open state.*
        """
        # NOTE Revise this in future
        # 1. (TEMPORARY) Do not open gas recovery/venting
        return frozenset("bus{}_out".format(n) for n in range(5))  # Immutable

    def check(self, valves=None):
        """Check if a state does not break any rules.

        :param dict valves: state of valves ``{str name: bool state}``;
            ``None`` or an empty dict means "no valve is open".
        :return bool: True if no rule is broken.
        """
        if not valves:
            # Nothing is open, so nothing can be violated.
            return True
        valves_comb = {key for key, state in valves.items() if state}

        # See rule_disabled()
        if not self.disabled.isdisjoint(valves_comb):
            return False
        # See rule_disjunctive()
        return all(len(rule & valves_comb) <= 1 for rule in self.disjunctive)
class _TestI2C:
    """Stand-in for ``smbus.SMBus`` used in test runs.

    Register contents are kept in memory and every write is reported on the
    console instead of being sent to the I2C bus.
    """

    def __init__(self, mapping):
        self.REGISTER_DICT = {
            0x00: ("IODIR", "A"),
            0x01: ("IODIR", "B"),
            0x14: ("OLAT", "A"),
            0x15: ("OLAT", "B"),
        }
        branches = {(address, branch) for address, branch, _ in mapping.values()}
        # IODIR starts as all-input (0xff), OLAT as all-low (0x00).
        self.regs = {
            "IODIR": dict.fromkeys(branches, 0xff),
            "OLAT": dict.fromkeys(branches, 0x00),
        }

    def write_byte_data(self, address, register, byte):
        """Store ``byte`` in the emulated register and print what happened."""
        known = register in self.REGISTER_DICT
        if not known:
            print("TestI2C - ERROR Unknown register 0x{:02x}".format(register))
            return

        reg_name, br = self.REGISTER_DICT[register]
        key = (address, br)
        # Output latch writes are rejected until the branch is all-output.
        if reg_name == "OLAT" and self.regs["IODIR"][key] != 0x00:
            print("TestI2C - ERROR Write before <IODIR> was set to output")
            return

        self.regs[reg_name][key] = byte
        message = ("TestI2C - Write addr=0x{:02x}_{} byte=0b{:08b}"
                   " <{}>")
        print(message.format(address, br, byte, reg_name))

    def read_byte_data(self, address, register):
        """Return the emulated register value (None for unknown register)."""
        if register in self.REGISTER_DICT:
            reg_name, br = self.REGISTER_DICT[register]
            return self.regs[reg_name][(address, br)]
        print("TestI2C - ERROR Unknown register 0x{:02x}".format(register))
        return

    def close(self):
        """Report that the bus was closed."""
        print("TestI2C - Close")


class _TestSPI:
    """Stand-in for ``spidev.SpiDev`` used in test runs.

    All operations are reported on the console instead of the SPI bus.
    """

    def __init__(self):
        self.max_speed_hz = 1000000
        self.no_cs = False

    def open(self, bus, device):
        """Report that the bus was opened; arguments are ignored."""
        print("TestSPI - Open")

    def close(self):
        """Report that the bus was closed."""
        print("TestSPI - Close")

    def writebytes(self, list_bytes):
        """Print the bytes as two-digit hex values."""
        as_hex = ["0x{:02x}".format(a) for a in list_bytes]
        print("TestSPI - Write", *as_hex)


class _TestGPIO:
    """Stand-in for the ``RPi.GPIO`` module used in test runs.

    All operations are reported on the console.
    """

    BOARD = 0
    BCM = 1
    OUT = 0
    IN = 1
    LOW = 0
    HIGH = 1

    def __init__(self):
        pass

    def setmode(self, mode):
        """Remember the pin layout and print its name."""
        self.pinlayout = mode
        layout_names = {self.BOARD: "BOARD", self.BCM: "BCM"}
        str_mode = layout_names[mode]
        print("TestGPIO - Layout set to mode {}".format(str_mode))

    def setup(self, pin, mode):
        """Print the direction configured for ``pin``."""
        direction_names = {self.OUT: "OUT", self.IN: "IN"}
        str_mode = direction_names[mode]
        print("TestGPIO - Pin {} set to mode {}".format(pin, str_mode))

    def output(self, pin, value):
        """Print the value driven on ``pin``."""
        print("TestGPIO - Pin {} outputs {}".format(pin, value))