Quellcode durchsuchen

Add support for SAIA meters

Stan Janssen vor 4 Jahren
Ursprung
Commit
b8eb17eb96
2 geänderte Dateien mit 232 neuen und 13 gelöschten Zeilen
  1. 230 11
      energymeter.py
  2. 2 2
      setup.py

+ 230 - 11
energymeter.py

@@ -18,6 +18,7 @@ import struct
 import asyncio
 import time
 import minimalmodbus
+import sbus
 from collections import Iterable
 
 class ABBMeter:
@@ -93,7 +94,7 @@ class ABBMeter:
 
         if register['length'] is 1:
             return self.instrument.read_register(registeraddress=register['start'],
-                                                 numberOfDecimals=register['decimals'],
+                                                 number_of_decimals=register['decimals'],
                                                  signed=register['signed'])
         if register['length'] is 2:
             value = self.instrument.read_long(registeraddress=register['start'],
@@ -102,10 +103,10 @@ class ABBMeter:
 
         if register['length'] is 4:
             value = self.instrument.read_registers(registeraddress=register['start'],
-                                                   numberOfRegisters=register['length'])
+                                                   number_of_registers=register['length'])
             return self._convert_value(values=value,
                                        signed=register['signed'],
-                                       numberOfDecimals=register['decimals'])
+                                       number_of_decimals=register['decimals'])
 
     def _read_multiple(self, registers):
         """
@@ -121,7 +122,7 @@ class ABBMeter:
         first_reg = min([register['start'] for register in registers])
         num_regs = max([register['start'] + register['length'] for register in registers]) - first_reg
         values = self.instrument.read_registers(registeraddress=first_reg,
-                                                numberOfRegisters=num_regs)
+                                                number_of_registers=num_regs)
         return self._interpret_result(values, registers)
 
     def _batch_read(self, registers):
@@ -173,10 +174,10 @@ class ABBMeter:
             values = data[start:end]
             results[regname] = self._convert_value(values=values,
                                                    signed=register['signed'],
-                                                   numberOfDecimals=register['decimals'])
+                                                   number_of_decimals=register['decimals'])
         return results
 
-    def _convert_value(self, values, signed=False, numberOfDecimals=0):
+    def _convert_value(self, values, signed=False, number_of_decimals=0):
         """
         Convert a list of returned integers to the intended value.
 
