try:
import RPi.GPIO as gpio
import smbus # I2C interface
import spidev # SPI interface
except ImportError:
gpio = None
smbus = None
spidev = None
# I2C bus example https://www.instructables.com/id/Raspberry-Pi-I2C-Python/
#
# SPI https://github.com/doceme/py-spidev
import os
import json
import time
from . import base
# TODO Resolve all NOTEs in this file. (needs real life testing)
# TODO Is there a case, when closing a valve is bad? Or is closing a valve
# always safe? (It would be a useful assumption if true)
# NOTE Which DAC chip is it - MCP4802 (8-bit) or MCP4922 (12-bit) ???
[docs]
class ValveBoard(base.HardwareBase):
"""A board that connects vacuum chambers and gas sources.
Communication:
- RaspberryPi + I2C (``smbus``) and SPI (``spidev``) interfaces
- I2C -> 3x expander, togther they control 35 open/closed valves
and 1 three-way valve
- SPI -> 4x DA convertor, together they control 5 needle valves
.. note::
Expander is a MCP23017 microchip (16-bit == two 8-bit branches A, B)
DA converter is a MCP4802 microchip (8-bit) and four MCP4911 (10-bit)
"""
def __init__(self, testrun=False, **kwargs):
super().__init__(**kwargs)
global gpio
if not (gpio and smbus and spidev) and not testrun:
raise ImportError("RaspberryPi with modules 'RPi.GPIO', 'smbus'"
" and 'spidev' is required")
if testrun:
gpio = _TestGPIO()
self._mapping = self._load_mapping("./configurations/valve_mapping.json")
self._rules = Rules(valves=self._mapping["valve"])
self._i2c = InterfaceI2C(self._rules, self._mapping["valve"], testrun)
self._spi = InterfaceSPI(self._mapping["needle"], testrun)
self._activepath = set()
def _load_mapping(self, filename):
"""Load name-address mapping for valves from json file."""
path = os.path.join(os.path.dirname(__file__), filename)
json_data = json.load(open(path, "r"))
i2c_valve = {key: (int(a[0], 16), a[1], int(a[2])) for key, a
in json_data["i2c_valve"]["data"].items()}
spi_needle = json_data["spi_needle"]["data"]
return {"valve": i2c_valve, "needle": spi_needle}
# ==== Inherited abstract methods ====
def _connect(self):
self._i2c.open()
self._spi.open()
return True
def _disconnect(self):
self._i2c.close()
self._spi.close()
def _is_ready(self):
return True
# ==== Commands ====
@property
def valves(self):
return list(self._mapping["valve"])
@property
def needles(self):
return [str(i) for i in range(len(self._mapping["needle"]))]
[docs]
@base.base_command
def open_valve(self, name):
"""Open one valve.
:param str name: Valve name, see file 'valve_mapping.json'.
"""
if name not in self._mapping["valve"] or name == "gas01_switch":
raise base.CommandError("Invalid valve name '{}'".format(name))
self._i2c.set_valve(name, True)
[docs]
@base.base_command
def close_valve(self, name):
"""Close one valve.
:param str name: Valve name, see file 'valve_mapping.json'.
"""
if name not in self._mapping["valve"] or name == "gas01_switch":
raise base.CommandError("Invalid valve name '{}'".format(name))
self._i2c.set_valve(name, False)
[docs]
@base.base_command
def close_all(self):
"""Close all binary valves and needles. Not including gas01_switch.
Note: This sends close command to all valves regardless of their
assumed state. Thus it can be used as a reliable fail-safe.
"""
# DevNote: Closing one valve at a time is necessary. We cannot set each
# expander to 0x00 because there may be other devices on the
# same branch (pumps?). We only want to close valves.
valves = set(self._mapping["valve"].keys()) - {"gas01_switch"}
for name in valves:
# Not using self.close_valve() to avoid unnecessary name-checking
# and potential collision with some future features.
self._i2c.set_valve(name, False)
for i in range(5):
self.set_needle(i, 0)
if self._activepath:
self._activepath.clear()
[docs]
@base.base_command
def switch_gas01(self, which):
"""Change state of three-way valve that switches between gas 0 and 1.
:param int which: Pick gas, either 0 or 1.
"""
which = int(which)
if which not in [0, 1]:
raise base.CommandError("Use which=0 or 1")
self._i2c.set_valve("gas01_switch", which == 0) # NOTE Check if this is correct or inverted
[docs]
@base.base_command
def set_needle(self, which, percent):
"""Change state of needle valve connected to one of the chambers.
:param int name: Pick chamber number 0, 1, 2, 3, 4.
:param float percent: Percentage in range 0-100, where 0 is closed.
Note: Percent values below 1.0% are rounded to "real" 0 (i.e. without
empirical offset).
"""
which = int(which)
percent = float(percent)
if which not in range(5):
raise base.CommandError("Use which=0, ..., 4")
if not (0.0 <= percent <= 100):
raise base.CommandError("Value percent must be between 0-100")
# Version A: with offset
if percent >= 1.0:
offs = self._mapping["needle"]["needle" + str(which)][2]
value = offs + int(0.01*percent * (4095-offs))
else:
value = 0
# Version B: without offset
# value = int(0.01*float(percent) * 4095)
self._spi.set_needle("needle" + str(which), value)
[docs]
@base.compound_command
def set_needle_for(self, which, percent, duration):
"""Open needle valve for a limited duration ("gas puff").
:param int name: Pick chamber number 0, 1, 2, 3, 4.
:param float percent: Percentage in range 0-100, where 0 is closed.
:param float duration: Time in seconds.
"""
self.set_needle(which, percent)
time.sleep(max(0, float(duration)))
self.set_needle(which, 0)
[docs]
@base.base_command
def get_needle(self, which):
"""Read state of needle valve connected to one of the chambers.
:param str name: Pick chamber number '0', '1', '2', '3', '4'.
:return int percent: Percentage in range 0-100, where 0 is closed.
"""
which = int(which)
if which not in range(5):
raise base.CommandError("Use which=0, ..., 4")
value = self._spi.state_all()["needle" + str(which)]
# Version A: with offset
offs = self._mapping["needle"]["needle" + str(which)][2]
return max(value-offs, 0) / (4095-offs) * 100
# Version B: without offset
# return value / 4095 * 100
# ==== Higher level commands ====
[docs]
@base.base_command
def open_path(self, path):
"""Open a custom path (a custom set of binary valves). Any previous
path will be closed.
:param str path: A comma separated string containing the valves' names.
(Whitespace characters are ignored.)
Note: Active path is saved to memory and can be closed without closing
any other valves using method ``close_path()``.
"""
# NOTE: Reimplement with predefined paths, valve-locking and such.
valves = set(a.strip() for a in path.split(","))
all_valves = self._mapping["valve"].keys()
if "gas01_switch" in valves or not valves.issubset(all_valves):
raise base.CommandError("Path contains invalid name(s) or not"
" separated by ','")
# Close previous path
if self._activepath:
self.close_path()
# This approach (open path one valve at a time and revert back if a
# rule error was raised) produces clear feedback to the user, since
# ValveRuleError carries the name of the valve, that raised it.
try:
for v in valves:
self._i2c.set_valve(v, True)
self._activepath.add(v)
except ValveRuleError:
self.close_path()
raise
[docs]
@base.base_command
def close_path(self):
"""Close active path."""
while self._activepath:
self.close_valve(self._activepath.pop())
# ==== Methods/commands related to DeviceClient ====
[docs]
@base.idle_command
def update_frontend(self, device_client):
for needle in [str(i) for i in range(5)]:
value = self.get_needle(needle) # 0-100%
device_client.emit("value", {
"value": value,
"formatted": "{:.1f} %".format(value),
"label": "Needle " + needle,
"min": 0,
"max": 100,
"id": "needle" + needle
})
all_valves = self._i2c.state_all()
del all_valves["gas01_switch"]
value = sum(all_valves.values())
device_client.emit("value", {
"value": value,
"formatted": "{} / {}".format(value, len(all_valves)),
"label": "Open valves",
"min": 0,
"max": len(all_valves),
"id": "all_open_valves"
})
[docs]
class InterfaceI2C:
"""Internal helper class that controls the I2C interface and enforces
safety rules on the state of its slaves (binary-state valves).
"""
def __init__(self, rules, mapping, testrun=False):
if not smbus and not testrun:
raise ImportError("RaspberryPi with module 'smbus' is required")
self._rules = rules
self._mapping = mapping
self._testrun = testrun
self._expanders = {(addr, br) for addr, br, _ in mapping.values()}
self._bus = None
def _state_expander(self, address, branch):
"""Read current state of OLATA or OLATB register of expander."""
OLAT = {"A": 0x14, "B": 0x15} # OLAT=OutputLatch register
return self._bus.read_byte_data(address, OLAT[branch])
def _set_expander(self, address, branch, state):
"""Set current state of OLATA or OLATB register of expander."""
OLAT = {"A": 0x14, "B": 0x15} # OLAT=OutputLatch register
self._bus.write_byte_data(address, OLAT[branch], state) # NOTE Consider forced time delay in cycle
[docs]
def open(self):
"""Open the bus and execute all necessary initializations."""
if self._testrun:
self._bus = _TestI2C(self._mapping)
else:
self._bus = smbus.SMBus(1) # NOTE: Consider when (if) this can fail (add exception handling)
# Set i/o direction of all pins to output
IODIR = {"A": 0x00, "B": 0x01} # IODIR register
for addr, branch in self._expanders:
self._bus.write_byte_data(addr, IODIR[branch], 0x00) # NOTE Consider forced time delay in cycle
[docs]
def close(self):
"""Close the bus and clean up."""
self._bus.close() # NOTE: Consider when (if) this can fail (add exception handling)
self._bus = None
[docs]
def set_valve(self, name, state):
"""Set state of a single valve. Safety rules are enforced.
:param str name: valve name as given by ``mapping``
:param int bool: open/close or 'which way' in case of 3-way valve
"""
addr, branch, bitmask = self._mapping[name]
# Enforce rules - CommandError is raised if a rule is broken
valves = self.state_all()
valves[name] = state
if state and not self._rules.check(valves=valves):
raise ValveRuleError(
"Cannot open valve '{}' in current state".format(name))
exp = self._state_expander(addr, branch)
if state:
self._set_expander(addr, branch, exp | bitmask)
else:
self._set_expander(addr, branch, exp & (~bitmask))
[docs]
def state_all(self):
"""Read current state of all valves."""
state_exps = {a: self._state_expander(*a) for a in self._expanders}
state_valves = {name: bool(state_exps[(addr, br)] & bitmask)
for name, (addr, br, bitmask) in self._mapping.items()}
return state_valves
[docs]
class InterfaceSPI:
"""Internal helper class that controls the SPI interface (needle valves).
"""
def __init__(self, mapping, testrun=False):
if not (gpio and spidev) and not testrun:
raise ImportError("RaspberryPi with modules 'RPi.GPIO' and"
" 'spidev' is required")
self._mapping = mapping
self._needles = dict.fromkeys(mapping, 0)
self._bus = spidev.SpiDev() if not testrun else _TestSPI()
# self._bus.max_speed_hz = 1000000 # 1Mbps NOTE Is this slowdown necessary?
# self._bus.no_cs = True # NOTE Is this doing anything?
[docs]
def open(self):
"""Open the bus and execute all necessary initializations."""
gpio.setmode(gpio.BOARD)
for cspin in {a[0] for a in self._mapping.values()}:
gpio.setup(cspin, gpio.OUT)
gpio.output(cspin, True)
self._bus.open(bus=0, device=1)
# NOTE Consider a time delay here?
for name in self._needles:
self.set_needle(name, 0) # NOTE All needles close at the start - write this to docs
[docs]
def close(self):
"""Close the bus and clean up."""
self._bus.close()
[docs]
def set_needle(self, name, value):
"""Set state of a single valve. Safety rules are enforced.
:param str name: needle valve name as given by ``mapping``
:param int value: value in range(0, 4096)
"""
cspin, channel = self._mapping[name][0:2]
value = max(min(value, 4095), 0)
# Header bits: <channel><--><gain><powersafe_shutdown>
data = {"A": 0b0011, "B": 0b1011}[channel] << 12
# Data bits: Least significant 0, 2 or 4 bits are ignored in case of
# 12-bit, 10-bit or 8-bit DAC respectivelly. Thus the range
# of values (0, 4095) is the same, only the precision changes.
data |= value
gpio.output(cspin, False)
self._bus.writebytes([(data >> 8) & 0xff, data & 0xff])
gpio.output(cspin, True) # NOTE Maybe add some time delay?
self._needles[name] = value
[docs]
def state_all(self):
"""State of all needle valves according to software state-tracker. It
is not possible to read state directly from the DAC.
"""
return self._needles
[docs]
class ValveRuleError(base.CommandError):
"""Valve could not be opened because it would violate one of the rules on
valve combinations specified in ``class Rules``.
"""
[docs]
class Rules:
"""Internal helper class to define rules about valve combinations. Valves
are identified by their user-friendly names (file 'valve_mapping.json').
The purpose of rules is to avoid unsafe pipe connections. It is assumed
that closing a valve is always safe.
"""
def __init__(self, valves):
# Define rules
self.disjunctive = self.rule_disjunctive()
self.disabled = self.rule_disabled()
# Look for typos in rule definitions
for v in frozenset.union(*self.disjunctive):
assert v in valves, "Unknown name {} in disjunctive rule".format(v)
for v in self.disabled:
assert v in valves, "Unknown name {} in disabled rule".format(v)
[docs]
def rule_disjunctive(self):
"""Returns list of disjunctive rules (sets).
Rule: *Up to ONE valve from each set can be open at the same time.*
"""
disjunctive = []
# 1. Do not mix gasses in each bus
for n in range(5):
disjunctive.append(
{"gas{}_bus{}".format(g, n) for g in ["01", "2", "3", "4"]})
# 2. Do not connect bus to chamber-venting outlet
for n in range(5):
disjunctive.append(
{"bus{}_chamber".format(n), "chamber{}_vent".format(n)})
return tuple(frozenset(s) for s in disjunctive) # Immutable
[docs]
def rule_disabled(self):
"""Returns set of disabled valves.
Rule: *Valves that are elements of this set cannot be in open state.*
"""
disabled = set()
# NOTE Revise this in future
# 1. (TEMPORARY) Do not open gas recovery/venting
for n in range(5):
disabled.add("bus{}_out".format(n))
return frozenset(disabled) # Immutable
[docs]
def check(self, valves=None):
"""Check if a state does not break any rules.
:param dict valves: state of valves ``{str name: bool state}``
"""
valves_comb = {key for key, state in valves.items() if state}
# See rule_disabled()
for elem in self.disabled:
if elem in valves_comb:
return False
# See rule_disjunctive()
for rule in self.disjunctive:
if len(rule.intersection(valves_comb)) > 1:
return False
return True
class _TestI2C:
"""Testing class that can substitute smbus.SMBus class. It writes data to
console instead of I2C bus.
"""
def __init__(self, mapping):
self.REGISTER_DICT = {
0x00: ("IODIR", "A"), 0x01: ("IODIR", "B"),
0x14: ("OLAT", "A"), 0x15: ("OLAT", "B"),
}
expanders = {(addr, br) for addr, br, _ in mapping.values()}
self.regs = {
"IODIR": dict.fromkeys(expanders, 0xff),
"OLAT": dict.fromkeys(expanders, 0x00),
}
def write_byte_data(self, address, register, byte):
if register not in self.REGISTER_DICT:
print("TestI2C - ERROR Unknown register 0x{:02x}".format(register))
return
reg_name, br = self.REGISTER_DICT[register]
if reg_name == "OLAT" and self.regs["IODIR"][(address, br)] != 0x00:
print("TestI2C - ERROR Write before <IODIR> was set to output")
return
self.regs[reg_name][(address, br)] = byte
print("TestI2C - Write addr=0x{:02x}_{} byte=0b{:08b}"
" <{}>".format(address, br, byte, reg_name))
def read_byte_data(self, address, register):
if register not in self.REGISTER_DICT:
print("TestI2C - ERROR Unknown register 0x{:02x}".format(register))
return
reg_name, br = self.REGISTER_DICT[register]
return self.regs[reg_name][(address, br)]
def close(self):
print("TestI2C - Close")
class _TestSPI:
"""Testing class that can substitute spidev.SpiDev class. It writes data to
console instead of SPI bus.
"""
def __init__(self):
self.max_speed_hz = 1000000
self.no_cs = False
def open(self, bus, device):
print("TestSPI - Open")
def close(self):
print("TestSPI - Close")
def writebytes(self, list_bytes):
print("TestSPI - Write", *["0x{:02x}".format(a) for a in list_bytes])
class _TestGPIO:
"""Testing class that can substitute RPi.GPIO module. Outputs to console.
"""
BOARD = 0
BCM = 1
OUT = 0
IN = 1
LOW = 0
HIGH = 1
def __init__(self):
pass
def setmode(self, mode):
self.pinlayout = mode
str_mode = {self.BOARD: "BOARD", self.BCM: "BCM"}[mode]
print("TestGPIO - Layout set to mode {}".format(str_mode))
def setup(self, pin, mode):
str_mode = {self.OUT: "OUT", self.IN: "IN"}[mode]
print("TestGPIO - Pin {} set to mode {}".format(pin, str_mode))
def output(self, pin, value):
print("TestGPIO - Pin {} outputs {}".format(pin, value))