123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- import minimalmodbus
- import struct
- class ModbusRTUMeter:
- """Meter class that uses a minimalmodbus Instrument to query an ABB meter."""
- def __init__(self, port, baudrate=38400, slaveaddress=1, timeout=0.5):
- """ Initialize the ModbusRTUmeter object.
- Arguments:
- * port: a serial port (string)
- * baudrate: the baudrate to use (integer)
- * slaveaddress: the address of the modbus device (integer)
- * Specification of the type. Used to limit the registers to a specific set.
- Returns:
- * An ABBMeter object
- """
- self.instrument = minimalmodbus.Instrument(port, slaveaddress, debug=DEBUG)
- self.instrument.serial.baudrate = baudrate
- self.instrument.serial.timeout = timeout
- def read(self, regnames=None):
- """ Read one, many or all registers from the device
- Args:
- * regnames (str or list). If None, read all. If string, read
- single register. If list, read all registers from list.
- Returns:
- * If single register, it returns a single value. If all or list,
- return a dict with the keys and values.
- Raises:
- * KeyError, TypeError, IOError
- """
- if regnames is None:
- return self._batch_read(self.REGS)
- if type(regnames) is list:
- registers = [register for register in self.REGS if register['name'] in regnames]
- if len(registers) < len(regnames):
- regs_not_available = [regname for regname in regnames if regname not in
- [register['name'] for register in self.REGS]]
- print("Warning: the following registers are not available on this device: " +
- ", ".join(regs_not_available))
- print("The available registers are: %s" +
- ", ".join(register['name'] for register in self.REGS))
- if len(registers) == 0:
- return {}
- registers.sort(key=lambda reg: reg['start'])
- return self._batch_read(registers)
- elif type(regnames) is str:
- registers = [register for register in self.REGS if register['name'] == regnames]
- if len(registers) == 0:
- return "Register not found on device."
- return self._read_single(registers[0])
- else:
- raise TypeError
- def _read_single(self, register):
- """
- Read a single register and return the value. Not to be called directly.
- Arguments:
- * register: a 'register' dict that contains info on the register.
- Returns:
- * The interpreted value from the meter.
- """
- if register['length'] == 1:
- return self.instrument.read_register(registeraddress=register['start'],
- number_of_decimals=register['decimals'],
- signed=register['signed'])
- if register['length'] == 2:
- if register.get('float'):
- return self.instrument.read_float(registeraddress=register['start'],
- number_of_registers=2)
- else:
- value = self.instrument.read_long(registeraddress=register['start'],
- signed=register['signed'])
- return value / 10 ** register['decimals']
- if register['length'] == 4:
- value = self.instrument.read_registers(registeraddress=register['start'],
- number_of_registers=register['length'])
- return self._convert_value(values=value,
- signed=register['signed'],
- number_of_decimals=register['decimals'],
- float=register.get('float'))
- def _read_multiple(self, registers):
- """
- Read multiple registers from the slave device and return their values as a dict.
- Arguments:
- * A list of registers (complete structs)
- Returns:
- * A dict containing all keys and values
- """
- first_reg = min([register['start'] for register in registers])
- num_regs = max([register['start'] + register['length'] for register in registers]) - first_reg
- values = self.instrument.read_registers(registeraddress=first_reg,
- number_of_registers=num_regs)
- return self._interpret_result(values, registers)
- def _batch_read(self, registers):
- """
- Read multiple registers in batches, limiting each batch to at most 125 registers.
- Arguments:
- * A list of registers (complete structs)
- Returns:
- * A dict containing all keys and values
- """
- # Count up to at most 128 registers:
- start_reg = registers[0]['start']
- batch = []
- results = {}
- for register in registers:
- if register['start'] + register['length'] - start_reg <= 125:
- batch.append(register)
- else:
- results.update(self._read_multiple(batch))
- batch = []
- batch.append(register)
- start_reg = register['start']
- results.update(self._read_multiple(batch))
- return results
- def _interpret_result(self, data, registers):
- """
- Pull the returned string apart and package the data back to its
- intended form.
- Arguments:
- * data: list of register values returned from the device
- * registers: the original requested set of registers
- Returns:
- * A dict containing the register names and resulting values
- """
- first_reg = min([register['start'] for register in registers])
- results = {}
- for register in registers:
- regname = register['name']
- start = register['start'] - first_reg
- end = start + register['length']
- values = data[start:end]
- results[regname] = self._convert_value(values=values,
- signed=register['signed'],
- number_of_decimals=register['decimals'],
- float=register.get('float'))
- return results
- def _convert_value(self, values, signed=False, number_of_decimals=0, float=False):
- """
- Convert a list of returned integers to the intended value.
- Arguments:
- * bytestring: a list of integers that together represent the value
- * signed: whether the value is a signed value
- * decimals: number of decimals the return value should contain
- """
- if float:
- if len(values) == 2:
- bytestring = struct.pack('>HH', *values)
- return struct.unpack('>f', bytestring)[0]
- elif len(values) == 4:
- bytestring = struct.pack('>HHHH', *values)
- return struct.unpack('>d', bytestring)[0]
- number_of_registers = len(values)
- formatcode_i = '>'
- formatcode_o = '>'
- if number_of_registers == 1:
- formatcode_i += "H"
- if signed:
- formatcode_o += "h"
- else:
- formatcode_o += "H"
- if number_of_registers == 2:
- formatcode_i += 'HH'
- if signed:
- formatcode_o += "l"
- else:
- formatcode_o += "L"
- if number_of_registers == 4:
- formatcode_i += "HHHH"
- if signed:
- formatcode_o += "q"
- else:
- formatcode_o += "Q"
- bytestring = struct.pack(formatcode_i, *values)
- value = struct.unpack(formatcode_o, bytestring)[0]
- if value in ABBMeter.NULLS:
- return None
- else:
- return float(value) / 10 ** number_of_decimals
- NULLS = []
- REGS = []
|