@@ -186,25 +187,25 @@ class ABBMeter:
             * decimals: number of decimals the return value should contain
         """
 
-        numberOfRegisters = len(values)
+        number_of_registers = len(values)
         formatcode_i = '>'
         formatcode_o = '>'
 
-        if numberOfRegisters == 1:
+        if number_of_registers == 1:
             formatcode_i += "H"
             if signed:
                 formatcode_o += "h"
             else:
                 formatcode_o += "H"
 
-        if numberOfRegisters == 2:
+        if number_of_registers == 2:
             formatcode_i += 'HH'
             if signed:
                 formatcode_o += "l"
             else:
                 formatcode_o += "L"
 
-        if numberOfRegisters == 4:
+        if number_of_registers == 4:
             formatcode_i += "HHHH"
             if signed:
                 formatcode_o += "q"
@@ -217,7 +218,7 @@ class ABBMeter:
         if value in ABBMeter.NULLS:
             return None
         else:
-            return float(value) / 10 ** numberOfDecimals
+            return float(value) / 10 ** number_of_decimals
 
     # Register map of the ABB A and B series energy meters.
     REGS = [{'name': 'active_import',              'start': 20480, 'length': 4, 'signed': True, 'decimals': 2},
@@ -912,3 +913,221 @@ class AsyncABBTCPMeter(AsyncModbusTCPMeter):
     PROTOCOL_CODE = 0
     FUNCTION_CODE = 3
     REG_OFFSET = 0
+
+
+class SaiaMeter:
+    def __init__(self, port, baudrate=38400, slaveaddress=1, type=None):
+        """ Initialize the ABBMeter object.
+
+        Arguments:
+            * port: a serial port (string)
+            * baudrate: the baudrate to use (integer)
+            * slaveaddress: the address of the modbus device (integer)
+            * Specification of the type. Used to limit the registers to a specific set.
+
+        Returns:
+            * An ABBMeter object
+
+        """
+        minimalmodbus.BAUDRATE = baudrate
+        minimalmodbus.TIMEOUT = 0.5
+        self.instrument = sbus.Instrument(address=slaveaddress, serial_port=port, baudrate=baudrate)
+
+    def read(self, regnames=None):
+        """ Read one, many or all registers from the device
+
+        Args:
+            * regnames (str or list). If None, read all. If string, read
+              single register. If list, read all registers from list.
+
+        Returns:
+            * If single register, it returns a single value. If all or list,
+              return a dict with the keys and values.
+
+        Raises:
+            * KeyError, TypeError, IOError
+        """
+        if regnames is None:
+            return self._batch_read(self.REGS)
+        if type(regnames) is list:
+            registers = [register for register in self.REGS if register['name'] in regnames]
+            if len(registers) < len(regnames):
+                regs_not_available = [regname for regname in regnames if regname not in \
+                                      [register['name'] for register in self.REGS]]
+                print("Warning: the following registers are not available on this device: " +
+                      ", ".join(regs_not_available))
+                print("The available registers are: %s" +
+                      ", ".join(register['name'] for register in self.REGS))
+            if len(registers) == 0:
+                return {}
+            registers.sort(key=lambda reg: reg['start'])
+            return self._batch_read(registers)
+        elif type(regnames) is str:
+            registers = [register for register in self.REGS if register['name'] == regnames]
+            if len(registers) == 0:
+                return "Register not found on device."
+            return self._read_single(registers[0])
+        else:
+            raise TypeError
+
+    def _read_single(self, register):
+        """
+        Read a single register and return the value. Not to be called directly.
+
+        Arguments:
+        * register: a 'register' dict that contains info on the register.
+
+        Returns:
+        * The interpreted value from the meter.
+        """
+
+        if register['length'] is 1:
+            return self.instrument.read_register(registeraddress=register['start'],
+                                                 number_of_decimals=register['decimals'],
+                                                 signed=register['signed'])
+        if register['length'] is 2:
+            value = self.instrument.read_long(registeraddress=register['start'],
+                                              signed=register['signed'])
+            return value / 10 ** register['decimals']
+
+        if register['length'] is 4:
+            value = self.instrument.read_registers(registeraddress=register['start'],
+                                                   number_of_registers=register['length'])
+            return self._convert_value(values=value,
+                                       signed=register['signed'],
+                                       number_of_decimals=register['decimals'])
+
+    def _read_multiple(self, registers):
+        """
+        Read multiple registers from the slave device and return their values as a dict.
+
+        Arguments:
+            * A list of registers (complete structs)
+
+        Returns:
+            * A dict containing all keys and values
+        """
+
+        first_reg = min([register['start'] for register in registers])
+        num_regs = max([register['start'] + register['length'] for register in registers]) - first_reg
+        values = self.instrument.read_registers(start_register=first_reg,
+                                                num_registers=num_regs)
+        return self._interpret_result(values, registers)
+
+    def _batch_read(self, registers):
+        """
+        Read multiple registers in batches, limiting each batch to at most 10 registers.
+
+        Arguments:
+            * A list of registers (complete structs)
+
+        Returns:
+            * A dict containing all keys and values
+        """
+        # Count up to at most 10 registers:
+        start_reg = registers[0]['start']
+        batch = []
+        results = {}
+        for register in registers:
+            if register['start'] + register['length'] - start_reg <= 10:
+                batch.append(register)
+            else:
+                results.update(self._read_multiple(batch))
+                batch = []
+                batch.append(register)
+                start_reg = register['start']
+
+        results.update(self._read_multiple(batch))
+        return results
+
+    def _interpret_result(self, data, registers):
+        """
+        Pull the returned string apart and package the data back to its
+        intended form.
+
+        Arguments:
+            * data: list of register values returned from the device
+            * registers: the original requested set of registers
+
+        Returns:
+            * A dict containing the register names and resulting values
+
+        """
+        first_reg = min([register['start'] for register in registers])
+
+        results = {}
+        for register in registers:
+            regname = register['name']
+            start = register['start'] - first_reg
+            end = start + register['length']
+            values = data[start:end]
+            results[regname] = self._convert_value(values=values,
+                                                   signed=register['signed'],
+                                                   number_of_decimals=register['decimals'])
+        return results
+
+    def _convert_value(self, values, signed=False, number_of_decimals=0):
+        """
+        Convert a list of returned integers to the intended value.
+
+        Arguments:
+            * bytestring: a list of integers that together represent the value
+            * signed: whether the value is a signed value
+            * decimals: number of decimals the return value should contain
+        """
+
+        number_of_registers = len(values)
+        formatcode_i = '>'
+        formatcode_o = '>'
+
+        if number_of_registers == 1:
+            formatcode_i += "L"
+            if signed:
+                formatcode_o += "l"
+            else:
+                formatcode_o += "L"
+
+        if number_of_registers == 2:
+            formatcode_i += 'LL'
+            if signed:
+                formatcode_o += "q"
+            else:
+                formatcode_o += "Q"
+
+        bytestring = struct.pack(formatcode_i, *values)
+        value = struct.unpack(formatcode_o, bytestring)[0]
+
+        return float(value) / 10 ** number_of_decimals
+
+    REGISTER_BYTES = 4
+    REGS = [{'name': 'firmware_version',           'start': 0, 'length': 1, 'signed': True, 'decimals': 0},
+            {'name': 'num_registers',              'start': 1, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'num_flags',                  'start': 2, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'baudrate',                   'start': 3, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'serial_number',              'start': 11, 'length': 2, 'signed': False, 'decimals': 0},
+            {'name': 'status_protect',             'start': 14, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'sbus_timeout',               'start': 15, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'sbus_address',               'start': 16, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'error_flags',                'start': 17, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'tariff',                     'start': 19, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'active_import_tariff_1',     'start': 20, 'length': 1, 'signed': False, 'decimals': 2},
+            {'name': 'resettable_active_import_t1','start': 21, 'length': 1, 'signed': False, 'decimals': 2},
+            {'name': 'active_import_tariff_2',     'start': 22, 'length': 1, 'signed': False, 'decimals': 2},
+            {'name': 'resettable_active_import_t2','start': 23, 'length': 1, 'signed': False, 'decimals': 2},
+            {'name': 'voltage_l1_n',               'start': 24, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'current_l1',                 'start': 25, 'length': 1, 'signed': False, 'decimals': 1},
+            {'name': 'active_power_l1',            'start': 26, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'reactive_power_l1',          'start': 27, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'power_factor_l1',            'start': 28, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'voltage_l2_n',               'start': 29, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'current_l2',                 'start': 30, 'length': 1, 'signed': False, 'decimals': 1},
+            {'name': 'active_power_l2',            'start': 31, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'reactive_power_l2',          'start': 32, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'power_factor_l2',            'start': 33, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'voltage_l3_n',               'start': 34, 'length': 1, 'signed': False, 'decimals': 0},
+            {'name': 'current_l3',                 'start': 35, 'length': 1, 'signed': False, 'decimals': 1},
+            {'name': 'active_power_l3',            'start': 36, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'reactive_power_l3',          'start': 37, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'power_factor_l3',            'start': 38, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'active_power_total',         'start': 39, 'length': 1, 'signed': True, 'decimals': 2},
+            {'name': 'reactive_power_total',       'start': 40, 'length': 1, 'signed': True, 'decimals': 2}]

+ 2 - 2
setup.py

@@ -30,7 +30,7 @@ setup(
     author_email = metadata['email'],
     url          = metadata['url'],
     description="Wrapper for Minimalmodbus to use with ABB Energy Meters.",
-    install_requires = ['pyserial','minimalmodbus'],
+    install_requires = ['pyserial', 'minimalmodbus', 'sbus'],
     py_modules = ['energymeter'],
     keywords='ABB Energy Meter Modbus'
-)
+)