123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- # SPDX-License-Identifier: Apache-2.0
- # Copyright 2020 Contributors to OpenLEADR
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from asyncio import iscoroutine
- from datetime import datetime, timedelta, timezone
- from dataclasses import is_dataclass, asdict
- import random
- import string
- from collections import OrderedDict
- import itertools
- import re
- import ssl
- import hashlib
- import uuid
- from openleadr import config
- DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
- DATETIME_FORMAT_NO_MICROSECONDS = "%Y-%m-%dT%H:%M:%SZ"
- def generate_id(*args, **kwargs):
- """
- Generate a string that can be used as an identifier in OpenADR messages.
- """
- return str(uuid.uuid4())
- def indent_xml(message):
- """
- Indents the XML in a nice way.
- """
- INDENT_SIZE = 2
- lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
- indent = 0
- for i, line in enumerate(lines):
- if i == 0:
- continue
- if re.search(r'^</[^>]+>$', line):
- indent = indent - INDENT_SIZE
- lines[i] = " " * indent + line
- if not (re.search(r'</[^>]+>$', line) or line.endswith("/>")):
- indent = indent + INDENT_SIZE
- return "\n".join(lines)
- def flatten_xml(message):
- """
- Flatten the entire XML structure.
- """
- lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
- for line in lines:
- line = re.sub(r'\n', '', line)
- line = re.sub(r'\s\s+', ' ', line)
- return "".join(lines)
- def normalize_dict(ordered_dict):
- """
- Main conversion function for the output of xmltodict to the OpenLEADR
- representation of OpenADR contents.
- :param ordered_dict dict: The OrderedDict, dict or dataclass that you wish to convert.
- """
- if is_dataclass(ordered_dict):
- ordered_dict = asdict(ordered_dict)
- def normalize_key(key):
- if key.startswith('oadr'):
- key = key[4:]
- elif key.startswith('ei'):
- key = key[2:]
- key = re.sub(r'([a-z])([A-Z])', r'\1_\2', key)
- if '-' in key:
- key = key.replace('-', '_')
- return key.lower()
- d = {}
- for key, value in ordered_dict.items():
- # Interpret values from the dict
- if key.startswith("@"):
- continue
- key = normalize_key(key)
- if isinstance(value, (OrderedDict, dict)):
- d[key] = normalize_dict(value)
- elif isinstance(value, list):
- d[key] = []
- for item in value:
- if isinstance(item, (OrderedDict, dict)):
- dict_item = normalize_dict(item)
- d[key].append(normalize_dict(dict_item))
- else:
- d[key].append(item)
- elif key in ("duration", "startafter", "max_period", "min_period"):
- d[key] = parse_duration(value)
- elif ("date_time" in key or key == "dtstart") and isinstance(value, str):
- d[key] = parse_datetime(value)
- elif value in ('true', 'false'):
- d[key] = parse_boolean(value)
- elif isinstance(value, str):
- d[key] = parse_int(value) or parse_float(value) or value
- else:
- d[key] = value
- # Do our best to make the dictionary structure as pythonic as possible
- if key.startswith("x_ei_"):
- d[key[5:]] = d.pop(key)
- key = key[5:]
- # Group all targets as a list of dicts under the key "target"
- if key in ("target", "report_subject", "report_data_source"):
- targets = d.pop(key)
- new_targets = []
- if targets:
- for ikey in targets:
- if isinstance(targets[ikey], list):
- new_targets.extend([{ikey: value} for value in targets[ikey]])
- else:
- new_targets.append({ikey: targets[ikey]})
- d[key + "s"] = new_targets
- key = key + "s"
- # Dig up the properties inside some specific target identifiers
- # if key in ("aggregated_pnode", "pnode", "service_delivery_point"):
- # d[key] = d[key]["node"]
- # if key in ("end_device_asset", "meter_asset"):
- # d[key] = d[key]["mrid"]
- # Group all reports as a list of dicts under the key "pending_reports"
- if key == "pending_reports":
- if isinstance(d[key], dict) and 'report_request_id' in d[key] and isinstance(d[key]['report_request_id'], list):
- d['pending_reports'] = [{'request_id': rrid} for rrid in d['pending_reports']['report_request_id']]
- # Group all events al a list of dicts under the key "events"
- elif key == "event" and isinstance(d[key], list):
- events = d.pop("event")
- new_events = []
- for event in events:
- new_event = event['event']
- new_event['response_required'] = event['response_required']
- new_events.append(new_event)
- d["events"] = new_events
- # If there's only one event, also put it into a list
- elif key == "event" and isinstance(d[key], dict) and "event" in d[key]:
- oadr_event = d.pop('event')
- ei_event = oadr_event['event']
- ei_event['response_required'] = oadr_event['response_required']
- d['events'] = [ei_event]
- elif key in ("request_event", "created_event") and isinstance(d[key], dict):
- d = d[key]
- # Plurarize some lists
- elif key in ('report_request', 'report'):
- if isinstance(d[key], list):
- d[key + 's'] = d.pop(key)
- else:
- d[key + 's'] = [d.pop(key)]
- elif key == 'report_description':
- if isinstance(d[key], list):
- original_descriptions = d.pop(key)
- report_descriptions = {}
- for item in original_descriptions:
- r_id = item.pop('r_id')
- report_descriptions[r_id] = item
- d[key + 's'] = report_descriptions
- else:
- original_description = d.pop(key)
- r_id = original_description.pop('r_id')
- d[key + 's'] = {r_id: original_description}
- # Promote the contents of the Qualified Event ID
- elif key == "qualified_event_id" and isinstance(d['qualified_event_id'], dict):
- qeid = d.pop('qualified_event_id')
- d['event_id'] = qeid['event_id']
- d['modification_number'] = qeid['modification_number']
- # Promote the contents of the tolerance items
- # if key == "tolerance" and "tolerate" in d["tolerance"] and len(d["tolerance"]["tolerate"]) == 1:
- # d["tolerance"] = d["tolerance"]["tolerate"].values()[0]
- # Durations are encapsulated in their own object, remove this nesting
- elif isinstance(d[key], dict) and "duration" in d[key] and len(d[key]) == 1:
- d[key] = d[key]["duration"]
- # In general, remove all double nesting
- elif isinstance(d[key], dict) and key in d[key] and len(d[key]) == 1:
- d[key] = d[key][key]
- # In general, remove the double nesting of lists of items
- elif isinstance(d[key], dict) and key[:-1] in d[key] and len(d[key]) == 1:
- if isinstance(d[key][key[:-1]], list):
- d[key] = d[key][key[:-1]]
- else:
- d[key] = [d[key][key[:-1]]]
- # Payload values are wrapped in an object according to their type. We don't need that information.
- elif key in ("signal_payload", "current_value"):
- value = d[key]
- if isinstance(d[key], dict):
- if 'payload_float' in d[key] and 'value' in d[key]['payload_float'] and d[key]['payload_float']['value'] is not None:
- d[key] = float(d[key]['payload_float']['value'])
- elif 'payload_int' in d[key] and 'value' in d[key]['payload_int'] and d[key]['payload_int'] is not None:
- d[key] = int(d[key]['payload_int']['value'])
- # All values other than 'false' must be interpreted as True for testEvent (rule 006)
- elif key == 'test_event' and not isinstance(d[key], bool):
- d[key] = True
- # Promote the 'text' item
- elif isinstance(d[key], dict) and "text" in d[key] and len(d[key]) == 1:
- if key == 'uid':
- d[key] = int(d[key]["text"])
- else:
- d[key] = d[key]["text"]
- # Promote a 'date-time' item
- elif isinstance(d[key], dict) and "date_time" in d[key] and len(d[key]) == 1:
- d[key] = d[key]["date_time"]
- # Promote 'properties' item, discard the unused? 'components' item
- elif isinstance(d[key], dict) and "properties" in d[key] and len(d[key]) <= 2:
- d[key] = d[key]["properties"]
- # Remove all empty dicts
- elif isinstance(d[key], dict) and len(d[key]) == 0:
- d.pop(key)
- return d
- def parse_datetime(value):
- """
- Parse an ISO8601 datetime into a datetime.datetime object.
- """
- matches = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.?(\d{1,6})?\d*Z', value)
- if matches:
- year, month, day, hour, minute, second, microsecond = (int(value) for value in matches.groups())
- return datetime(year, month, day, hour, minute, second, microsecond=microsecond, tzinfo=timezone.utc)
- else:
- print(f"{value} did not match format")
- return value
- def parse_duration(value):
- """
- Parse a RFC5545 duration.
- """
- # TODO: implement the full regex: matches = re.match(r'(\+|\-)?P((\d+Y)?(\d+M)?(\d+D)?T?(\d+H)?(\d+M)?(\d+S)?)|(\d+W)', value)
- if isinstance(value, timedelta):
- return value
- matches = re.match(r'P(\d+(?:D|W))?T(\d+H)?(\d+M)?(\d+S)?', value)
- if not matches:
- return False
- days = hours = minutes = seconds = 0
- _days, _hours, _minutes, _seconds = matches.groups()
- if _days:
- if _days.endswith("D"):
- days = int(_days[:-1])
- elif _days.endswith("W"):
- days = int(_days[:-1]) * 7
- if _hours:
- hours = int(_hours[:-1])
- if _minutes:
- minutes = int(_minutes[:-1])
- if _seconds:
- seconds = int(_seconds[:-1])
- return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
- def parse_int(value):
- matches = re.match(r'^[\d-]+$', value)
- if not matches:
- return False
- else:
- return int(value)
- def parse_float(value):
- matches = re.match(r'^[\d.-]+$', value)
- if not matches:
- return False
- else:
- return float(value)
- def parse_boolean(value):
- if value == 'true':
- return True
- else:
- return False
- def peek(iterable):
- """
- Peek into an iterable.
- """
- try:
- first = next(iterable)
- except StopIteration:
- return None
- else:
- return itertools.chain([first], iterable)
- def datetimeformat(value, format=DATETIME_FORMAT):
- """
- Format a given datetime as a UTC ISO3339 string.
- """
- if not isinstance(value, datetime):
- return value
- return value.astimezone(timezone.utc).strftime(format)
- def timedeltaformat(value):
- """
- Format a timedelta to a RFC5545 Duration.
- """
- if not isinstance(value, timedelta):
- return value
- days = value.days
- hours, seconds = divmod(value.seconds, 3600)
- minutes, seconds = divmod(seconds, 60)
- formatted = "P"
- if days:
- formatted += f"{days}D"
- if hours or minutes or seconds:
- formatted += f"T"
- if hours:
- formatted += f"{hours}H"
- if minutes:
- formatted += f"{minutes}M"
- if seconds:
- formatted += f"{seconds}S"
- return formatted
- def booleanformat(value):
- """
- Format a boolean value
- """
- if isinstance(value, bool):
- if value == True:
- return "true"
- elif value == False:
- return "false"
- elif value in ("true", "false"):
- return value
- else:
- raise ValueError(f"A boolean value must be provided, not {value}.")
- def ensure_bytes(obj):
- """
- Converts a utf-8 str object to bytes.
- """
- if isinstance(obj, bytes):
- return obj
- if isinstance(obj, str):
- return bytes(obj, 'utf-8')
- else:
- raise TypeError("Must be bytes or str")
- def ensure_str(obj):
- """
- Converts bytes to a utf-8 string.
- """
- if isinstance(obj, str):
- return obj
- if isinstance(obj, bytes):
- return obj.decode('utf-8')
- else:
- raise TypeError("Must be bytes or str")
- def certificate_fingerprint(certificate_str):
- """
- Calculate the fingerprint for the given certificate, as defined by OpenADR.
- """
- der_cert = ssl.PEM_cert_to_DER_cert(ensure_str(certificate_str))
- hash = hashlib.sha256(der_cert).digest().hex()
- return ":".join([hash[i-2:i].upper() for i in range(-20, 0, 2)])
- def extract_pem_cert(tree):
- """
- Extract a given X509 certificate inside an XML tree and return the standard
- form of a PEM-encoded certificate.
- :param tree lxml.etree: The tree that contains the X509 element. This is
- usually the KeyInfo element from the XMLDsig Signature
- part of the message.
- """
- cert = tree.find('.//{http://www.w3.org/2000/09/xmldsig#}X509Certificate').text
- return "-----BEGIN CERTIFICATE-----\n" + cert + "-----END CERTIFICATE-----\n"
|