"""Read register values from a Modbus RTU meter through minimalmodbus."""

import struct

import minimalmodbus


class ModbusRTUMeter:
    """Meter class that uses a minimalmodbus Instrument to query an ABB meter.

    The class itself ships with empty ``REGS`` and ``NULLS`` tables; a
    concrete meter is expected to fill them in (for example in a subclass).
    """

    # Raw integer values that mean "no value available"; a register whose raw
    # value is listed here is reported as None by _convert_value.
    NULLS = []

    # Register table. Each entry is a dict with the keys used by this class:
    #   'name'     - register name, used as the key in returned dicts
    #   'start'    - address of the first 16-bit register
    #   'length'   - number of 16-bit registers (1, 2 or 4)
    #   'decimals' - number of implied decimals (value is divided by 10**n)
    #   'signed'   - whether the raw integer is signed
    #   'float'    - optional; truthy if the registers hold an IEEE-754 float
    REGS = []

    # Largest span of registers requested in a single read. 125 is the
    # maximum a Modbus "read registers" request can carry.
    MAX_REGISTERS_PER_READ = 125

    # struct format characters for integer decoding, keyed by register count:
    # (unsigned code, signed code). With the '>' prefix these are the
    # standard 2-, 4- and 8-byte big-endian integers.
    _INT_FORMATS = {1: ('H', 'h'), 2: ('L', 'l'), 4: ('Q', 'q')}

    # struct format characters for float decoding, keyed by register count.
    _FLOAT_FORMATS = {2: 'f', 4: 'd'}

    def __init__(self, port, baudrate=38400, slaveaddress=1, timeout=0.5,
                 debug=False):
        """
        Initialize the ModbusRTUMeter object.

        Arguments:
            * port: a serial port (string)
            * baudrate: the baudrate to use (integer)
            * slaveaddress: the address of the modbus device (integer)
            * timeout: serial read timeout assigned to the instrument's
              serial port
            * debug: passed through to minimalmodbus.Instrument; defaults to
              False

        Returns:
            * A ModbusRTUMeter object
        """
        self.instrument = minimalmodbus.Instrument(port, slaveaddress,
                                                   debug=debug)
        self.instrument.serial.baudrate = baudrate
        self.instrument.serial.timeout = timeout

    def read(self, regnames=None):
        """
        Read one, many or all registers from the device.

        Arguments:
            * regnames (None, str or list). If None, read all registers in
              REGS. If a string, read that single register. If a list, read
              every register from REGS whose name is in the list.

        Returns:
            * For a single register name: the single value, or the string
              "Register not found on device." when the name is not in REGS.
            * For None or a list: a dict mapping register names to values.
              Names in the list that are not in REGS are reported with a
              printed warning and left out of the dict.

        Raises:
            * TypeError if regnames is not None, a str or a list.
            * Whatever the underlying instrument raises on a failed read.
        """
        if regnames is None:
            return self._batch_read(self.REGS)

        if isinstance(regnames, str):
            for register in self.REGS:
                if register['name'] == regnames:
                    return self._read_single(register)
            # Returned as a value (not raised) to stay compatible with
            # existing callers.
            return "Register not found on device."

        if isinstance(regnames, list):
            available = [register['name'] for register in self.REGS]
            known = set(available)
            missing = [name for name in regnames if name not in known]
            if missing:
                print("Warning: the following registers are not available "
                      "on this device: " + ", ".join(missing))
                print("The available registers are: " + ", ".join(available))
            selected = [register for register in self.REGS
                        if register['name'] in regnames]
            if not selected:
                return {}
            return self._batch_read(selected)

        raise TypeError("regnames must be None, a str or a list, not %s"
                        % type(regnames).__name__)

    def _read_single(self, register):
        """
        Read a single register and return the value.
        Not to be called directly.

        Arguments:
            * register: a 'register' dict that contains info on the register.

        Returns:
            * The interpreted value from the meter. None is returned when
              the register's 'length' is not 1, 2 or 4.
        """
        length = register['length']
        start = register['start']

        if length == 1:
            return self.instrument.read_register(
                registeraddress=start,
                number_of_decimals=register['decimals'],
                signed=register['signed'])

        if length == 2:
            if register.get('float'):
                return self.instrument.read_float(registeraddress=start,
                                                  number_of_registers=2)
            value = self.instrument.read_long(registeraddress=start,
                                              signed=register['signed'])
            return value / 10 ** register['decimals']

        if length == 4:
            raw = self.instrument.read_registers(registeraddress=start,
                                                 number_of_registers=length)
            return self._convert_value(
                values=raw,
                signed=register['signed'],
                number_of_decimals=register['decimals'],
                float=register.get('float'))

        # Unsupported length: no read is attempted.
        return None

    def _read_multiple(self, registers):
        """
        Read multiple registers from the slave device in one request and
        return their values as a dict.

        The request spans from the lowest start address to the highest end
        address among the given registers, including any gaps in between.

        Arguments:
            * registers: a non-empty list of registers (complete dicts)

        Returns:
            * A dict containing all keys and values
        """
        first_reg = min(register['start'] for register in registers)
        last_reg = max(register['start'] + register['length']
                       for register in registers)
        values = self.instrument.read_registers(
            registeraddress=first_reg,
            number_of_registers=last_reg - first_reg)
        return self._interpret_result(values, registers)

    def _batch_read(self, registers):
        """
        Read multiple registers in batches, limiting each batch to a span of
        at most MAX_REGISTERS_PER_READ (125) registers.

        Arguments:
            * registers: a list of registers (complete dicts), in any order.
              The list is not modified.

        Returns:
            * A dict containing all keys and values; empty if no registers
              were given.
        """
        if not registers:
            return {}

        # Batching walks the addresses upwards, so work on a sorted copy.
        ordered = sorted(registers, key=lambda reg: reg['start'])

        results = {}
        batch = []
        batch_start = ordered[0]['start']
        for register in ordered:
            span = register['start'] + register['length'] - batch_start
            if batch and span > self.MAX_REGISTERS_PER_READ:
                # This register would push the request past the limit:
                # flush what we have and start a new batch at this register.
                results.update(self._read_multiple(batch))
                batch = []
                batch_start = register['start']
            batch.append(register)

        results.update(self._read_multiple(batch))
        return results

    def _interpret_result(self, data, registers):
        """
        Slice the returned register values apart and convert each slice back
        to its intended form.

        Arguments:
            * data: list of register values returned from the device,
              starting at the lowest start address among 'registers'
            * registers: the original requested set of registers

        Returns:
            * A dict containing the register names and resulting values
        """
        first_reg = min(register['start'] for register in registers)
        results = {}
        for register in registers:
            offset = register['start'] - first_reg
            raw = data[offset:offset + register['length']]
            results[register['name']] = self._convert_value(
                values=raw,
                signed=register['signed'],
                number_of_decimals=register['decimals'],
                float=register.get('float'))
        return results

    def _convert_value(self, values, signed=False, number_of_decimals=0,
                       float=False):
        """
        Convert a list of returned integers to the intended value.

        The 16-bit register values are combined big-endian (first register
        is the most significant).

        Arguments:
            * values: a list of 1, 2 or 4 integers (16-bit register values)
              that together represent the value
            * signed: whether the value is a signed integer
            * number_of_decimals: the integer value is divided by
              10 ** number_of_decimals
            * float: if truthy and 2 or 4 registers are given, decode them
              as a 32-bit or 64-bit IEEE-754 float instead; 'signed' and
              'number_of_decimals' are then ignored

        Returns:
            * The decoded float, or None if the raw integer is in NULLS.

        Raises:
            * ValueError if the number of registers is not 1, 2 or 4.
        """
        # NOTE: the 'float' parameter shadows the builtin of the same name
        # inside this method; the name is kept so keyword callers still work.
        count = len(values)
        pack_format = '>' + 'H' * count

        if float and count in self._FLOAT_FORMATS:
            packed = struct.pack(pack_format, *values)
            return struct.unpack('>' + self._FLOAT_FORMATS[count], packed)[0]

        if count not in self._INT_FORMATS:
            raise ValueError(
                "cannot convert %d registers; expected 1, 2 or 4" % count)

        unsigned_code, signed_code = self._INT_FORMATS[count]
        unpack_format = '>' + (signed_code if signed else unsigned_code)
        packed = struct.pack(pack_format, *values)
        value = struct.unpack(unpack_format, packed)[0]

        if value in self.NULLS:
            return None
        # Dividing by a float power of ten always yields a float result.
        return value / 10.0 ** number_of_decimals