utils.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from datetime import datetime, timedelta, timezone
  13. from dataclasses import is_dataclass, asdict
  14. from collections import OrderedDict
  15. from openleadr import enums, objects
  16. import asyncio
  17. import re
  18. import ssl
  19. import hashlib
  20. import uuid
  21. import logging
  22. logger = logging.getLogger('openleadr')
  23. DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
  24. DATETIME_FORMAT_NO_MICROSECONDS = "%Y-%m-%dT%H:%M:%SZ"
  25. def generate_id(*args, **kwargs):
  26. """
  27. Generate a string that can be used as an identifier in OpenADR messages.
  28. """
  29. return str(uuid.uuid4())
  30. def flatten_xml(message):
  31. """
  32. Flatten the entire XML structure.
  33. """
  34. lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
  35. for line in lines:
  36. line = re.sub(r'\n', '', line)
  37. line = re.sub(r'\s\s+', ' ', line)
  38. return "".join(lines)
  39. def normalize_dict(ordered_dict):
  40. """
  41. Main conversion function for the output of xmltodict to the OpenLEADR
  42. representation of OpenADR contents.
  43. :param ordered_dict dict: The OrderedDict, dict or dataclass that you wish to convert.
  44. """
  45. if is_dataclass(ordered_dict):
  46. ordered_dict = asdict(ordered_dict)
  47. def normalize_key(key):
  48. if key.startswith('oadr'):
  49. key = key[4:]
  50. elif key.startswith('ei'):
  51. key = key[2:]
  52. # Don't normalize the measurement descriptions
  53. if key in enums._MEASUREMENT_NAMESPACES:
  54. return key
  55. key = re.sub(r'([a-z])([A-Z])', r'\1_\2', key)
  56. if '-' in key:
  57. key = key.replace('-', '_')
  58. return key.lower()
  59. d = {}
  60. for key, value in ordered_dict.items():
  61. # Interpret values from the dict
  62. if key.startswith("@"):
  63. continue
  64. key = normalize_key(key)
  65. if isinstance(value, (OrderedDict, dict)):
  66. d[key] = normalize_dict(value)
  67. elif isinstance(value, list):
  68. d[key] = []
  69. for item in value:
  70. if isinstance(item, (OrderedDict, dict)):
  71. dict_item = normalize_dict(item)
  72. d[key].append(normalize_dict(dict_item))
  73. else:
  74. d[key].append(item)
  75. elif key in ("duration", "startafter", "max_period", "min_period"):
  76. d[key] = parse_duration(value)
  77. elif ("date_time" in key or key == "dtstart") and isinstance(value, str):
  78. d[key] = parse_datetime(value)
  79. elif value in ('true', 'false'):
  80. d[key] = parse_boolean(value)
  81. elif isinstance(value, str):
  82. if re.match(r'^-?\d+$', value):
  83. d[key] = int(value)
  84. elif re.match(r'^-?[\d.]+$', value):
  85. d[key] = float(value)
  86. else:
  87. d[key] = value
  88. else:
  89. d[key] = value
  90. # Do our best to make the dictionary structure as pythonic as possible
  91. if key.startswith("x_ei_"):
  92. d[key[5:]] = d.pop(key)
  93. key = key[5:]
  94. # Group all targets as a list of dicts under the key "target"
  95. if key == 'target':
  96. targets = d.pop(key)
  97. new_targets = []
  98. if targets:
  99. for ikey in targets:
  100. if isinstance(targets[ikey], list):
  101. new_targets.extend([{ikey: value} for value in targets[ikey]])
  102. else:
  103. new_targets.append({ikey: targets[ikey]})
  104. d[key + "s"] = new_targets
  105. key = key + "s"
  106. # Also add a targets_by_type element to this dict
  107. # to access the targets in a more convenient way.
  108. d['targets_by_type'] = group_targets_by_type(new_targets)
  109. # Group all reports as a list of dicts under the key "pending_reports"
  110. if key == "pending_reports":
  111. if isinstance(d[key], dict) and 'report_request_id' in d[key] \
  112. and isinstance(d[key]['report_request_id'], list):
  113. d['pending_reports'] = [{'request_id': rrid}
  114. for rrid in d['pending_reports']['report_request_id']]
  115. # Group all events al a list of dicts under the key "events"
  116. elif key == "event" and isinstance(d[key], list):
  117. events = d.pop("event")
  118. new_events = []
  119. for event in events:
  120. new_event = event['event']
  121. new_event['response_required'] = event['response_required']
  122. new_events.append(new_event)
  123. d["events"] = new_events
  124. # If there's only one event, also put it into a list
  125. elif key == "event" and isinstance(d[key], dict) and "event" in d[key]:
  126. oadr_event = d.pop('event')
  127. ei_event = oadr_event['event']
  128. ei_event['response_required'] = oadr_event['response_required']
  129. d['events'] = [ei_event]
  130. elif key in ("request_event", "created_event") and isinstance(d[key], dict):
  131. d = d[key]
  132. # Plurarize some lists
  133. elif key in ('report_request', 'report', 'specifier_payload'):
  134. if isinstance(d[key], list):
  135. d[key + 's'] = d.pop(key)
  136. else:
  137. d[key + 's'] = [d.pop(key)]
  138. elif key in ('report_description', 'event_signal'):
  139. descriptions = d.pop(key)
  140. if not isinstance(descriptions, list):
  141. descriptions = [descriptions]
  142. for description in descriptions:
  143. # We want to make the identification of the measurement universal
  144. for measurement in enums._MEASUREMENT_NAMESPACES:
  145. if measurement in description:
  146. name, item = measurement, description.pop(measurement)
  147. break
  148. else:
  149. break
  150. item['description'] = item.pop('item_description', None)
  151. item['unit'] = item.pop('item_units', None)
  152. if 'si_scale_code' in item:
  153. item['scale'] = item.pop('si_scale_code')
  154. if 'pulse_factor' in item:
  155. item['pulse_factor'] = item.pop('pulse_factor')
  156. description['measurement'] = {'name': name,
  157. **item}
  158. d[key + 's'] = descriptions
  159. # Promote the contents of the Qualified Event ID
  160. elif key == "qualified_event_id" and isinstance(d['qualified_event_id'], dict):
  161. qeid = d.pop('qualified_event_id')
  162. d['event_id'] = qeid['event_id']
  163. d['modification_number'] = qeid['modification_number']
  164. # Durations are encapsulated in their own object, remove this nesting
  165. elif isinstance(d[key], dict) and "duration" in d[key] and len(d[key]) == 1:
  166. d[key] = d[key]["duration"]
  167. # In general, remove all double nesting
  168. elif isinstance(d[key], dict) and key in d[key] and len(d[key]) == 1:
  169. d[key] = d[key][key]
  170. # In general, remove the double nesting of lists of items
  171. elif isinstance(d[key], dict) and key[:-1] in d[key] and len(d[key]) == 1:
  172. if isinstance(d[key][key[:-1]], list):
  173. d[key] = d[key][key[:-1]]
  174. else:
  175. d[key] = [d[key][key[:-1]]]
  176. # Payload values are wrapped in an object according to their type. We don't need that.
  177. elif key in ("signal_payload", "current_value"):
  178. value = d[key]
  179. if isinstance(d[key], dict):
  180. if 'payload_float' in d[key] and 'value' in d[key]['payload_float'] \
  181. and d[key]['payload_float']['value'] is not None:
  182. d[key] = float(d[key]['payload_float']['value'])
  183. elif 'payload_int' in d[key] and 'value' in d[key]['payload_int'] \
  184. and d[key]['payload_int'] is not None:
  185. d[key] = int(d[key]['payload_int']['value'])
  186. # Report payloads contain an r_id and a type-wrapped payload_float
  187. elif key == 'report_payload':
  188. if 'payload_float' in d[key] and 'value' in d[key]['payload_float']:
  189. v = d[key].pop('payload_float')
  190. d[key]['value'] = float(v['value'])
  191. elif 'payload_int' in d[key] and 'value' in d[key]['payload_int']:
  192. v = d[key].pop('payload_float')
  193. d[key]['value'] = int(v['value'])
  194. # All values other than 'false' must be interpreted as True for testEvent (rule 006)
  195. elif key == 'test_event' and not isinstance(d[key], bool):
  196. d[key] = True
  197. # Promote the 'text' item
  198. elif isinstance(d[key], dict) and "text" in d[key] and len(d[key]) == 1:
  199. if key == 'uid':
  200. d[key] = int(d[key]["text"])
  201. else:
  202. d[key] = d[key]["text"]
  203. # Promote a 'date-time' item
  204. elif isinstance(d[key], dict) and "date_time" in d[key] and len(d[key]) == 1:
  205. d[key] = d[key]["date_time"]
  206. # Promote 'properties' item, discard the unused? 'components' item
  207. elif isinstance(d[key], dict) and "properties" in d[key] and len(d[key]) <= 2:
  208. d[key] = d[key]["properties"]
  209. # Remove all empty dicts
  210. elif isinstance(d[key], dict) and len(d[key]) == 0:
  211. d.pop(key)
  212. return d
  213. def parse_datetime(value):
  214. """
  215. Parse an ISO8601 datetime into a datetime.datetime object.
  216. """
  217. matches = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.?(\d{1,6})?\d*Z', value)
  218. if matches:
  219. year, month, day, hour, minute, second = (int(value)for value in matches.groups()[:-1])
  220. micro = matches.groups()[-1]
  221. if micro is None:
  222. micro = 0
  223. else:
  224. micro = int(micro + "0" * (6 - len(micro)))
  225. return datetime(year, month, day, hour, minute, second, micro, tzinfo=timezone.utc)
  226. else:
  227. logger.warning(f"parse_datetime: {value} did not match format")
  228. return value
  229. def parse_duration(value):
  230. """
  231. Parse a RFC5545 duration.
  232. """
  233. if isinstance(value, timedelta):
  234. return value
  235. regex = r'(\+|\-)?P(?:(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)|(?:(\d+)W)'
  236. matches = re.match(regex, value)
  237. if not matches:
  238. raise ValueError(f"The duration '{value}' did not match the requested format")
  239. years, months, days, hours, minutes, seconds, weeks = (int(g) if g else 0 for g in matches.groups()[1:])
  240. if years != 0:
  241. logger.warning("Received a duration that specifies years, which is not a determinate duration. "
  242. "It will be interpreted as 1 year = 365 days.")
  243. days = days + 365 * years
  244. if months != 0:
  245. logger.warning("Received a duration that specifies months, which is not a determinate duration "
  246. "It will be interpreted as 1 month = 30 days.")
  247. days = days + 30 * months
  248. duration = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
  249. if matches.groups()[0] == "-":
  250. duration = -1 * duration
  251. return duration
  252. def parse_boolean(value):
  253. if value == 'true':
  254. return True
  255. else:
  256. return False
  257. def datetimeformat(value, format=DATETIME_FORMAT):
  258. """
  259. Format a given datetime as a UTC ISO3339 string.
  260. """
  261. if not isinstance(value, datetime):
  262. return value
  263. return value.astimezone(timezone.utc).strftime(format)
  264. def timedeltaformat(value):
  265. """
  266. Format a timedelta to a RFC5545 Duration.
  267. """
  268. if not isinstance(value, timedelta):
  269. return value
  270. days = value.days
  271. hours, seconds = divmod(value.seconds, 3600)
  272. minutes, seconds = divmod(seconds, 60)
  273. formatted = "P"
  274. if days:
  275. formatted += f"{days}D"
  276. if hours or minutes or seconds:
  277. formatted += "T"
  278. if hours:
  279. formatted += f"{hours}H"
  280. if minutes:
  281. formatted += f"{minutes}M"
  282. if seconds:
  283. formatted += f"{seconds}S"
  284. return formatted
  285. def booleanformat(value):
  286. """
  287. Format a boolean value
  288. """
  289. if isinstance(value, bool):
  290. if value is True:
  291. return "true"
  292. elif value is False:
  293. return "false"
  294. elif value in ("true", "false"):
  295. return value
  296. else:
  297. raise ValueError(f"A boolean value must be provided, not {value}.")
  298. def ensure_bytes(obj):
  299. """
  300. Converts a utf-8 str object to bytes.
  301. """
  302. if obj is None:
  303. return obj
  304. if isinstance(obj, bytes):
  305. return obj
  306. if isinstance(obj, str):
  307. return bytes(obj, 'utf-8')
  308. else:
  309. raise TypeError("Must be bytes or str")
  310. def ensure_str(obj):
  311. """
  312. Converts bytes to a utf-8 string.
  313. """
  314. if obj is None:
  315. return None
  316. if isinstance(obj, str):
  317. return obj
  318. if isinstance(obj, bytes):
  319. return obj.decode('utf-8')
  320. else:
  321. raise TypeError("Must be bytes or str")
  322. def certificate_fingerprint_from_der(der_bytes):
  323. hash = hashlib.sha256(der_bytes).digest().hex()
  324. return ":".join([hash[i-2:i].upper() for i in range(-20, 0, 2)])
  325. def certificate_fingerprint(certificate_str):
  326. """
  327. Calculate the fingerprint for the given certificate, as defined by OpenADR.
  328. """
  329. der_bytes = ssl.PEM_cert_to_DER_cert(ensure_str(certificate_str))
  330. return certificate_fingerprint_from_der(der_bytes)
  331. def extract_pem_cert(tree):
  332. """
  333. Extract a given X509 certificate inside an XML tree and return the standard
  334. form of a PEM-encoded certificate.
  335. :param tree lxml.etree: The tree that contains the X509 element. This is
  336. usually the KeyInfo element from the XMLDsig Signature
  337. part of the message.
  338. """
  339. cert = tree.find('.//{http://www.w3.org/2000/09/xmldsig#}X509Certificate').text
  340. return "-----BEGIN CERTIFICATE-----\n" + cert + "-----END CERTIFICATE-----\n"
  341. def find_by(dict_or_list, key, value, *args):
  342. """
  343. Find a dict inside a dict or list by key, value properties.
  344. """
  345. search_params = [(key, value)]
  346. if args:
  347. search_params += [(args[i], args[i+1]) for i in range(0, len(args), 2)]
  348. if isinstance(dict_or_list, dict):
  349. dict_or_list = dict_or_list.values()
  350. for item in dict_or_list:
  351. if not isinstance(item, dict):
  352. _item = item.__dict__
  353. else:
  354. _item = item
  355. for key, value in search_params:
  356. if isinstance(value, tuple):
  357. if key not in _item or _item[key] not in value:
  358. break
  359. else:
  360. if key not in _item or _item[key] != value:
  361. break
  362. else:
  363. return item
  364. else:
  365. return None
  366. def group_by(list_, key, pop_key=False):
  367. """
  368. Return a dict that groups values
  369. """
  370. grouped = {}
  371. key_path = key.split(".")
  372. for item in list_:
  373. value = item
  374. for key in key_path:
  375. value = value.get(key)
  376. if value not in grouped:
  377. grouped[value] = []
  378. grouped[value].append(item)
  379. return grouped
  380. def cron_config(interval, randomize_seconds=False):
  381. """
  382. Returns a dict with cron settings for the given interval
  383. """
  384. if interval < timedelta(minutes=1):
  385. second = f"*/{interval.seconds}"
  386. minute = "*"
  387. hour = "*"
  388. elif interval < timedelta(hours=1):
  389. second = "0"
  390. minute = f"*/{int(interval.total_seconds()/60)}"
  391. hour = "*"
  392. elif interval < timedelta(hours=24):
  393. second = "0"
  394. minute = "0"
  395. hour = f"*/{int(interval.total_seconds()/3600)}"
  396. else:
  397. second = "0"
  398. minute = "0"
  399. hour = "0"
  400. cron_config = {"second": second, "minute": minute, "hour": hour}
  401. if randomize_seconds:
  402. jitter = min(int(interval.total_seconds() / 10), 300)
  403. cron_config['jitter'] = jitter
  404. return cron_config
  405. def get_cert_fingerprint_from_request(request):
  406. ssl_object = request.transport.get_extra_info('ssl_object')
  407. if ssl_object:
  408. der_bytes = ssl_object.getpeercert(binary_form=True)
  409. if der_bytes:
  410. return certificate_fingerprint_from_der(der_bytes)
  411. def group_targets_by_type(list_of_targets):
  412. targets_by_type = {}
  413. for target in list_of_targets:
  414. for key, value in target.items():
  415. if value is None:
  416. continue
  417. if key not in targets_by_type:
  418. targets_by_type[key] = []
  419. targets_by_type[key].append(value)
  420. return targets_by_type
  421. def ungroup_targets_by_type(targets_by_type):
  422. ungrouped_targets = []
  423. for target_type, targets in targets_by_type.items():
  424. if isinstance(targets, list):
  425. for target in targets:
  426. ungrouped_targets.append({target_type: target})
  427. elif isinstance(targets, str):
  428. ungrouped_targets.append({target_type: targets})
  429. return ungrouped_targets
  430. def validate_report_measurement_dict(measurement):
  431. from openleadr.enums import _ACCEPTABLE_UNITS, _MEASUREMENT_DESCRIPTIONS
  432. if 'name' not in measurement \
  433. or 'description' not in measurement \
  434. or 'unit' not in measurement:
  435. raise ValueError("The measurement dict must contain the following keys: "
  436. "'name', 'description', 'unit'. Please correct this.")
  437. name = measurement['name']
  438. description = measurement['description']
  439. unit = measurement['unit']
  440. # Validate the item name and description match
  441. if name in _MEASUREMENT_DESCRIPTIONS:
  442. required_description = _MEASUREMENT_DESCRIPTIONS[name]
  443. if description != required_description:
  444. if description.lower() == required_description.lower():
  445. logger.warning(f"The description for the measurement with name '{name}' "
  446. f"was not in the correct case; you provided '{description}' but "
  447. f"it should be '{required_description}'. "
  448. "This was automatically corrected.")
  449. measurement['description'] = required_description
  450. else:
  451. raise ValueError(f"The measurement's description '{description}' "
  452. f"did not match the expected description for this type "
  453. f" ('{required_description}'). Please correct this, or use "
  454. "'customUnit' as the name.")
  455. if unit not in _ACCEPTABLE_UNITS[name]:
  456. raise ValueError(f"The unit '{unit}' is not acceptable for measurement '{name}'. Allowed "
  457. f"units are: '" + "', '".join(_ACCEPTABLE_UNITS[name]) + "'.")
  458. else:
  459. if name != 'customUnit':
  460. logger.warning(f"You provided a measurement with an unknown name {name}. "
  461. "This was corrected to 'customUnit'. Please correct this in your "
  462. "report definition.")
  463. measurement['name'] = 'customUnit'
  464. if 'power' in name:
  465. if 'power_attributes' in measurement:
  466. power_attributes = measurement['power_attributes']
  467. if 'voltage' not in power_attributes \
  468. or 'ac' not in power_attributes \
  469. or 'hertz' not in power_attributes:
  470. raise ValueError("The power_attributes of the measurement must contain the "
  471. "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
  472. else:
  473. raise ValueError("A 'power' related measurement must contain a "
  474. "'power_attributes' section that contains the following "
  475. "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
  476. def get_active_period_from_intervals(intervals, as_dict=True):
  477. if is_dataclass(intervals[0]):
  478. intervals = [asdict(i) for i in intervals]
  479. period_start = min([i['dtstart'] for i in intervals])
  480. period_duration = max([i['dtstart'] + i['duration'] - period_start for i in intervals])
  481. if as_dict:
  482. return {'dtstart': period_start,
  483. 'duration': period_duration}
  484. else:
  485. from openleadr.objects import ActivePeriod
  486. return ActivePeriod(dtstart=period_start, duration=period_duration)
  487. def determine_event_status(active_period):
  488. now = datetime.now(timezone.utc)
  489. active_period_start = getmember(active_period, 'dtstart')
  490. if active_period_start.tzinfo is None:
  491. active_period_start = active_period_start.astimezone(timezone.utc)
  492. setmember(active_period, 'dtstart', active_period_start)
  493. active_period_end = active_period_start + getmember(active_period, 'duration')
  494. if now >= active_period_end:
  495. return 'completed'
  496. if now >= active_period_start:
  497. return 'active'
  498. if getmember(active_period, 'ramp_up_period', None) is not None:
  499. ramp_up_start = active_period_start - getmember(active_period, 'ramp_up_period')
  500. if now >= ramp_up_start:
  501. return 'near'
  502. return 'far'
  503. async def delayed_call(func, delay):
  504. try:
  505. if isinstance(delay, timedelta):
  506. delay = delay.total_seconds()
  507. await asyncio.sleep(delay)
  508. if asyncio.iscoroutinefunction(func):
  509. await func()
  510. elif asyncio.iscoroutine(func):
  511. await func
  512. else:
  513. func()
  514. except asyncio.CancelledError:
  515. pass
  516. def hasmember(obj, member):
  517. """
  518. Check if a dict or dataclass has the given member
  519. """
  520. if is_dataclass(obj):
  521. if hasattr(obj, member):
  522. return True
  523. else:
  524. if member in obj:
  525. return True
  526. return False
  527. def getmember(obj, member, missing='_RAISE_'):
  528. """
  529. Get a member from a dict or dataclass
  530. """
  531. if is_dataclass(obj):
  532. if not missing == '_RAISE_' and not hasattr(obj, member):
  533. return missing
  534. else:
  535. return getattr(obj, member)
  536. else:
  537. if missing == '_RAISE_':
  538. return obj[member]
  539. else:
  540. return obj.get(member, missing)
  541. def setmember(obj, member, value):
  542. """
  543. Set a member of a dict of dataclass
  544. """
  545. if is_dataclass(obj):
  546. setattr(obj, member, value)
  547. else:
  548. obj[member] = value
  549. def get_next_event_from_deque(deque):
  550. unused_elements = []
  551. event = None
  552. for i in range(len(deque)):
  553. msg = deque.popleft()
  554. if isinstance(msg, objects.Event) or (isinstance(msg, dict) and 'event_descriptor' in msg):
  555. event = msg
  556. break
  557. else:
  558. unused_elements.append(msg)
  559. deque.extend(unused_elements)
  560. return event
  561. def validate_report_request_tuples(list_of_report_requests, mode='full'):
  562. if len(list_of_report_requests) == 0:
  563. return
  564. for report_requests in list_of_report_requests:
  565. if report_requests is None:
  566. continue
  567. for i, rrq in enumerate(report_requests):
  568. if rrq is None:
  569. continue
  570. # Check if it is a tuple
  571. elif not isinstance(rrq, tuple):
  572. report_requests[i] = None
  573. if mode == 'full':
  574. logger.error("Your on_register_report handler did not return a list of tuples. "
  575. f"The first item from the list was '{rrq}' ({rrq.__class__.__name__}).")
  576. else:
  577. logger.error("Your on_register_report handler did not return a tuple. "
  578. f"It returned '{rrq}'. Please see the documentation for the correct format.")
  579. # Check if it has the correct length
  580. elif not len(rrq) in (3, 4):
  581. report_requests[i] = None
  582. if mode == 'full':
  583. logger.error("Your on_register_report handler returned tuples of the wrong length. "
  584. f"It should be 3 or 4. It returned: '{rrq}'.")
  585. else:
  586. logger.error("Your on_register_report handler returned a tuple of the wrong length. "
  587. f"It should be 2 or 3. It returned: '{rrq[1:]}'.")
  588. # Check if the first element is callable
  589. elif not callable(rrq[1]):
  590. report_requests[i] = None
  591. if mode == 'full':
  592. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  593. "It should return a list of (r_id, callback, sampling_interval) or "
  594. "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
  595. "the r_id is a string, callback is a callable function or coroutine, and "
  596. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  597. f"It returned: '{rrq}'. The second element was not callable.")
  598. else:
  599. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  600. "It should return a (callback, sampling_interval) or "
  601. "(callback, sampling_interval, reporting_interval) tuple, where "
  602. "the callback is a callable function or coroutine, and "
  603. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  604. f"It returned: '{rrq[1:]}'. The first element was not callable.")
  605. # Check if the second element is a timedelta
  606. elif not isinstance(rrq[2], timedelta):
  607. report_requests[i] = None
  608. if mode == 'full':
  609. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  610. "It should return a list of (r_id, callback, sampling_interval) or "
  611. "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
  612. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  613. f"It returned: '{rrq}'. The third element was not of type timedelta.")
  614. else:
  615. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  616. "It should return a (callback, sampling_interval) or "
  617. "(callback, sampling_interval, reporting_interval) tuple, where "
  618. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  619. f"It returned: '{rrq[1:]}'. The second element was not of type timedelta.")
  620. # Check if the third element is a timedelta (if it exists)
  621. elif len(rrq) == 4 and not isinstance(rrq[3], timedelta):
  622. report_requests[i] = None
  623. if mode == 'full':
  624. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  625. "It should return a list of (r_id, callback, sampling_interval) or "
  626. "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
  627. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  628. f"It returned: '{rrq}'. The fourth element was not of type timedelta.")
  629. else:
  630. logger.error(f"Your on_register_report handler did not return the correct tuple. "
  631. "It should return a (callback, sampling_interval) or "
  632. "(callback, sampling_interval, reporting_interval) tuple, where "
  633. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  634. f"It returned: '{rrq[1:]}'. The third element was not of type timedelta.")
  635. async def await_if_required(result):
  636. if asyncio.iscoroutine(result):
  637. result = await result
  638. return result
  639. async def gather_if_required(results):
  640. if results is None:
  641. return results
  642. if len(results) > 0:
  643. if not any([asyncio.iscoroutine(r) for r in results]):
  644. results = results
  645. elif all([asyncio.iscoroutine(r) for r in results]):
  646. results = await asyncio.gather(*results)
  647. else:
  648. results = [await await_if_required(result) for result in results]
  649. return results
  650. def order_events(events, limit=None, offset=None):
  651. """
  652. Order the events according to the OpenADR rules:
  653. - active events before inactive events
  654. - high priority before low priority
  655. - earlier before later
  656. """
  657. def event_priority(event):
  658. # The default and lowest priority is 0, which we should interpret as a high value.
  659. priority = getmember(getmember(event, 'event_descriptor'), 'priority', float('inf'))
  660. if priority == 0:
  661. priority = float('inf')
  662. return priority
  663. if events is None:
  664. return None
  665. if isinstance(events, objects.Event):
  666. events = [events]
  667. elif isinstance(events, dict):
  668. events = [events]
  669. # Update the event statuses
  670. for event in events:
  671. event_status = determine_event_status(getmember(event, 'active_period'))
  672. setmember(getmember(event, 'event_descriptor'), 'event_status', event_status)
  673. # Short circuit if we only have one event:
  674. if len(events) == 1:
  675. return events
  676. # Get all the active events first
  677. active_events = [event for event in events if getmember(getmember(event, 'event_descriptor'), 'event_status') == 'active']
  678. other_events = [event for event in events if getmember(getmember(event, 'event_descriptor'), 'event_status') != 'active']
  679. # Sort the active events by priority
  680. active_events.sort(key=lambda e: event_priority(e))
  681. # Sort the active events by start date
  682. active_events.sort(key=lambda e: getmember(getmember(e, 'active_period'), 'dtstart'))
  683. # Sort the non-active events by their start date
  684. other_events.sort(key=lambda e: getmember(getmember(e, 'active_period'), 'dtstart'))
  685. ordered_events = active_events + other_events
  686. if limit and offset:
  687. return ordered_events[offset:offset+limit]
  688. return ordered_events