Source code for device.gauge_controller_tpg36x

import socket
import time
import pathlib
import re
import numpy as np

from . import base


[docs] class GaugeController_TPG36x(base.HardwareBase): """PFEIFFER TPG 36x, x-Channel Vacuum Gauge Controller Supported models: - TPG 361 (1-channel) - TPG 362 (2-channel) - TPG 366 (6-channel) Communication: - ethernet interface, TCP/IP at port 8000 - menmonics protocol (see device comm. protocol manual) :param str model_name: one of 'TPG 361', 'TPG 362', 'TPG 366' :param str ip_address: if specified, connect directly to this address, takes precedence before the ``serial_number`` parameter :param str serial_number: if specified, pick device with this s/n """ KNOWN_MODELS = { # Mapping: Model name -> Part number "TPG 361": b'PT G28 040', "TPG 362": b'PT G28 290', "TPG 366": b'PT G28 770', } def __init__(self, model_name, ip_address=None, serial_number=None, labels=None, **kwargs): """Constructor """ super().__init__(**kwargs) if model_name in self.KNOWN_MODELS: self._model_name = model_name else: raise KeyError("Use one of known model names " + str(list(self.KNOWN_MODELS.keys()))) self._ip_address = ip_address self._serial_num = str(serial_number) if serial_number else serial_number self._socket = None self._dir_saved_data = pathlib.Path.home() / "saved_data" self._last = { "pressure": (0, ), "gaugeident": ("N/A", ), "logtime_now": 0, "logtime_total": 0, } self._labels = labels def _find_device(self): """Find a PFEIFFER device in network with a specified part number and serial number (if provided). Using UDP at port 7000, reverse engineered from PFEIFFER Ethernet Configuration Tool. """ MSG_HELLO = b'\x01\x00\x04\x05' # 01 00 LEN SUM MSG_GET_PN = b'\x02\x00\x06\x00\x07\x0f' # 02 00 LEN 00 07 SUM MSG_GET_SN = b'\x02\x00\x06\x00\x06\x0e' # 02 00 LEN 00 06 SUM def try_to_bind(socket): try: socket.bind(("", 7000)) return True except OSError as e: self.log.error("Cannot bind to port 7000/UDP. Possible" " clash with other GaugeController?") self.log.debug("%s: %s", type(e).__name__, e) return False def send(s, msg, ips): """Send message to all IPs, port 7000.""" s.settimeout(None) for ip in ips: s.sendto(msg, (ip, 7000)) def recvfrom(s, condition, timeout=1): """Recieve all messages until a timeout [s] is reached. Using func condition(msg, addr) pick correct IPs and return them. """ ips = set() s.settimeout(timeout) try: while True: msg, addr = s.recvfrom(64) if condition(msg, addr): ips.add(addr[0]) except socket.timeout: pass return ips ips = set() with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) if not try_to_bind(s): return None # Send: 3x "hello" to broadcast send(s, MSG_HELLO, 3 * ["<broadcast>"]) # Recv: wait for correct answers and remember those IPs ips |= recvfrom(s, lambda m, a: m == MSG_HELLO) # if len(ips) == 0: # Retry with longer timeout # ips |= recvfrom(s, lambda m, a: m == MSG_HELLO, timeout=5) with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: if not try_to_bind(s): return None # Send: request for p/n (part number) to each ip send(s, MSG_GET_PN, ips) # Recv: wait for answers and pick only correct p/n pn = self.KNOWN_MODELS[self._model_name] ips = recvfrom(s, lambda m, a: m[5:-1] == pn) # Find correct s/n if specified if self._serial_num is not None: send(s, MSG_GET_SN, ips) # s/n (serial number) sn = self._serial_num.encode("ascii") ips = recvfrom(s, lambda m, a: m[5:-1] == sn) return ips.pop() if len(ips) > 0 else None # If not, pick one ip and try to read its s/n elif len(ips) > 0: ip = ips.pop() send(s, MSG_GET_SN, [ip]) s.settimeout(1) try: self._serial_num = s.recv(64)[5:-1].decode("ascii") except socket.timeout: pass return ip else: return None @staticmethod def _parse_errorcode(errorcode): error = { "0000": None, "1000": "ERROR (controller error - see display on front panel)", "0100": "NO HWR (no hardware)", "0010": "PAR (inadmissible parameter)", "0001": "SYN (syntax error)", } return error[errorcode] # ==== Inherited abstract methods ==== def _connect(self): # Try to find a device if no ip was supplied if self._ip_address is not None: ip = self._ip_address else: ip = self._find_device() if ip is not None: self.log.info("Found device with s/n '{}' at {}".format(self._serial_num, ip)) elif self._serial_num is not None: self.log.error("No devices with s/n '{}' were found".format(self._serial_num)) return False else: self.log.error("No devices were found, please specify ip address manually") return False # Connect to TCP port 8000 try: self._socket = socket.create_connection((ip, 8000), timeout=5) self._socket.settimeout(None) # Set back to blocking after connect info = self.raw_mnemo("AYT", True).split(",") self._serial_num = info[2] self.log.info(info[0] + " (serial no. " + self._serial_num + ")") return True except socket.timeout: self.log.debug("Connect: Address {} is not responding".format(ip)) return False except OSError as e: self.log.debug("Connect: Address {} {}".format(ip, str(e))) return False def _disconnect(self): try: self._socket.close() except OSError as e: self.log.debug("Disconnect: socket close {}".format(str(e))) finally: self._socket = None def _is_ready(self): try: self.raw_mnemo("AYT") return True except (OSError, base.TransmissionError): return False # def _safestate(self): # pass # def _readerror(self): # return self._parse_errorcode(self.raw_mnemo("ERR", enquiry=True)) def _defaultstate(self): self.raw_mnemo("UNI, 2") # Set pressure unit -> Pa self.raw_mnemo("WDT, 1") # Automatic watchdog control of errors # self.raw_mnemo("LOC, 1") # Keylock ON self._idle_update_all() def _clear_defaultstate(self): self.raw_mnemo("LOC, 0") # Keylock OFF # ==== Properties ==== @property def model_name(self): return self._model_name @property def ip_address(self): if self._socket is not None: return self._socket.getpeername()[0] else: return self._ip_address @property def serial_number(self): return self._serial_num # ==== Private commands ==== # ==== Commands ====
[docs] @base.base_command def raw_mnemo(self, command, enquiry=False): """Send a command via Mnemonics protocol. :param str command: mnemonics string, optionally with parameters separated by comma, without <CR><LF> :param bool enquiry: send <ENQ> in addition to command and return data received from the device :return: ascii data received from device or None """ ENQ = b'\x05' ACK = b'\x06' NAK = b'\x15' def send_message(msg): """Send message.""" try: self._socket.sendall(msg) except OSError: raise base.TransmissionError("Socket send failed") def recv_message(): r"""Receive a message that is terminated by b'\r\n'. If N messages were sent to the socket rather than 1, returned bytes can contain any number of messages in the range 1, 2, ..., N glued together. """ buffer = b'' try: while not buffer.endswith(b'\r\n'): buffer += self._socket.recv(256) except OSError: raise base.TransmissionError("Socket receive failed") else: return buffer if self._status["debug_spam"]: self.log.debug("Mnemo command '{}', ENQ={}".format(command, enquiry)) send_message(command.encode("ascii") + b'\r') # Wait for <ACK>/<NAK>. Device may be in a continuous output mode so we # need to discard any other messages. When the device receives our # message it stops the continuous mode and sends <ACK>. msg = b'' while not (ACK in msg or NAK in msg): msg = recv_message() # This may be more than 1 msg glued together if self._status["debug_spam"]: self.log.debug("<ACK>" if ACK in msg else "<NAK>") # <NAK> - raise exception if NAK in msg: # Sending <ENQ> after <NAK> tells the device to send error data send_message(ENQ) err = self._parse_errorcode(recv_message()[:-2].decode("ascii")) if err is not None: raise base.CommandError(err) else: raise base.TransmissionError() # <ACK> - continue in case of data query elif enquiry: send_message(ENQ) return recv_message()[:-2].decode("ascii")
[docs] @base.base_command def pressure(self): """Returns list of current pressure values from all 1/2/6 gauges. """ # TODO: Do not ignore the gauge status (the odd elements in the list) data = self.raw_mnemo("PRX", True).split(",") self._last["pressure"] = tuple(float(a) for a in data[1::2]) return self._last["pressure"]
[docs] @base.base_command def gaugeident(self): """Returns list identifications of all 1/2/6 gauges. """ self._last["gaugeident"] = tuple(self.raw_mnemo("TID", True).split(",")) return self._last["gaugeident"]
[docs] @base.compound_command def log_pressure(self, device_client, duration, interval): """Log pressure values to file. :param float duration: total duration [s] :param float interval: time between measurements [s] """ try: duration = float(duration) except ValueError: # Try string in format: 12h34m56s or 34m56s or 56s hms = re.fullmatch(r"(?:(?:(\d+)h)?(\d+)m)?(\d+)s?", duration) if hms and any(hms.groups()): hms = (a for a in reversed(hms.groups()) if a is not None) duration = sum(s * float(val) for s, val in zip((1, 60, 3600), hms)) else: raise base.CommandError("Argument 'duration' not in format '12h34m56s'") interval = float(interval) try: self._dir_saved_data.mkdir(exist_ok=True) except FileNotFoundError: raise base.CommandError("Cannot access '{}'".format(self._dir_saved_data)) filepath = "pressure_{}.txt".format(time.strftime("%Y-%m-%d_%H-%M-%S")) filepath = self._dir_saved_data / filepath # .replace(" ", "_").replace(":", "-") try: with filepath.open("x") as out: p_text = ("Pressure_{}_{}[Pa]".format(i+1, gi) for i, gi in enumerate(self.gaugeident())) out.write("# Time[s] " + " ".join(p_text) + "\n") except FileNotFoundError: raise base.CommandError("Cannot access '{}'".format(filepath)) except FileExistsError: raise base.CommandError("Overwrite error - '{}' exists".format(filepath)) self.log.info("Pressure logging {:.0f}s, dt={:.2f}s".format(duration, interval)) start = time.time() elapsed = self._last["logtime_now"] = 0 self._last["logtime_total"] = duration while(elapsed < duration): try: with filepath.open("a") as out: p_text = ("{:.6e}".format(p) for p in self.pressure()) out.write("{:.6e} ".format(elapsed) + " ".join(p_text) + "\n") except FileNotFoundError: raise base.CommandError("Cannot access '{}'".format(filepath)) time.sleep(max(interval - (time.time()-start-elapsed), 0)) elapsed = time.time()-start self._last["logtime_now"] = elapsed self.log.info("Pressure logging finished!") self._last["logtime_now"] = self._last["logtime_total"] with filepath.open("r") as file: device_client.emit_datafile(file, filepath.name)
[docs] @base.compound_command def animate_backlight(self, duration): duration = float(duration) start = time.time() incr = 5 i = int(self.raw_mnemo("BAL", True)) while time.time() - start < duration: i += incr if i > 100: i = 99 incr *= -1 elif i < 0: i = 1 incr *= -1 self.raw_mnemo("BAL, {}".format(i)) # time.sleep(0.05) time.sleep(0.02 + 0.12*(np.abs(i)/100)**2)
# ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idle_update_all(self): # Call to each of these methods updates self._last self.pressure() self.gaugeident()
[docs] def update_frontend(self, device_client): self._idle_update_all() list_p = self._last["pressure"] list_ident = self._last["gaugeident"] for i, (p, ident) in enumerate(zip(list_p, list_ident)): value = np.log(p) if p > 0 else -3 device_client.emit("value", { "value": value, "formatted": "{:.2e} Pa".format(p), "label": self._labels[i] if self._labels else "{}: {}".format(i+1, ident), "min": -3, "max": 5, "id": "gauge{}".format(i+1) }) # TODO: Use actual min and max settings that are read from gauges value = self._last["logtime_total"] - self._last["logtime_now"] if value > 24*3600: days = int(value/(24*3600)) formatted = "{}d {}h".format( days, int(value/3600) - days*24, ) else: formatted = time.strftime("%H:%M:%S", time.gmtime(value)) device_client.emit("value", { "value": value, "formatted": formatted, "label": "Remaining", "min": 0, "max": self._last["logtime_total"], "id": "logtime", })
[docs] class GaugeController_TPG361(GaugeController_TPG36x): """PFEIFFER TPG 361, 1-Channel Vacuum Gauge Controller """ def __init__(self, ip_address=None, serial_number=None, **kwargs): """Constructor :param str ip_address: if specified, connect directly to this address, overrides the ``serial_number`` parameter :param str serial_number: if specified, pick device with this s/n """ super().__init__("TPG 361", ip_address, serial_number, **kwargs)
[docs] class GaugeController_TPG362(GaugeController_TPG36x): """PFEIFFER TPG 362, 2-Channel Vacuum Gauge Controller """ def __init__(self, ip_address=None, serial_number=None, **kwargs): """Constructor :param str ip_address: if specified, connect directly to this address, overrides the ``serial_number`` parameter :param str serial_number: if specified, pick device with this s/n """ super().__init__("TPG 362", ip_address, serial_number, **kwargs)
[docs] class GaugeController_TPG366(GaugeController_TPG36x): """PFEIFFER TPG 366, 6-Channel Vacuum Gauge Controller """ def __init__(self, ip_address=None, serial_number=None, **kwargs): """Constructor :param str ip_address: if specified, connect directly to this address, overrides the ``serial_number`` parameter :param str serial_number: if specified, pick device with this s/n """ super().__init__("TPG 366", ip_address, serial_number, **kwargs)