#!/usr/bin/env python3

"""
EnergyMeter: A Python module for interfacing with several energy meters.
"""

__author__ = 'Stan Janssen'
__email__ = 'stanjanssen@finetuned.nl'
__url__ = 'https://finetuned.nl/'
__license__ = 'Apache License, Version 2.0'
__version__ = '1.2.0'
__status__ = 'Beta'

import random
import socket
import struct
import asyncio
import time
# 'Iterable' lives in collections.abc; the alias in 'collections' was removed
# in Python 3.10, which made the whole module fail to import.
from collections.abc import Iterable

# The serial back-ends are optional: the Modbus TCP meters below need neither
# of them, so a missing package must not prevent importing this module.
try:
    import minimalmodbus
except ImportError:
    minimalmodbus = None

try:
    import tinysbus
except ImportError:
    tinysbus = None

DEBUG = False


class ModbusTCPMeter:
    """
    Implementation for a Modbus TCP Energy Meter.

    Subclasses are expected to provide the attributes REGS (a list of
    register dicts), REG_OFFSET, PROTOCOL_CODE, FUNCTION_CODE and NULLS.
    """

    def __init__(self, port, tcp_port=502, slaveaddress=126, type=None, baudrate=None):
        """
        Store the connection settings. No connection is opened here.

        Arguments:
            * port: host name or address of the meter
            * tcp_port: TCP port of the meter (integer)
            * slaveaddress: the modbus unit id of the device (integer)
            * type: accepted for signature compatibility, not used here
            * baudrate: accepted for signature compatibility, not used here
        """
        self.port = port
        self.tcp_port = tcp_port
        self.device_id = slaveaddress
        # Must exist before the first request: _perform_request() checks it
        # to decide whether a connection still has to be made.
        self.device = None

    def read(self, regnames=None):
        """
        Read one, many or all registers from the device.

        Arguments:
            * regnames (str or list). If None, read all. If string, read
              single register. If list, read all registers from list.

        Returns:
            * If single register, it returns a single value.
              If all or list, return a dict with the keys and values.

        Raises:
            * IndexError if a single register name is not known
            * TypeError if regnames is not None, a str or a list
        """
        if regnames is None:
            return self._read_multiple(self.REGS)

        if isinstance(regnames, str):
            registers = [register for register in self.REGS
                         if register['name'] == regnames]
            if not registers:
                # IndexError is what the unguarded lookup used to raise;
                # the type is kept so existing callers keep working.
                raise IndexError("Register %r not found on device." % regnames)
            return self._read_single(registers[0])

        if isinstance(regnames, list):
            registers = [register for register in self.REGS
                         if register['name'] in regnames]
            return self._read_multiple(registers)

        raise TypeError("regnames must be None, a str or a list")

    def _read_single(self, register):
        """
        Read a single register and return the value. Not to be called directly.

        Arguments:
            * register: a 'register' dict that contains info on the register.

        Returns:
            * The interpreted value from the meter.
        """
        message = self._modbus_message(start_reg=register['start'],
                                       num_regs=register['length'])
        data = self._perform_request(message)
        return self._convert_value(data,
                                   signed=register['signed'],
                                   decimals=register['decimals'],
                                   isFloat=register.get('isFloat', False))

    def _read_multiple(self, registers):
        """
        Read several registers, using one request per continuous range.

        Arguments:
            * A list of registers (complete structs)

        Returns:
            * A dict containing all keys and values
        """
        # Sort a copy: 'registers' may be the shared REGS list itself.
        ordered = sorted(registers, key=lambda reg: reg['start'])
        results = {}
        for reg_range in self._split_ranges(ordered):
            first_reg = min(register['start'] for register in reg_range)
            last_reg = max(register['start'] + register['length']
                           for register in reg_range)
            message = self._modbus_message(start_reg=first_reg,
                                           num_regs=last_reg - first_reg)
            data = self._perform_request(message)
            results.update(self._interpret_result(data, reg_range))
        return results

    def _split_ranges(self, registers):
        """
        Generator that splits the registers list into continuous parts.

        Arguments:
            * registers: register dicts, sorted by their 'start' address

        Yields:
            * Lists of registers that are fetched with a single request.
              A gap of one unrequested register does not start a new range.
              Nothing is yielded for an empty list.
        """
        if not registers:
            return
        reg_list = []
        prev_end = registers[0]['start'] - 1
        for r in registers:
            if r['start'] - prev_end > 1:
                yield reg_list
                reg_list = []
            reg_list.append(r)
            # One past the last address of r; this is why a single-register
            # gap is tolerated by the check above.
            prev_end = r['start'] + r['length']
        yield reg_list

    def _modbus_message(self, start_reg, num_regs):
        """
        Build the request frame for reading a block of registers.

        Arguments:
            * start_reg: address of the first register (REG_OFFSET is subtracted)
            * num_regs: number of registers to read

        Returns:
            * The packed request as bytes
        """
        transaction_id = random.randint(1, 2**16 - 1)
        return struct.pack(">HHHBBHH",
                           transaction_id,
                           self.PROTOCOL_CODE,
                           6,
                           self.device_id,
                           self.FUNCTION_CODE,
                           start_reg - self.REG_OFFSET,
                           num_regs)

    def _perform_request(self, message):
        """
        Send a request and collect the register data of the reply.

        Arguments:
            * message: a request as produced by _modbus_message()

        Returns:
            * The payload of the reply as bytes (two bytes per register).
              If no complete reply arrived after 10 receive attempts, a
              zero-filled payload of the expected size is returned.
        """
        if self.device is None:
            self._connect()
        self.device.send(message)

        # The register count is the last field of the request.
        num_regs = struct.unpack(">H", message[-2:])[0]
        # The payload is preceded by 9 bytes of header in the reply.
        expect_bytes = 9 + 2 * num_regs

        data = bytes()
        for _ in range(10):
            time.sleep(0.05)
            data += self.device.recv(2048)
            # Compare by value: an identity test on ints is only accidentally
            # true for small numbers and never for large replies.
            if len(data) >= expect_bytes:
                return data[9:expect_bytes]
        return bytes(2 * num_regs)

    def _interpret_result(self, data, registers):
        """
        Pull the returned string apart and package the data back
        to its intended form.

        Arguments:
            * data: the payload bytes returned from the device
            * registers: the original requested set of registers

        Returns:
            * A dict containing the register names and resulting values
        """
        first_reg = min(register['start'] for register in registers)
        results = {}
        for register in registers:
            regname = register['name']
            # Two bytes per register.
            start = (register['start'] - first_reg) * 2
            end = start + register['length'] * 2
            results[regname] = self._convert_value(
                values=data[start:end],
                signed=register['signed'],
                decimals=register['decimals'],
                # 'isFloat' is optional in the register definitions.
                isFloat=register.get('isFloat', False))
            if regname == "power_factor_total" and results[regname] == 0:
                results[regname] = 1  # The SMA will send out a 0 when the power factor is 100%
        return results

    def _convert_value(self, values, signed=False, decimals=0, isFloat=False):
        """
        Convert the raw bytes of one value to the intended number.

        Arguments:
            * values: the bytes (or a list of byte values) of the value
            * signed: whether the value is a signed value
            * decimals: number of decimals the return value should contain
            * isFloat: whether the value is a float

        Returns:
            * The scaled value as a float, or None if the raw value is one
              of the NULLS markers of this meter

        Raises:
            * ValueError if an integer value is not 1, 2 or 4 bytes long
        """
        # (signed, unsigned) struct codes per value size in bytes.
        integer_codes = {1: ("b", "B"), 2: ("h", "H"), 4: ("l", "L")}

        if isFloat:
            code = "f"
        else:
            number_of_bytes = len(values)
            if number_of_bytes not in integer_codes:
                raise ValueError("Cannot convert a value of %d bytes"
                                 % number_of_bytes)
            signed_code, unsigned_code = integer_codes[number_of_bytes]
            code = signed_code if signed else unsigned_code

        value = struct.unpack(">" + code, bytes(values))[0]
        if value in self.NULLS:
            return None
        return float(value) / 10 ** decimals

    def _connect(self):
        """Open the TCP connection to the meter."""
        self.device = socket.create_connection(address=(self.port, self.tcp_port))


