import socket
import time
import pathlib
import re
import numpy as np
from . import base
class GaugeController_TPG36x(base.HardwareBase):
    """PFEIFFER TPG 36x, x-Channel Vacuum Gauge Controller

    Supported models:

    - TPG 361 (1-channel)
    - TPG 362 (2-channel)
    - TPG 366 (6-channel)

    Communication:

    - ethernet interface, TCP/IP at port 8000
    - mnemonics protocol (see device comm. protocol manual)

    :param str model_name: one of 'TPG 361', 'TPG 362', 'TPG 366'
    :param str ip_address: if specified, connect directly to this address,
        takes precedence over the ``serial_number`` parameter
    :param str serial_number: if specified, pick device with this s/n
    :param labels: optional container of display labels indexed by gauge
        position (0-based); gauges without a label get a generated one
    """

    KNOWN_MODELS = {  # Mapping: Model name -> Part number
        "TPG 361": b'PT G28 040',
        "TPG 362": b'PT G28 290',
        "TPG 366": b'PT G28 770',
    }

    def __init__(self, model_name, ip_address=None, serial_number=None, labels=None, **kwargs):
        """Constructor

        :raises KeyError: if ``model_name`` is not a key of ``KNOWN_MODELS``
        """
        super().__init__(**kwargs)
        if model_name not in self.KNOWN_MODELS:
            raise KeyError("Use one of known model names "
                           + str(list(self.KNOWN_MODELS.keys())))
        self._model_name = model_name
        self._ip_address = ip_address
        # Any falsy value ("" or 0 as well as None) means "not specified";
        # storing it unconverted would later break the discovery code, which
        # calls .encode() on every non-None serial number.
        self._serial_num = str(serial_number) if serial_number else None
        self._socket = None
        self._dir_saved_data = pathlib.Path.home() / "saved_data"
        # Cache of the most recent readings, consumed by update_frontend()
        self._last = {
            "pressure": (0, ),
            "gaugeident": ("N/A", ),
            "logtime_now": 0,
            "logtime_total": 0,
        }
        self._labels = labels

    def _find_device(self):
        """Find a PFEIFFER device in network with a specified part number
        and serial number (if provided). Using UDP at port 7000, reverse
        engineered from PFEIFFER Ethernet Configuration Tool.

        If no serial number was configured, the serial number of the picked
        device is stored in ``self._serial_num`` when it answers in time.

        :return: IP address (str) of a matching device, or None if nothing
            matched or port 7000/UDP could not be bound
        """
        MSG_HELLO = b'\x01\x00\x04\x05'  # 01 00 LEN SUM
        MSG_GET_PN = b'\x02\x00\x06\x00\x07\x0f'  # 02 00 LEN 00 07 SUM
        MSG_GET_SN = b'\x02\x00\x06\x00\x06\x0e'  # 02 00 LEN 00 06 SUM

        def try_to_bind(sock):
            """Bind to port 7000 on all interfaces, log and return False on failure."""
            try:
                sock.bind(("", 7000))
            except OSError as e:
                self.log.error("Cannot bind to port 7000/UDP. Possible"
                               " clash with other GaugeController?")
                self.log.debug("%s: %s", type(e).__name__, e)
                return False
            return True

        def send(sock, msg, ips):
            """Send message to all IPs, port 7000."""
            sock.settimeout(None)
            for ip in ips:
                sock.sendto(msg, (ip, 7000))

        def recvfrom(sock, condition, timeout=1):
            """Receive messages until no datagram arrives within timeout [s].
            Using func condition(msg, addr) pick correct IPs and return them.
            """
            matched = set()
            sock.settimeout(timeout)
            try:
                while True:
                    msg, addr = sock.recvfrom(64)
                    if condition(msg, addr):
                        matched.add(addr[0])
            except socket.timeout:
                pass
            return matched

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            if not try_to_bind(s):
                return None
            # Send: 3x "hello" to broadcast
            send(s, MSG_HELLO, 3 * ["<broadcast>"])
            # Recv: wait for correct answers and remember those IPs
            ips = recvfrom(s, lambda m, a: m == MSG_HELLO)
            # if len(ips) == 0:  # Retry with longer timeout
            #     ips |= recvfrom(s, lambda m, a: m == MSG_HELLO, timeout=5)

        if not ips:
            # Nobody answered: skip the second round and its receive timeout
            return None

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            if not try_to_bind(s):
                return None
            # Send: request for p/n (part number) to each ip
            send(s, MSG_GET_PN, ips)
            # Recv: wait for answers and pick only correct p/n
            pn = self.KNOWN_MODELS[self._model_name]
            ips = recvfrom(s, lambda m, a: m[5:-1] == pn)

            # Find correct s/n if specified
            if self._serial_num is not None:
                send(s, MSG_GET_SN, ips)  # s/n (serial number)
                sn = self._serial_num.encode("ascii")
                ips = recvfrom(s, lambda m, a: m[5:-1] == sn)
                return ips.pop() if ips else None

            # If not, pick one ip and try to read its s/n
            if not ips:
                return None
            ip = ips.pop()
            send(s, MSG_GET_SN, [ip])
            s.settimeout(1)
            try:
                self._serial_num = s.recv(64)[5:-1].decode("ascii")
            except socket.timeout:
                pass  # Serial number stays unknown, the device is still usable
            return ip

    @staticmethod
    def _parse_errorcode(errorcode):
        """Translate the 4-character error status string into a message.

        Each position is handled as an independent 0/1 flag, so a status with
        several flags set yields all messages joined by ``"; "``.
        NOTE(review): combined flags are an assumption about the protocol;
        single-flag codes return exactly the same text as before.

        :param str errorcode: status string, e.g. ``"0010"``
        :return: None for ``"0000"``, otherwise a description (also for an
            unrecognised status string)
        """
        flags = (
            "ERROR (controller error - see display on front panel)",
            "NO HWR (no hardware)",
            "PAR (inadmissible parameter)",
            "SYN (syntax error)",
        )
        code = errorcode.strip()
        if len(code) != len(flags) or set(code) - {"0", "1"}:
            return "Unknown error code '{}'".format(errorcode)
        active = [text for bit, text in zip(code, flags) if bit == "1"]
        return "; ".join(active) if active else None

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Open the TCP connection (port 8000) and read the device identity.

        Uses the configured IP address, or network discovery if none was
        supplied.

        :return: True on success, False otherwise
        """
        # Try to find a device if no ip was supplied
        if self._ip_address is not None:
            ip = self._ip_address
        else:
            ip = self._find_device()
            if ip is not None:
                self.log.info("Found device with s/n '{}' at {}".format(self._serial_num, ip))
            elif self._serial_num is not None:
                self.log.error("No devices with s/n '{}' were found".format(self._serial_num))
                return False
            else:
                self.log.error("No devices were found, please specify ip address manually")
                return False

        # Connect to TCP port 8000
        try:
            self._socket = socket.create_connection((ip, 8000), timeout=5)
            self._socket.settimeout(None)  # Set back to blocking after connect
            info = self.raw_mnemo("AYT", True).split(",")
            self._serial_num = info[2]
            self.log.info(info[0] + " (serial no. " + self._serial_num + ")")
            return True
        except socket.timeout:
            self.log.debug("Connect: Address {} is not responding".format(ip))
            return False
        except base.TransmissionError as e:
            # TCP connect worked but the identity query failed: do not keep
            # a half-initialised socket around.
            self.log.debug("Connect: Address {} {}".format(ip, str(e)))
            self._disconnect()
            return False
        except OSError as e:
            self.log.debug("Connect: Address {} {}".format(ip, str(e)))
            return False

    def _disconnect(self):
        """Close the TCP socket; does nothing when no socket is open."""
        if self._socket is None:
            return
        try:
            self._socket.close()
        except OSError as e:
            self.log.debug("Disconnect: socket close {}".format(str(e)))
        finally:
            self._socket = None

    def _is_ready(self):
        """Return True if the device acknowledges an "AYT" command."""
        try:
            self.raw_mnemo("AYT")
            return True
        except (OSError, base.TransmissionError):
            return False

    # def _safestate(self):
    #     pass

    # def _readerror(self):
    #     return self._parse_errorcode(self.raw_mnemo("ERR", enquiry=True))

    def _defaultstate(self):
        """Configure the device defaults and refresh the cached readings."""
        self.raw_mnemo("UNI, 2")  # Set pressure unit -> Pa
        self.raw_mnemo("WDT, 1")  # Automatic watchdog control of errors
        # self.raw_mnemo("LOC, 1")  # Keylock ON
        self._idle_update_all()

    def _clear_defaultstate(self):
        """Undo the default state: switch the keylock off."""
        self.raw_mnemo("LOC, 0")  # Keylock OFF

    # ==== Properties ====

    @property
    def model_name(self):
        """Model name given to the constructor."""
        return self._model_name

    @property
    def ip_address(self):
        """Peer address of the open socket, else the configured address."""
        if self._socket is not None:
            return self._socket.getpeername()[0]
        else:
            return self._ip_address

    @property
    def serial_number(self):
        """Serial number as configured, discovered, or read from the device."""
        return self._serial_num

    # ==== Private commands ====

    # ==== Commands ====

    @base.base_command
    def raw_mnemo(self, command, enquiry=False):
        """Send a command via Mnemonics protocol.

        :param str command: mnemonics string, optionally with parameters
            separated by comma, without <CR><LF>
        :param bool enquiry: send <ENQ> in addition to command and return data
            received from the device
        :return: ascii data received from device or None
        :raises base.TransmissionError: not connected, socket failure,
            connection closed by the device, or <NAK> without an error status
        :raises base.CommandError: the device answered <NAK> and reported an
            error status
        """
        ENQ = b'\x05'
        ACK = b'\x06'
        NAK = b'\x15'

        if self._socket is None:
            # Fail with the protocol exception rather than an AttributeError
            raise base.TransmissionError("Not connected")

        def send_message(msg):
            """Send message."""
            try:
                self._socket.sendall(msg)
            except OSError:
                raise base.TransmissionError("Socket send failed")

        def recv_message():
            r"""Receive a message that is terminated by b'\r\n'. If N messages
            were sent to the socket rather than 1, returned bytes can contain
            any number of messages in the range 1, 2, ..., N glued together.
            """
            buffer = b''
            try:
                while not buffer.endswith(b'\r\n'):
                    chunk = self._socket.recv(256)
                    if not chunk:
                        # recv() returns b'' once the peer has closed the
                        # connection; without this check the loop never ends.
                        raise base.TransmissionError("Socket closed by peer")
                    buffer += chunk
            except OSError:
                raise base.TransmissionError("Socket receive failed")
            return buffer

        if self._status["debug_spam"]:
            self.log.debug("Mnemo command '{}', ENQ={}".format(command, enquiry))
        send_message(command.encode("ascii") + b'\r')

        # Wait for <ACK>/<NAK>. Device may be in a continuous output mode so we
        # need to discard any other messages. When the device receives our
        # message it stops the continuous mode and sends <ACK>.
        msg = b''
        while not (ACK in msg or NAK in msg):
            msg = recv_message()  # This may be more than 1 msg glued together
        if self._status["debug_spam"]:
            self.log.debug("<ACK>" if ACK in msg else "<NAK>")

        # <NAK> - raise exception
        if NAK in msg:
            # Sending <ENQ> after <NAK> tells the device to send error data
            send_message(ENQ)
            err = self._parse_errorcode(recv_message()[:-2].decode("ascii"))
            if err is not None:
                raise base.CommandError(err)
            else:
                raise base.TransmissionError()
        # <ACK> - continue in case of data query
        elif enquiry:
            send_message(ENQ)
            return recv_message()[:-2].decode("ascii")

    @base.base_command
    def pressure(self):
        """Returns tuple of current pressure values from all 1/2/6 gauges.

        The values are also cached in ``self._last["pressure"]``.
        """
        # TODO: Do not ignore the gauge status (the odd elements in the list)
        data = self.raw_mnemo("PRX", True).split(",")
        self._last["pressure"] = tuple(float(a) for a in data[1::2])
        return self._last["pressure"]

    @base.base_command
    def gaugeident(self):
        """Returns tuple of identifications of all 1/2/6 gauges.

        The values are also cached in ``self._last["gaugeident"]``.
        """
        self._last["gaugeident"] = tuple(self.raw_mnemo("TID", True).split(","))
        return self._last["gaugeident"]

    @staticmethod
    def _parse_duration(duration):
        """Convert a duration argument to seconds.

        Accepts anything ``float()`` accepts, or a string in format
        ``12h34m56s``, ``34m56s`` or ``56s``.

        :raises base.CommandError: if the value cannot be interpreted or is
            not finite
        """
        try:
            seconds = float(duration)
        except (TypeError, ValueError):
            hms = None
            if isinstance(duration, str):
                hms = re.fullmatch(r"(?:(?:(\d+)h)?(\d+)m)?(\d+)s?", duration)
            if hms is None:
                raise base.CommandError(
                    "Argument 'duration' not in format '12h34m56s'") from None
            # Groups are (h, m, s); reversed and with missing leading units
            # dropped they line up with the multipliers (1, 60, 3600).
            parts = (a for a in reversed(hms.groups()) if a is not None)
            seconds = sum(scale * float(val)
                          for scale, val in zip((1, 60, 3600), parts))
        if not np.isfinite(seconds):
            # nan/inf would break the logging loop and the remaining-time display
            raise base.CommandError("Argument 'duration' must be a finite number")
        return seconds

    @base.compound_command
    def log_pressure(self, device_client, duration, interval):
        """Log pressure values to file.

        A new file ``pressure_<timestamp>.txt`` is created in the
        ``saved_data`` directory under the user's home; when logging is
        finished the file is passed to ``device_client.emit_datafile``.

        :param device_client: object providing ``emit_datafile(file, name)``
        :param duration: total duration [s] as a number, or a string in
            format ``12h34m56s`` / ``34m56s`` / ``56s``
        :param float interval: time between measurements [s]
        :raises base.CommandError: invalid ``duration`` or file access problem
        """
        duration = self._parse_duration(duration)
        interval = float(interval)

        try:
            self._dir_saved_data.mkdir(exist_ok=True)
        except FileNotFoundError:
            raise base.CommandError("Cannot access '{}'".format(self._dir_saved_data))
        filepath = "pressure_{}.txt".format(time.strftime("%Y-%m-%d_%H-%M-%S"))
        filepath = self._dir_saved_data / filepath

        # Mode "x" refuses to overwrite an existing file
        try:
            with filepath.open("x") as out:
                p_text = ("Pressure_{}_{}[Pa]".format(i+1, gi)
                          for i, gi in enumerate(self.gaugeident()))
                out.write("# Time[s] " + " ".join(p_text) + "\n")
        except FileNotFoundError:
            raise base.CommandError("Cannot access '{}'".format(filepath))
        except FileExistsError:
            raise base.CommandError("Overwrite error - '{}' exists".format(filepath))

        self.log.info("Pressure logging {:.0f}s, dt={:.2f}s".format(duration, interval))
        # Monotonic clock: elapsed time must not jump when the wall clock is
        # adjusted during a long run.
        start = time.monotonic()
        elapsed = self._last["logtime_now"] = 0
        self._last["logtime_total"] = duration
        while elapsed < duration:
            # The file is reopened for every sample, so each line is written
            # out and closed before the next wait.
            try:
                with filepath.open("a") as out:
                    p_text = ("{:.6e}".format(p) for p in self.pressure())
                    out.write("{:.6e} ".format(elapsed) + " ".join(p_text) + "\n")
            except FileNotFoundError:
                raise base.CommandError("Cannot access '{}'".format(filepath))
            # Sleep only for what is left of the interval after the measurement
            time.sleep(max(interval - (time.monotonic() - start - elapsed), 0))
            elapsed = time.monotonic() - start
            self._last["logtime_now"] = elapsed

        self.log.info("Pressure logging finished!")
        self._last["logtime_now"] = self._last["logtime_total"]
        with filepath.open("r") as file:
            device_client.emit_datafile(file, filepath.name)

    @base.compound_command
    def animate_backlight(self, duration):
        """Sweep the display backlight ("BAL") up and down for a while.

        Starts from the value currently reported by the device and changes it
        in steps of 5, reversing direction at the ends of the 0-100 range.
        The last value that was set is left in place.

        :param float duration: how long to animate [s]
        """
        duration = float(duration)
        start = time.time()
        incr = 5
        i = int(self.raw_mnemo("BAL", True))
        while time.time() - start < duration:
            i += incr
            if i > 100:
                i = 99
                incr *= -1
            elif i < 0:
                i = 1
                incr *= -1
            self.raw_mnemo("BAL, {}".format(i))
            # Longer pause at higher values (0.02 s at 0 up to 0.14 s at 100)
            time.sleep(0.02 + 0.12*(abs(i)/100)**2)

    # ==== Methods/commands related to DeviceClient ====

    @base.idle_command
    def _idle_update_all(self):
        """Refresh all cached readings in ``self._last``."""
        # Call to each of these methods updates self._last
        self.pressure()
        self.gaugeident()

    def update_frontend(self, device_client):
        """Refresh the readings and emit them to the frontend.

        Emits one "value" event per gauge (``id`` "gauge1", "gauge2", ...)
        and one for the remaining pressure-logging time (``id`` "logtime").

        :param device_client: object providing ``emit(event, data)``
        """
        self._idle_update_all()
        list_p = self._last["pressure"]
        list_ident = self._last["gaugeident"]
        for i, (p, ident) in enumerate(zip(list_p, list_ident)):
            # Decadic logarithm: the -3..5 range below spans 1e-3..1e5 Pa.
            # Non-positive readings are pinned to the bottom of the scale.
            value = np.log10(p) if p > 0 else -3
            label = "{}: {}".format(i+1, ident)
            if self._labels:
                try:
                    label = self._labels[i]
                except (IndexError, KeyError):
                    pass  # No user label for this gauge, keep the generated one
            device_client.emit("value", {
                "value": value,
                "formatted": "{:.2e} Pa".format(p),
                "label": label,
                "min": -3,
                "max": 5,
                "id": "gauge{}".format(i+1)
            })
        # TODO: Use actual min and max settings that are read from gauges

        # The last sample of a logging run may overshoot the total slightly;
        # clamp so that gmtime() never gets a negative number.
        value = max(self._last["logtime_total"] - self._last["logtime_now"], 0)
        if value > 24*3600:
            days = int(value/(24*3600))
            formatted = "{}d {}h".format(
                days, int(value/3600) - days*24,
            )
        else:
            formatted = time.strftime("%H:%M:%S", time.gmtime(value))
        device_client.emit("value", {
            "value": value,
            "formatted": formatted,
            "label": "Remaining",
            "min": 0,
            "max": self._last["logtime_total"],
            "id": "logtime",
        })
class GaugeController_TPG361(GaugeController_TPG36x):
    """PFEIFFER TPG 361, 1-Channel Vacuum Gauge Controller
    """

    def __init__(self, ip_address=None, serial_number=None, **kwargs):
        """Constructor

        :param str ip_address: if specified, connect directly to this address,
            overrides the ``serial_number`` parameter
        :param str serial_number: if specified, pick device with this s/n
        :param kwargs: passed on unchanged to ``GaugeController_TPG36x``
        """
        super().__init__("TPG 361", ip_address, serial_number, **kwargs)
class GaugeController_TPG362(GaugeController_TPG36x):
    """PFEIFFER TPG 362, 2-Channel Vacuum Gauge Controller
    """

    def __init__(self, ip_address=None, serial_number=None, **kwargs):
        """Constructor

        :param str ip_address: if specified, connect directly to this address,
            overrides the ``serial_number`` parameter
        :param str serial_number: if specified, pick device with this s/n
        :param kwargs: passed on unchanged to ``GaugeController_TPG36x``
        """
        super().__init__("TPG 362", ip_address, serial_number, **kwargs)
class GaugeController_TPG366(GaugeController_TPG36x):
    """PFEIFFER TPG 366, 6-Channel Vacuum Gauge Controller
    """

    def __init__(self, ip_address=None, serial_number=None, **kwargs):
        """Constructor

        :param str ip_address: if specified, connect directly to this address,
            overrides the ``serial_number`` parameter
        :param str serial_number: if specified, pick device with this s/n
        :param kwargs: passed on unchanged to ``GaugeController_TPG36x``
        """
        super().__init__("TPG 366", ip_address, serial_number, **kwargs)