modbus_rtu.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import minimalmodbus
  2. import struct
  3. class ModbusRTUMeter:
  4. """Meter class that uses a minimalmodbus Instrument to query an ABB meter."""
  5. def __init__(self, port, baudrate=38400, slaveaddress=1, timeout=0.5):
  6. """ Initialize the ModbusRTUmeter object.
  7. Arguments:
  8. * port: a serial port (string)
  9. * baudrate: the baudrate to use (integer)
  10. * slaveaddress: the address of the modbus device (integer)
  11. * Specification of the type. Used to limit the registers to a specific set.
  12. Returns:
  13. * An ABBMeter object
  14. """
  15. self.instrument = minimalmodbus.Instrument(port, slaveaddress, debug=DEBUG)
  16. self.instrument.serial.baudrate = baudrate
  17. self.instrument.serial.timeout = timeout
  18. def read(self, regnames=None):
  19. """ Read one, many or all registers from the device
  20. Args:
  21. * regnames (str or list). If None, read all. If string, read
  22. single register. If list, read all registers from list.
  23. Returns:
  24. * If single register, it returns a single value. If all or list,
  25. return a dict with the keys and values.
  26. Raises:
  27. * KeyError, TypeError, IOError
  28. """
  29. if regnames is None:
  30. return self._batch_read(self.REGS)
  31. if type(regnames) is list:
  32. registers = [register for register in self.REGS if register['name'] in regnames]
  33. if len(registers) < len(regnames):
  34. regs_not_available = [regname for regname in regnames if regname not in
  35. [register['name'] for register in self.REGS]]
  36. print("Warning: the following registers are not available on this device: " +
  37. ", ".join(regs_not_available))
  38. print("The available registers are: %s" +
  39. ", ".join(register['name'] for register in self.REGS))
  40. if len(registers) == 0:
  41. return {}
  42. registers.sort(key=lambda reg: reg['start'])
  43. return self._batch_read(registers)
  44. elif type(regnames) is str:
  45. registers = [register for register in self.REGS if register['name'] == regnames]
  46. if len(registers) == 0:
  47. return "Register not found on device."
  48. return self._read_single(registers[0])
  49. else:
  50. raise TypeError
  51. def _read_single(self, register):
  52. """
  53. Read a single register and return the value. Not to be called directly.
  54. Arguments:
  55. * register: a 'register' dict that contains info on the register.
  56. Returns:
  57. * The interpreted value from the meter.
  58. """
  59. if register['length'] == 1:
  60. return self.instrument.read_register(registeraddress=register['start'],
  61. number_of_decimals=register['decimals'],
  62. signed=register['signed'])
  63. if register['length'] == 2:
  64. if register.get('float'):
  65. return self.instrument.read_float(registeraddress=register['start'],
  66. number_of_registers=2)
  67. else:
  68. value = self.instrument.read_long(registeraddress=register['start'],
  69. signed=register['signed'])
  70. return value / 10 ** register['decimals']
  71. if register['length'] == 4:
  72. value = self.instrument.read_registers(registeraddress=register['start'],
  73. number_of_registers=register['length'])
  74. return self._convert_value(values=value,
  75. signed=register['signed'],
  76. number_of_decimals=register['decimals'],
  77. float=register.get('float'))
  78. def _read_multiple(self, registers):
  79. """
  80. Read multiple registers from the slave device and return their values as a dict.
  81. Arguments:
  82. * A list of registers (complete structs)
  83. Returns:
  84. * A dict containing all keys and values
  85. """
  86. first_reg = min([register['start'] for register in registers])
  87. num_regs = max([register['start'] + register['length'] for register in registers]) - first_reg
  88. values = self.instrument.read_registers(registeraddress=first_reg,
  89. number_of_registers=num_regs)
  90. return self._interpret_result(values, registers)
  91. def _batch_read(self, registers):
  92. """
  93. Read multiple registers in batches, limiting each batch to at most 125 registers.
  94. Arguments:
  95. * A list of registers (complete structs)
  96. Returns:
  97. * A dict containing all keys and values
  98. """
  99. # Count up to at most 128 registers:
  100. start_reg = registers[0]['start']
  101. batch = []
  102. results = {}
  103. for register in registers:
  104. if register['start'] + register['length'] - start_reg <= 125:
  105. batch.append(register)
  106. else:
  107. results.update(self._read_multiple(batch))
  108. batch = []
  109. batch.append(register)
  110. start_reg = register['start']
  111. results.update(self._read_multiple(batch))
  112. return results
  113. def _interpret_result(self, data, registers):
  114. """
  115. Pull the returned string apart and package the data back to its
  116. intended form.
  117. Arguments:
  118. * data: list of register values returned from the device
  119. * registers: the original requested set of registers
  120. Returns:
  121. * A dict containing the register names and resulting values
  122. """
  123. first_reg = min([register['start'] for register in registers])
  124. results = {}
  125. for register in registers:
  126. regname = register['name']
  127. start = register['start'] - first_reg
  128. end = start + register['length']
  129. values = data[start:end]
  130. results[regname] = self._convert_value(values=values,
  131. signed=register['signed'],
  132. number_of_decimals=register['decimals'],
  133. float=register.get('float'))
  134. return results
  135. def _convert_value(self, values, signed=False, number_of_decimals=0, float=False):
  136. """
  137. Convert a list of returned integers to the intended value.
  138. Arguments:
  139. * bytestring: a list of integers that together represent the value
  140. * signed: whether the value is a signed value
  141. * decimals: number of decimals the return value should contain
  142. """
  143. if float:
  144. if len(values) == 2:
  145. bytestring = struct.pack('>HH', *values)
  146. return struct.unpack('>f', bytestring)[0]
  147. elif len(values) == 4:
  148. bytestring = struct.pack('>HHHH', *values)
  149. return struct.unpack('>d', bytestring)[0]
  150. number_of_registers = len(values)
  151. formatcode_i = '>'
  152. formatcode_o = '>'
  153. if number_of_registers == 1:
  154. formatcode_i += "H"
  155. if signed:
  156. formatcode_o += "h"
  157. else:
  158. formatcode_o += "H"
  159. if number_of_registers == 2:
  160. formatcode_i += 'HH'
  161. if signed:
  162. formatcode_o += "l"
  163. else:
  164. formatcode_o += "L"
  165. if number_of_registers == 4:
  166. formatcode_i += "HHHH"
  167. if signed:
  168. formatcode_o += "q"
  169. else:
  170. formatcode_o += "Q"
  171. bytestring = struct.pack(formatcode_i, *values)
  172. value = struct.unpack(formatcode_o, bytestring)[0]
  173. if value in ABBMeter.NULLS:
  174. return None
  175. else:
  176. return float(value) / 10 ** number_of_decimals
  177. NULLS = []
  178. REGS = []