class SMAMeter(ModbusTCPMeter):
    """
    Register definitions for an SMA device read over Modbus TCP.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    REGS = [
        {'name': 'current_ac', 'start': 40188, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'current_l1', 'start': 40189, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'current_l2', 'start': 40190, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'current_l3', 'start': 40191, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l1_l2', 'start': 40193, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l2_l3', 'start': 40194, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l3_l1', 'start': 40195, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l1_n', 'start': 40196, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l2_n', 'start': 40197, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'voltage_l3_n', 'start': 40198, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'active_power_total', 'start': 40200, 'length': 1, 'signed': True, 'decimals': -1},
        {'name': 'frequency', 'start': 40202, 'length': 1, 'signed': False, 'decimals': 2},
        {'name': 'apparent_power_total', 'start': 40204, 'length': 1, 'signed': True, 'decimals': -1},
        {'name': 'reactive_power_total', 'start': 40206, 'length': 1, 'signed': True, 'decimals': -1},
        {'name': 'power_factor_total', 'start': 40208, 'length': 1, 'signed': True, 'decimals': 3},
        {'name': 'active_export', 'start': 40210, 'length': 2, 'signed': False, 'decimals': 3},
        {'name': 'dc_power', 'start': 40217, 'length': 1, 'signed': False, 'decimals': -1},
        {'name': 'temperature_internal', 'start': 40219, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'temperature_other', 'start': 40222, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'operating_status', 'start': 40224, 'length': 1, 'signed': False, 'decimals': 0}
    ]

    REG_OFFSET = 1
    PROTOCOL_CODE = 0
    FUNCTION_CODE = 3
    NULLS = [2**16 - 1, 2**15 - 1, 2**15, -2**15]


class MulticubeMeter(ModbusTCPMeter):
    """
    Implementation for a Multicube energy meter over Modbus TCP.
    """

    def __init__(self, port, tcp_port=1502, slaveaddress=1, type=None, baudrate=None, auto_scale=True):
        """
        Connect to the meter and optionally read its scaling registers.

        Arguments:
            * port: host name or address of the meter
            * tcp_port: TCP port of the meter (integer)
            * slaveaddress: the modbus unit id of the device (integer)
            * type: accepted for signature compatibility, not used here
            * baudrate: accepted for signature compatibility, not used here
            * auto_scale: call set_scaling() right after connecting
        """
        super().__init__(port, tcp_port, slaveaddress, type, baudrate)
        self.device = socket.create_connection(address=(port, tcp_port))
        self.device_id = slaveaddress

        # Kept per instance because set_scaling() rewrites the 'decimals'.
        self.REGS = [
            {'name': 'energy_scale', 'start': 512, 'length': 2, 'decimals': 0, 'signed': True},
            {'name': 'active_net', 'start': 514, 'length': 2, 'decimals': 0, 'signed': True},
            {'name': 'apparent_net', 'start': 516, 'length': 2, 'decimals': 0, 'signed': True},
            {'name': 'reactive_net', 'start': 518, 'length': 2, 'decimals': 0, 'signed': True},
            {'name': 'active_power_total', 'start': 2816, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'apparent_power_total', 'start': 2817, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'reactive_power_total', 'start': 2818, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'power_factor_total', 'start': 2819, 'length': 1, 'decimals': 3, 'signed': True},
            {'name': 'frequency', 'start': 2820, 'length': 1, 'decimals': 1, 'signed': True},
            {'name': 'voltage_l1_n', 'start': 2821, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'current_l1', 'start': 2822, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'active_power_l1', 'start': 2823, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'voltage_l2_n', 'start': 2824, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'current_l2', 'start': 2825, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'active_power_l2', 'start': 2826, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'voltage_l3_n', 'start': 2827, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'current_l3', 'start': 2828, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'active_power_l3', 'start': 2829, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'power_factor_l1', 'start': 2830, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'power_factor_l2', 'start': 2831, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'power_factor_l3', 'start': 2832, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'voltage_l1_l2', 'start': 2833, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'voltage_l2_l3', 'start': 2834, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'voltage_l3_l1', 'start': 2835, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'current_n', 'start': 2836, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'amps_scale', 'start': 2837, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'phase_volts_scale', 'start': 2838, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'line_volts_scale', 'start': 2839, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'power_scale', 'start': 2840, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'apparent_power_l1', 'start': 3072, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'apparent_power_l2', 'start': 3073, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'apparent_power_l3', 'start': 3074, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'reactive_power_l1', 'start': 3075, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'reactive_power_l2', 'start': 3076, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'reactive_power_l3', 'start': 3077, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'peak_current_l1', 'start': 3078, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'peak_current_l2', 'start': 3079, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'peak_current_l3', 'start': 3080, 'length': 1, 'decimals': 0, 'signed': True},
            {'name': 'current_l1_thd', 'start': 3081, 'length': 1, 'decimals': 3, 'signed': True},
            {'name': 'current_l2_thd', 'start': 3082, 'length': 1, 'decimals': 3, 'signed': True},
            {'name': 'current_l3_thd', 'start': 3083, 'length': 1, 'decimals': 3, 'signed': True}
        ]

        if auto_scale:
            self.set_scaling()

    def set_scaling(self):
        """
        Call this function before reading anything to set up the
        correct scaling for this meter.

        Each scale register is read from the device and translated into
        the 'decimals' of the registers it applies to.

        Raises:
            * KeyError if the device reports a scale value that has no
              entry in the mapping for that group
        """
        decimals_mapping = {1: 2, 2: 1, 3: 0, 4: -1, 5: -2, 6: -3, 7: -4}
        energy_decimals_mapping = {3: 3, 4: 2, 5: 1, 6: 0, 7: -1}

        # (scale register, scale -> decimals mapping, affected registers)
        scale_groups = (
            ('amps_scale', decimals_mapping,
             ['current_l1', 'current_l2', 'current_l3', 'current_n']),
            ('phase_volts_scale', decimals_mapping,
             ['voltage_l1_n', 'voltage_l2_n', 'voltage_l3_n']),
            ('line_volts_scale', decimals_mapping,
             ['voltage_l1_l2', 'voltage_l2_l3', 'voltage_l3_l1']),
            ('power_scale', decimals_mapping,
             ['active_power_total', 'reactive_power_total', 'apparent_power_total',
              'active_power_l1', 'active_power_l2', 'active_power_l3',
              'apparent_power_l1', 'apparent_power_l2', 'apparent_power_l3',
              'reactive_power_l1', 'reactive_power_l2', 'reactive_power_l3']),
            ('energy_scale', energy_decimals_mapping,
             ['active_net', 'apparent_net', 'reactive_net']),
        )

        for scale_register, mapping, affected in scale_groups:
            scale = int(self.read(scale_register))
            for r in self.REGS:
                if r['name'] in affected:
                    r['decimals'] = mapping[scale]

    REG_OFFSET = 0
    PROTOCOL_CODE = 0
    FUNCTION_CODE = 3
    NULLS = [2**16 - 1, 2**15 - 1, 2**15, -2**15]


class SaiaMeter:
    """
    Implementation for a Saia energy meter that is read through tinysbus.
    """

    def __init__(self, port, baudrate=38400, slaveaddress=1, type=None, **kwargs):
        """
        Initialize the SaiaMeter object.

        Arguments:
            * port: a serial port (string)
            * baudrate: the baudrate to use (integer)
            * slaveaddress: the address of the device (integer)
            * type: accepted for signature compatibility, not used here
            * kwargs: passed on to tinysbus.Instrument

        Returns:
            * A SaiaMeter object

        Raises:
            * ImportError if the tinysbus package is not installed
        """
        if tinysbus is None:
            raise ImportError("The 'tinysbus' package is required to use SaiaMeter.")
        self.instrument = tinysbus.Instrument(address=slaveaddress,
                                              serial_port=port,
                                              baudrate=baudrate,
                                              **kwargs)

    def read(self, regnames=None):
        """
        Read one, many or all registers from the device

        Args:
            * regnames (str or list). If None, read all. If string, read
              single register. If list, read all registers from list.

        Returns:
            * If single register, it returns a single value.
              If all or list, return a dict with the keys and values.
              Unknown names in a list are reported on stdout and skipped.

        Raises:
            * ValueError if a single register name is not known
            * TypeError if regnames is not None, a str or a list
        """
        if regnames is None:
            return self._batch_read(self.REGS)

        if isinstance(regnames, list):
            registers = [register for register in self.REGS
                         if register['name'] in regnames]
            known_names = [register['name'] for register in self.REGS]
            regs_not_available = [regname for regname in regnames
                                  if regname not in known_names]
            if regs_not_available:
                print("Warning: the following registers are not available on this device: "
                      + ", ".join(regs_not_available))
                print("The available registers are: " + ", ".join(known_names))
            if not registers:
                return {}
            registers.sort(key=lambda reg: reg['start'])
            return self._batch_read(registers)

        if isinstance(regnames, str):
            registers = [register for register in self.REGS
                         if register['name'] == regnames]
            if not registers:
                raise ValueError("Register not found on device.")
            return self._read_single(registers[0])

        raise TypeError

    def _read_single(self, register):
        """
        Read a single register and return the value. Not to be called directly.

        Arguments:
            * register: a 'register' dict that contains info on the register.

        Returns:
            * The interpreted value from the meter, or None when the
              register length is not 1, 2 or 4.
        """
        if register['length'] == 1:
            return self.instrument.read_register(register_address=register['start'],
                                                 number_of_decimals=register['decimals'],
                                                 signed=register['signed'])
        if register['length'] == 2:
            value = self.instrument.read_long(register_address=register['start'],
                                              signed=register['signed'])
            return value / 10 ** register['decimals']
        if register['length'] == 4:
            # NOTE(review): this call uses 'number_of_registers' while
            # _read_multiple() uses 'num_registers' for the same method;
            # confirm which keyword tinysbus actually accepts.
            value = self.instrument.read_registers(register_address=register['start'],
                                                   number_of_registers=register['length'])
            return self._convert_value(values=value,
                                       signed=register['signed'],
                                       number_of_decimals=register['decimals'])
        return None

    def _read_multiple(self, registers):
        """
        Read multiple registers from the slave device and return their values as a dict.

        Arguments:
            * A list of registers (complete structs)

        Returns:
            * A dict containing all keys and values
        """
        first_reg = min(register['start'] for register in registers)
        last_reg = max(register['start'] + register['length'] for register in registers)
        # NOTE(review): keyword differs from the one used in _read_single();
        # confirm against the tinysbus API.
        values = self.instrument.read_registers(register_address=first_reg,
                                                num_registers=last_reg - first_reg)
        return self._interpret_result(values, registers)

    def _batch_read(self, registers):
        """
        Read multiple registers in batches, limiting each batch to at most 10 registers.

        Arguments:
            * A list of registers (complete structs), sorted by 'start'

        Returns:
            * A dict containing all keys and values; empty for an empty list
        """
        results = {}
        if not registers:
            return results

        # Count up to at most 10 registers:
        start_reg = registers[0]['start']
        batch = []
        for register in registers:
            if register['start'] + register['length'] - start_reg <= 10:
                batch.append(register)
                continue
            # The batch is empty only when a single register is itself
            # longer than the limit; there is nothing to flush then.
            if batch:
                results.update(self._read_multiple(batch))
            batch = [register]
            start_reg = register['start']

        if batch:
            results.update(self._read_multiple(batch))
        return results

    def _interpret_result(self, data, registers):
        """
        Pull the returned string apart and package the data back to its
        intended form.

        Arguments:
            * data: list of register values returned from the device
            * registers: the original requested set of registers

        Returns:
            * A dict containing the register names and resulting values
        """
        first_reg = min(register['start'] for register in registers)
        results = {}
        for register in registers:
            start = register['start'] - first_reg
            end = start + register['length']
            results[register['name']] = self._convert_value(
                values=data[start:end],
                signed=register['signed'],
                number_of_decimals=register['decimals'])
        return results

    def _convert_value(self, values, signed=False, number_of_decimals=0):
        """
        Convert a list of returned integers to the intended value.

        Arguments:
            * values: a list of one or two integers that together
              represent the value; each is packed as 32 bits
            * signed: whether the value is a signed value
            * number_of_decimals: number of decimals the return value should contain

        Returns:
            * The scaled value as a float

        Raises:
            * ValueError if values does not hold exactly one or two integers
        """
        # register count -> (pack format, signed unpack, unsigned unpack)
        formats = {1: ("L", "l", "L"), 2: ("LL", "q", "Q")}

        number_of_registers = len(values)
        if number_of_registers not in formats:
            raise ValueError("Cannot convert a value of %d registers"
                             % number_of_registers)
        code_in, signed_code, unsigned_code = formats[number_of_registers]
        code_out = signed_code if signed else unsigned_code

        bytestring = struct.pack(">" + code_in, *values)
        value = struct.unpack(">" + code_out, bytestring)[0]
        return float(value) / 10 ** number_of_decimals

    REGS = [
        {'name': 'firmware_version', 'start': 0, 'length': 1, 'signed': True, 'decimals': 0},
        {'name': 'num_registers', 'start': 1, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'num_flags', 'start': 2, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'baudrate', 'start': 3, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'serial_number', 'start': 11, 'length': 2, 'signed': False, 'decimals': 0},
        {'name': 'status_protect', 'start': 14, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'sbus_timeout', 'start': 15, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'sbus_address', 'start': 16, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'error_flags', 'start': 17, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'tariff', 'start': 19, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'active_import_tariff_1', 'start': 20, 'length': 1, 'signed': False, 'decimals': 2},
        {'name': 'resettable_active_import_t1', 'start': 21, 'length': 1, 'signed': False, 'decimals': 2},
        {'name': 'active_import_tariff_2', 'start': 22, 'length': 1, 'signed': False, 'decimals': 2},
        {'name': 'resettable_active_import_t2', 'start': 23, 'length': 1, 'signed': False, 'decimals': 2},
        {'name': 'voltage_l1_n', 'start': 24, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'current_l1', 'start': 25, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'active_power_l1', 'start': 26, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'reactive_power_l1', 'start': 27, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'power_factor_l1', 'start': 28, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'voltage_l2_n', 'start': 29, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'current_l2', 'start': 30, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'active_power_l2', 'start': 31, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'reactive_power_l2', 'start': 32, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'power_factor_l2', 'start': 33, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'voltage_l3_n', 'start': 34, 'length': 1, 'signed': False, 'decimals': 0},
        {'name': 'current_l3', 'start': 35, 'length': 1, 'signed': False, 'decimals': 1},
        {'name': 'active_power_l3', 'start': 36, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'reactive_power_l3', 'start': 37, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'power_factor_l3', 'start': 38, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'active_power_total', 'start': 39, 'length': 1, 'signed': True, 'decimals': 2},
        {'name': 'reactive_power_total', 'start': 40, 'length': 1, 'signed': True, 'decimals': 2}
    ]