"""Serial port wrapper for Next Generation class pumps.
The code in this file establishes an OS-appropriate serial port and provides
an interface for communicating with the pumps.
"""
from __future__ import annotations
from logging import Logger, getLogger
from time import sleep
from typing import TYPE_CHECKING, Union
from serial import SerialException, serial_for_url
from serial.serialutil import EIGHTBITS, PARITY_NONE, STOPBITS_ONE, SerialBase
from py_hplc.pump_error import PumpError
if TYPE_CHECKING:
from logging import Logger
[docs]class NextGenPumpBase:
"""Serial port wrapper for MX-class Teledyne pumps."""
def __init__(self, device: Union[SerialBase, str], logger: Logger = None) -> None:
if isinstance(device, str):
# fetch a platform-appropriate serial interface
self.serial = serial_for_url(
device,
baudrate=9600,
bytesize=EIGHTBITS,
do_not_open=True,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1, # 100 ms
) # we have to manually open this later
elif isinstance(device, SerialBase):
self.serial = device
else:
raise ValueError("Must init using a Serial object or valid port string")
# you'll have to reach in and add handlers yourself from the calling code
if logger is None: # append to the root logger
self.logger = getLogger(f"{getLogger().name}.{device}")
elif isinstance(logger, Logger): # append to the passed logger
self.logger = getLogger(f"{logger.name}.{device}")
# persistent identifying attributes
self.max_flowrate: float = None
self.max_pressure: float = None
self.version: str = None
self.pressure_units: str = None
self.head: str = None
# other -- for converting user args on the fly
# 0.00 mL vs 0.000 mL; could rep. as 2 || 3?
self.flowrate_factor: int = None # used as 10 ** flowrate_factor
# other configuration logic here
if not self.is_open:
self.open() # open the serial connection
self.identify() # populate attributes, takes about 0.16 s on avg
[docs] def open(self) -> None:
"""Opens the serial port associated with the pump.
Raises: SerialException: An exception describing what went wrong. In this case,
we failed to open the serial port.
"""
try:
self.serial.open()
self.logger.info("Serial port connected")
except SerialException as err:
self.logger.critical("Could not open a serial connection")
self.logger.exception(err)
raise
[docs] def identify(self) -> None:
"""Gets persistent pump properties."""
# general properties -----------------------------------------------------------
# pump head
response = self.command("pi")
if "OK," in response:
self.head = response.split(",")[4]
# max flowrate
response = self.command("mf")
if "OK,MF:" in response: # expect OK,MF:<max_flow>/
self.max_flowrate = float(response.split(":")[1][:-1])
# volumetric resolution - used for setting flowrates later
# expect OK,<flow>,<UPL>,<LPL>,<p_units>,0,<R/S>,0/
response = self.command("cs")
precision = len(response.split(",")[1].split(".")[1])
if precision == 2: # eg. "5.00"
self.flowrate_factor = -5 # FI takes microliters/min * 10 as ints
else: # eg. "5.000"
self.flowrate_factor = -6 # FI takes microliters/min as ints
# version
response = self.command("id")
if "OK," in response: # expect OK,<ID> Version <ver>/
self.version = response.split(",")[1][:-1].strip()
# for pumps that have a pressure sensor ----------------------------------------
# pressure units
response = self.command("pu")
if "OK," in response: # expect "OK,<p_units>/"
self.pressure_units = response.split(",")[1][:-1]
# max pressure
response = self.command("mp")
if "OK,MP:" in response: # expect "OK,MP:<max_pressure>/"
self.max_pressure = float(response.split(":")[1][:-1])
[docs] def command(self, command: str) -> str:
"""Sends the passed string to the pump as bytes.
Args:
command (str): The message to be sent as bytes
Raises:
PumpError: An exception describing what went wrong. In this case, the pump
reponded with an error code.
Returns:
dict[str, Any]: A dictionary containing at least a "response" key
with the pump's response
"""
response = self.write(command)
if "Er/" in response:
raise PumpError(
command=command,
response=response,
message=(
f"The pump threw an error '{response}'"
f"in response to a command: '{command}'"
),
port=self.serial.name,
)
return response # we parse this later
[docs] def write(self, msg: str, delay: float = 0.015) -> str:
"""Write a command to the pump.
A response will be returned after at least (2 * delay) seconds.
Delay defaults to 0.015 s per pump documentation.
If we fail to get a "OK" response, we will wait 0.1 s before attempting again,
up to 3 attempts.
Returns the pump's response string.
Raises:
PumpError: An exception describing what went wrong. In this case, we
couldn't get a response.
Args:
msg (str): The message to be sent
delay (float, optional): A float in seconds. Defaults to 0.015.
Returns:
str: the pump's decoded response string
"""
response = ""
tries = 1
cmd = "".join((msg, "\r")).encode() # defaults to utf-8
while tries <= 3: # pump docs recommend 3 attempts
# this would clear the pump's command buffer, but shouldn't be relied upon
# self.serial.write(b"#")
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
sleep(delay) # let the hardware buffers clear (could defer here if async)
self.serial.write(cmd)
self.serial.flush() # sleeps on a tight loop until everything is written
self.logger.debug("Sent %s (attempt %s/3)", msg, tries)
if msg == "#": # this won't give a response
break
sleep(delay) # let the pump respond
response = self.read() # returns an already-decoded string
if "OK" in response: # no need to retry
break
tries += 1
sleep(0.1) # recommended delay between successive transmissions
# let's throw an error if we couldn't get a response
if response == "" and msg != "#":
raise PumpError(
command=msg,
response=response,
message=(f"Couldn't get a message from the pump in response to {msg}"),
port=self.serial.name,
)
return response
[docs] def read(self) -> str:
"""Reads a single message from the pump.
Returns:
str: The pump's response, or an empty string if no response is given.
"""
response = b""
tries = 1
while tries <= 3:
response = self.serial.read_until(b"/") # we don't know the size a priori
self.logger.debug("Got response: %s (attempt %s/3)", response, tries)
if b"/" in response: # b"/" is the pump's EOL flag
break
tries += 1
return response.decode()
[docs] def close(self) -> None:
"""Closes the serial port associated with the pump."""
self.serial.close()
self.logger.info("Serial port closed")
@property
def is_open(self) -> bool:
"""Returns a boolean representing if the internal serial port is open."""
return self.serial.is_open