utils.py 24 KB


  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from datetime import datetime, timedelta, timezone
  13. from dataclasses import is_dataclass, asdict
  14. from collections import OrderedDict
  15. import asyncio
  16. import itertools
  17. import re
  18. import ssl
  19. import hashlib
  20. import uuid
  21. import logging
# Logger named 'openleadr'; the helpers in this module log through it.
logger = logging.getLogger('openleadr')
# strftime pattern with microseconds and a literal 'Z' suffix. This is the
# default format used by datetimeformat().
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Same pattern, but without the fractional seconds.
DATETIME_FORMAT_NO_MICROSECONDS = "%Y-%m-%dT%H:%M:%SZ"
  25. def generate_id(*args, **kwargs):
  26. """
  27. Generate a string that can be used as an identifier in OpenADR messages.
  28. """
  29. return str(uuid.uuid4())
  30. def indent_xml(message):
  31. """
  32. Indents the XML in a nice way.
  33. """
  34. INDENT_SIZE = 2
  35. lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
  36. indent = 0
  37. for i, line in enumerate(lines):
  38. if i == 0:
  39. continue
  40. if re.search(r'^</[^>]+>$', line):
  41. indent = indent - INDENT_SIZE
  42. lines[i] = " " * indent + line
  43. if not (re.search(r'</[^>]+>$', line) or line.endswith("/>")):
  44. indent = indent + INDENT_SIZE
  45. return "\n".join(lines)
  46. def flatten_xml(message):
  47. """
  48. Flatten the entire XML structure.
  49. """
  50. lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
  51. for line in lines:
  52. line = re.sub(r'\n', '', line)
  53. line = re.sub(r'\s\s+', ' ', line)
  54. return "".join(lines)
  55. def normalize_dict(ordered_dict):
  56. """
  57. Main conversion function for the output of xmltodict to the OpenLEADR
  58. representation of OpenADR contents.
  59. :param ordered_dict dict: The OrderedDict, dict or dataclass that you wish to convert.
  60. """
  61. if is_dataclass(ordered_dict):
  62. ordered_dict = asdict(ordered_dict)
  63. def normalize_key(key):
  64. if key.startswith('oadr'):
  65. key = key[4:]
  66. elif key.startswith('ei'):
  67. key = key[2:]
  68. key = re.sub(r'([a-z])([A-Z])', r'\1_\2', key)
  69. if '-' in key:
  70. key = key.replace('-', '_')
  71. return key.lower()
  72. d = {}
  73. for key, value in ordered_dict.items():
  74. # Interpret values from the dict
  75. if key.startswith("@"):
  76. continue
  77. key = normalize_key(key)
  78. if isinstance(value, (OrderedDict, dict)):
  79. d[key] = normalize_dict(value)
  80. elif isinstance(value, list):
  81. d[key] = []
  82. for item in value:
  83. if isinstance(item, (OrderedDict, dict)):
  84. dict_item = normalize_dict(item)
  85. d[key].append(normalize_dict(dict_item))
  86. else:
  87. d[key].append(item)
  88. elif key in ("duration", "startafter", "max_period", "min_period"):
  89. d[key] = parse_duration(value)
  90. elif ("date_time" in key or key == "dtstart") and isinstance(value, str):
  91. d[key] = parse_datetime(value)
  92. elif value in ('true', 'false'):
  93. d[key] = parse_boolean(value)
  94. elif isinstance(value, str):
  95. if re.match(r'^-?\d+$', value):
  96. d[key] = int(value)
  97. elif re.match(r'^-?[\d.]+$', value):
  98. d[key] = float(value)
  99. else:
  100. d[key] = value
  101. else:
  102. d[key] = value
  103. # Do our best to make the dictionary structure as pythonic as possible
  104. if key.startswith("x_ei_"):
  105. d[key[5:]] = d.pop(key)
  106. key = key[5:]
  107. # Group all targets as a list of dicts under the key "target"
  108. if key == 'target':
  109. targets = d.pop(key)
  110. new_targets = []
  111. if targets:
  112. for ikey in targets:
  113. if isinstance(targets[ikey], list):
  114. new_targets.extend([{ikey: value} for value in targets[ikey]])
  115. else:
  116. new_targets.append({ikey: targets[ikey]})
  117. d[key + "s"] = new_targets
  118. key = key + "s"
  119. # Also add a targets_by_type element to this dict
  120. # to access the targets in a more convenient way.
  121. d['targets_by_type'] = group_targets_by_type(new_targets)
  122. # Group all reports as a list of dicts under the key "pending_reports"
  123. if key == "pending_reports":
  124. if isinstance(d[key], dict) and 'report_request_id' in d[key] \
  125. and isinstance(d[key]['report_request_id'], list):
  126. d['pending_reports'] = [{'request_id': rrid}
  127. for rrid in d['pending_reports']['report_request_id']]
  128. # Group all events al a list of dicts under the key "events"
  129. elif key == "event" and isinstance(d[key], list):
  130. events = d.pop("event")
  131. new_events = []
  132. for event in events:
  133. new_event = event['event']
  134. new_event['response_required'] = event['response_required']
  135. new_events.append(new_event)
  136. d["events"] = new_events
  137. # If there's only one event, also put it into a list
  138. elif key == "event" and isinstance(d[key], dict) and "event" in d[key]:
  139. oadr_event = d.pop('event')
  140. ei_event = oadr_event['event']
  141. ei_event['response_required'] = oadr_event['response_required']
  142. d['events'] = [ei_event]
  143. elif key in ("request_event", "created_event") and isinstance(d[key], dict):
  144. d = d[key]
  145. # Plurarize some lists
  146. elif key in ('report_request', 'report', 'specifier_payload'):
  147. if isinstance(d[key], list):
  148. d[key + 's'] = d.pop(key)
  149. else:
  150. d[key + 's'] = [d.pop(key)]
  151. elif key in ('report_description', 'event_signal'):
  152. descriptions = d.pop(key)
  153. if not isinstance(descriptions, list):
  154. descriptions = [descriptions]
  155. for description in descriptions:
  156. # We want to make the identification of the measurement universal
  157. if 'voltage' in description:
  158. name, item = 'voltage', description.pop('voltage')
  159. elif 'power_real' in description:
  160. name, item = 'powerReal', description.pop('power_real')
  161. elif 'power_apparent' in description:
  162. name, item = 'powerApparent', description.pop('power_apparent')
  163. elif 'power_reactive' in description:
  164. name, item = 'powerReactive', description.pop('power_reactive')
  165. elif 'energy_real' in description:
  166. name, item = 'energyReal', description.pop('energy_real')
  167. elif 'energy_apparent' in description:
  168. name, item = 'energyApparent', description.pop('energy_apparent')
  169. elif 'energy_reactive' in description:
  170. name, item = 'energyReactive', description.pop('energy_reactive')
  171. elif 'frequency' in description:
  172. name, item = 'frequency', description.pop('frequency')
  173. elif 'pulse_count' in description:
  174. name, item = 'pulseCount', description.pop('pulse_count')
  175. elif 'temperature' in description:
  176. name, item = 'temperature', description.pop('temperature')
  177. elif 'therm' in description:
  178. name, item = 'therm', description.pop('therm')
  179. elif 'currency' in description:
  180. name, item = 'currency', description.pop('currency')
  181. elif 'currency_per_kw' in description:
  182. name, item = 'currencyPerKW', description.pop('currency_per_kw')
  183. elif 'currency_per_kwh' in description:
  184. name, item = 'currencyPerKWh', description.pop('currency_per_kwh')
  185. elif 'currency_per_therm' in description:
  186. name, item = 'currencyPerTherm', description.pop('currency_per_therm')
  187. elif 'custom_unit' in description:
  188. name, item = 'customUnit', description.pop('custom_unit')
  189. else:
  190. break
  191. item['description'] = item.pop('item_description', None)
  192. item['unit'] = item.pop('item_units', None)
  193. item['scale'] = item.pop('si_scale_code', None)
  194. description['measurement'] = {'name': name,
  195. **item}
  196. d[key + 's'] = descriptions
  197. # Promote the contents of the Qualified Event ID
  198. elif key == "qualified_event_id" and isinstance(d['qualified_event_id'], dict):
  199. qeid = d.pop('qualified_event_id')
  200. d['event_id'] = qeid['event_id']
  201. d['modification_number'] = qeid['modification_number']
  202. # Durations are encapsulated in their own object, remove this nesting
  203. elif isinstance(d[key], dict) and "duration" in d[key] and len(d[key]) == 1:
  204. d[key] = d[key]["duration"]
  205. # In general, remove all double nesting
  206. elif isinstance(d[key], dict) and key in d[key] and len(d[key]) == 1:
  207. d[key] = d[key][key]
  208. # In general, remove the double nesting of lists of items
  209. elif isinstance(d[key], dict) and key[:-1] in d[key] and len(d[key]) == 1:
  210. if isinstance(d[key][key[:-1]], list):
  211. d[key] = d[key][key[:-1]]
  212. else:
  213. d[key] = [d[key][key[:-1]]]
  214. # Payload values are wrapped in an object according to their type. We don't need that.
  215. elif key in ("signal_payload", "current_value"):
  216. value = d[key]
  217. if isinstance(d[key], dict):
  218. if 'payload_float' in d[key] and 'value' in d[key]['payload_float'] \
  219. and d[key]['payload_float']['value'] is not None:
  220. d[key] = float(d[key]['payload_float']['value'])
  221. elif 'payload_int' in d[key] and 'value' in d[key]['payload_int'] \
  222. and d[key]['payload_int'] is not None:
  223. d[key] = int(d[key]['payload_int']['value'])
  224. # Report payloads contain an r_id and a type-wrapped payload_float
  225. elif key == 'report_payload':
  226. if 'payload_float' in d[key] and 'value' in d[key]['payload_float']:
  227. v = d[key].pop('payload_float')
  228. d[key]['value'] = float(v['value'])
  229. elif 'payload_int' in d[key] and 'value' in d[key]['payload_int']:
  230. v = d[key].pop('payload_float')
  231. d[key]['value'] = int(v['value'])
  232. # All values other than 'false' must be interpreted as True for testEvent (rule 006)
  233. elif key == 'test_event' and not isinstance(d[key], bool):
  234. d[key] = True
  235. # Promote the 'text' item
  236. elif isinstance(d[key], dict) and "text" in d[key] and len(d[key]) == 1:
  237. if key == 'uid':
  238. d[key] = int(d[key]["text"])
  239. else:
  240. d[key] = d[key]["text"]
  241. # Promote a 'date-time' item
  242. elif isinstance(d[key], dict) and "date_time" in d[key] and len(d[key]) == 1:
  243. d[key] = d[key]["date_time"]
  244. # Promote 'properties' item, discard the unused? 'components' item
  245. elif isinstance(d[key], dict) and "properties" in d[key] and len(d[key]) <= 2:
  246. d[key] = d[key]["properties"]
  247. # Remove all empty dicts
  248. elif isinstance(d[key], dict) and len(d[key]) == 0:
  249. d.pop(key)
  250. return d
  251. def parse_datetime(value):
  252. """
  253. Parse an ISO8601 datetime into a datetime.datetime object.
  254. """
  255. matches = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.?(\d{1,6})?\d*Z', value)
  256. if matches:
  257. year, month, day, hour, minute, second, micro = (int(value) for value in matches.groups())
  258. return datetime(year, month, day, hour, minute, second, micro, tzinfo=timezone.utc)
  259. else:
  260. logger.warning(f"parse_datetime: {value} did not match format")
  261. return value
  262. def parse_duration(value):
  263. """
  264. Parse a RFC5545 duration.
  265. """
  266. # TODO: implement the full regex:
  267. # matches = re.match(r'(\+|\-)?P((\d+Y)?(\d+M)?(\d+D)?T?(\d+H)?(\d+M)?(\d+S)?)|(\d+W)', value)
  268. if isinstance(value, timedelta):
  269. return value
  270. matches = re.match(r'P(\d+(?:D|W))?T?(\d+H)?(\d+M)?(\d+S)?', value)
  271. if not matches:
  272. return False
  273. days = hours = minutes = seconds = 0
  274. _days, _hours, _minutes, _seconds = matches.groups()
  275. if _days:
  276. if _days.endswith("D"):
  277. days = int(_days[:-1])
  278. elif _days.endswith("W"):
  279. days = int(_days[:-1]) * 7
  280. if _hours:
  281. hours = int(_hours[:-1])
  282. if _minutes:
  283. minutes = int(_minutes[:-1])
  284. if _seconds:
  285. seconds = int(_seconds[:-1])
  286. return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
  287. def parse_boolean(value):
  288. if value == 'true':
  289. return True
  290. else:
  291. return False
  292. def peek(iterable):
  293. """
  294. Peek into an iterable.
  295. """
  296. try:
  297. first = next(iterable)
  298. except StopIteration:
  299. return None
  300. else:
  301. return itertools.chain([first], iterable)
  302. def datetimeformat(value, format=DATETIME_FORMAT):
  303. """
  304. Format a given datetime as a UTC ISO3339 string.
  305. """
  306. if not isinstance(value, datetime):
  307. return value
  308. return value.astimezone(timezone.utc).strftime(format)
  309. def timedeltaformat(value):
  310. """
  311. Format a timedelta to a RFC5545 Duration.
  312. """
  313. if not isinstance(value, timedelta):
  314. return value
  315. days = value.days
  316. hours, seconds = divmod(value.seconds, 3600)
  317. minutes, seconds = divmod(seconds, 60)
  318. formatted = "P"
  319. if days:
  320. formatted += f"{days}D"
  321. if hours or minutes or seconds:
  322. formatted += "T"
  323. if hours:
  324. formatted += f"{hours}H"
  325. if minutes:
  326. formatted += f"{minutes}M"
  327. if seconds:
  328. formatted += f"{seconds}S"
  329. return formatted
  330. def booleanformat(value):
  331. """
  332. Format a boolean value
  333. """
  334. if isinstance(value, bool):
  335. if value is True:
  336. return "true"
  337. elif value is False:
  338. return "false"
  339. elif value in ("true", "false"):
  340. return value
  341. else:
  342. raise ValueError(f"A boolean value must be provided, not {value}.")
  343. def ensure_bytes(obj):
  344. """
  345. Converts a utf-8 str object to bytes.
  346. """
  347. if obj is None:
  348. return obj
  349. if isinstance(obj, bytes):
  350. return obj
  351. if isinstance(obj, str):
  352. return bytes(obj, 'utf-8')
  353. else:
  354. raise TypeError("Must be bytes or str")
  355. def ensure_str(obj):
  356. """
  357. Converts bytes to a utf-8 string.
  358. """
  359. if obj is None:
  360. return None
  361. if isinstance(obj, str):
  362. return obj
  363. if isinstance(obj, bytes):
  364. return obj.decode('utf-8')
  365. else:
  366. raise TypeError("Must be bytes or str")
  367. def certificate_fingerprint_from_der(der_bytes):
  368. hash = hashlib.sha256(der_bytes).digest().hex()
  369. return ":".join([hash[i-2:i].upper() for i in range(-20, 0, 2)])
  370. def certificate_fingerprint(certificate_str):
  371. """
  372. Calculate the fingerprint for the given certificate, as defined by OpenADR.
  373. """
  374. der_bytes = ssl.PEM_cert_to_DER_cert(ensure_str(certificate_str))
  375. return certificate_fingerprint_from_der(der_bytes)
  376. def extract_pem_cert(tree):
  377. """
  378. Extract a given X509 certificate inside an XML tree and return the standard
  379. form of a PEM-encoded certificate.
  380. :param tree lxml.etree: The tree that contains the X509 element. This is
  381. usually the KeyInfo element from the XMLDsig Signature
  382. part of the message.
  383. """
  384. cert = tree.find('.//{http://www.w3.org/2000/09/xmldsig#}X509Certificate').text
  385. return "-----BEGIN CERTIFICATE-----\n" + cert + "-----END CERTIFICATE-----\n"
  386. def find_by(dict_or_list, key, value, *args):
  387. """
  388. Find a dict inside a dict or list by key, value properties.
  389. """
  390. search_params = [(key, value)]
  391. if args:
  392. search_params += [(args[i], args[i+1]) for i in range(0, len(args), 2)]
  393. if isinstance(dict_or_list, dict):
  394. dict_or_list = dict_or_list.values()
  395. for item in dict_or_list:
  396. if not isinstance(item, dict):
  397. _item = item.__dict__
  398. else:
  399. _item = item
  400. for key, value in search_params:
  401. if isinstance(value, tuple):
  402. if _item[key] not in value:
  403. break
  404. else:
  405. if _item[key] != value:
  406. break
  407. else:
  408. return item
  409. else:
  410. return None
  411. def group_by(list_, key, pop_key=False):
  412. """
  413. Return a dict that groups values
  414. """
  415. grouped = {}
  416. key_path = key.split(".")
  417. for item in list_:
  418. value = item
  419. for key in key_path:
  420. value = value.get(key)
  421. if value not in grouped:
  422. grouped[value] = []
  423. grouped[value].append(item)
  424. return grouped
  425. def cron_config(interval):
  426. """
  427. Returns a dict with cron settings for the given interval
  428. """
  429. if interval < timedelta(minutes=1):
  430. second = f"*/{interval.seconds}"
  431. minute = "*"
  432. hour = "*"
  433. elif interval < timedelta(hours=1):
  434. second = "0"
  435. minute = f"*/{int(interval.total_seconds/60)}"
  436. hour = "*"
  437. elif interval < timedelta(days=1):
  438. second = "0"
  439. minute = "0"
  440. hour = f"*/{int(interval.total_seconds/3600)}"
  441. return {"second": second, "minute": minute, "hour": hour}
  442. def get_cert_fingerprint_from_request(request):
  443. ssl_object = request.transport.get_extra_info('ssl_object')
  444. if ssl_object:
  445. der_bytes = ssl_object.getpeercert(binary_form=True)
  446. if der_bytes:
  447. return certificate_fingerprint_from_der(der_bytes)
  448. def get_certificate_common_name(request):
  449. cert = request.transport.get_extra_info('peercert')
  450. if cert:
  451. subject = dict(x[0] for x in cert['subject'])
  452. return subject.get('commonName')
  453. def group_targets_by_type(list_of_targets):
  454. targets_by_type = {}
  455. for target in list_of_targets:
  456. for key, value in target.items():
  457. if value is None:
  458. continue
  459. if key not in targets_by_type:
  460. targets_by_type[key] = []
  461. targets_by_type[key].append(value)
  462. return targets_by_type
  463. def ungroup_targets_by_type(targets_by_type):
  464. ungrouped_targets = []
  465. for target_type, targets in targets_by_type.items():
  466. if isinstance(targets, list):
  467. for target in targets:
  468. ungrouped_targets.append({target_type: target})
  469. elif isinstance(targets, str):
  470. ungrouped_targets.append({target_type: targets})
  471. return ungrouped_targets
  472. def validate_report_measurement_dict(measurement):
  473. from openleadr.enums import _ACCEPTABLE_UNITS, _MEASUREMENT_DESCRIPTIONS
  474. if 'name' not in measurement \
  475. or 'description' not in measurement \
  476. or 'unit' not in measurement:
  477. raise ValueError("The measurement dict must contain the following keys: "
  478. "'name', 'description', 'unit'. Please correct this.")
  479. name = measurement['name']
  480. description = measurement['description']
  481. unit = measurement['unit']
  482. # Validate the item name and description match
  483. if name in _MEASUREMENT_DESCRIPTIONS:
  484. required_description = _MEASUREMENT_DESCRIPTIONS[name]
  485. if description != required_description:
  486. if description.lower() == required_description.lower():
  487. logger.warning(f"The description for the measurement with name {name} "
  488. f"was not in the correct case; you provided {description} but "
  489. f"it should be {required_description}. "
  490. "This was automatically corrected.")
  491. measurement['description'] = required_description
  492. else:
  493. raise ValueError(f"The measurement's description {description} "
  494. f"did not match the expected description for this type "
  495. f" ({required_description}). Please correct this, or use "
  496. "'customUnit' as the name.")
  497. if unit not in _ACCEPTABLE_UNITS[name]:
  498. raise ValueError(f"The unit {unit} is not acceptable for measurement {name}. Allowed "
  499. f"units are {_ACCEPTABLE_UNITS[name]}.")
  500. else:
  501. if name != 'customUnit':
  502. logger.warning(f"You provided a measurement with an unknown name {name}. "
  503. "This was corrected to 'customUnit'. Please correct this in your "
  504. "report definition.")
  505. measurement['report_description']['name'] = 'customUnit'
  506. if 'power' in name:
  507. if 'power_attributes' in measurement:
  508. power_attributes = measurement['power_attributes']
  509. if 'voltage' not in power_attributes \
  510. or 'ac' not in power_attributes \
  511. or 'hertz' not in power_attributes:
  512. raise ValueError("The power_attributes of the measurement must contain the "
  513. "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
  514. else:
  515. raise ValueError("A 'power' related measurement must contain a "
  516. "'power_attributes' section that contains the following "
  517. "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
  518. def get_active_period_from_intervals(intervals, as_dict=True):
  519. if is_dataclass(intervals[0]):
  520. intervals = [asdict(i) for i in intervals]
  521. period_start = min([i['dtstart'] for i in intervals])
  522. period_duration = max([i['dtstart'] + i['duration'] - period_start for i in intervals])
  523. if as_dict:
  524. return {'dtstart': period_start,
  525. 'duration': period_duration}
  526. else:
  527. from openleadr.objects import ActivePeriod
  528. return ActivePeriod(dtstart=period_start, duration=period_duration)
  529. def determine_event_status(active_period):
  530. if is_dataclass(active_period):
  531. active_period = asdict(active_period)
  532. now = datetime.now(timezone.utc)
  533. if active_period['dtstart'].tzinfo is None:
  534. active_period['dtstart'] = active_period['dtstart'].astimezone(timezone.utc)
  535. active_period_start = active_period['dtstart']
  536. active_period_end = active_period['dtstart'] + active_period['duration']
  537. if now >= active_period_end:
  538. return 'completed'
  539. if now >= active_period_start:
  540. return 'active'
  541. if active_period.get('ramp_up_duration') is not None:
  542. ramp_up_start = active_period_start - active_period['ramp_up_duration']
  543. if now >= ramp_up_start:
  544. return 'near'
  545. return 'far'
  546. async def delayed_call(func, delay):
  547. if isinstance(delay, timedelta):
  548. delay = delay.total_seconds()
  549. await asyncio.sleep(delay)
  550. if asyncio.iscoroutinefunction(func):
  551. await func()
  552. elif asyncio.iscoroutine(func):
  553. await func
  554. else:
  555. func()
  556. def hasmember(obj, member):
  557. """
  558. Check if a dict or dataclass has the given member
  559. """
  560. if is_dataclass(obj):
  561. if hasattr(obj, member):
  562. return True
  563. else:
  564. if member in obj:
  565. return True
  566. return False
  567. def getmember(obj, member):
  568. """
  569. Get a member from a dict or dataclass
  570. """
  571. if is_dataclass(obj):
  572. return getattr(obj, member)
  573. else:
  574. return obj[member]
  575. def setmember(obj, member, value):
  576. """
  577. Set a member of a dict of dataclass
  578. """
  579. if is_dataclass(obj):
  580. setattr(obj, member, value)
  581. else:
  582. obj[member] = value