preflight.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from datetime import datetime, timedelta, timezone
  13. from dataclasses import asdict, is_dataclass
  14. from openleadr import enums, utils
  15. import logging
  16. logger = logging.getLogger('openleadr')
  17. def preflight_message(message_type, message_payload):
  18. """
  19. Tests message contents before sending them. It will correct benign errors
  20. and warn you about them. Uncorrectable errors will raise an Exception. It
  21. changes the message_payload dict in-place.
  22. :param message_type string: The type of message you are sending
  23. :param message_payload dict: The contents of the message
  24. """
  25. if f'_preflight_{message_type}' in globals():
  26. message_payload = message_payload.copy()
  27. for key, value in message_payload.items():
  28. if isinstance(value, list):
  29. message_payload[key] = [asdict(item) if is_dataclass(item) else item
  30. for item in value]
  31. else:
  32. message_payload[key] = asdict(value) if is_dataclass(value) else value
  33. globals()[f'_preflight_{message_type}'](message_payload)
  34. return message_payload
  35. def _preflight_oadrRegisterReport(message_payload):
  36. for report in message_payload['reports']:
  37. # Check that the report name is preceded by METADATA_ when registering reports
  38. if report['report_name'] in enums.REPORT_NAME.values \
  39. and not report['report_name'].startswith("METADATA"):
  40. report['report_name'] = 'METADATA_' + report['report_name']
  41. # Check that the measurement name and description match according to the schema
  42. for report_description in report['report_descriptions']:
  43. if 'measurement' in report_description and report_description['measurement'] is not None:
  44. utils.validate_report_measurement_dict(report_description['measurement'])
  45. def _preflight_oadrDistributeEvent(message_payload):
  46. if 'parse_duration' not in globals():
  47. from .utils import parse_duration
  48. # Check that the total event_duration matches the sum of the interval durations (rule 8)
  49. for event in message_payload['events']:
  50. active_period_duration = event['active_period']['duration']
  51. signal_durations = []
  52. for signal in event['event_signals']:
  53. signal_durations.append(sum([parse_duration(i['duration'])
  54. for i in signal['intervals']], timedelta(seconds=0)))
  55. if not all([d == active_period_duration for d in signal_durations]):
  56. if not all([d == signal_durations[0] for d in signal_durations]):
  57. raise ValueError("The different EventSignals have different total durations. "
  58. "Please correct this.")
  59. else:
  60. logger.warning(f"The active_period duration for event "
  61. f"{event['event_descriptor']['event_id']} ({active_period_duration})"
  62. f" differs from the sum of the interval's durations "
  63. f"({signal_durations[0]}). The active_period duration has been "
  64. f"adjusted to ({signal_durations[0]}).")
  65. event['active_period']['duration'] = signal_durations[0]
  66. # Check that payload values with signal name SIMPLE are constricted (rule 9)
  67. for event in message_payload['events']:
  68. for event_signal in event['event_signals']:
  69. if event_signal['signal_name'] == "SIMPLE":
  70. for interval in event_signal['intervals']:
  71. if interval['signal_payload'] not in (0, 1, 2, 3):
  72. raise ValueError("Payload Values used with Signal Name SIMPLE "
  73. "must be one of 0, 1, 2 or 3")
  74. # Check that the current_value is 0 for SIMPLE events that are not yet active (rule 14)
  75. for event in message_payload['events']:
  76. for event_signal in event['event_signals']:
  77. if 'current_value' in event_signal and event_signal['current_value'] != 0:
  78. if event_signal['signal_name'] == "SIMPLE" \
  79. and event['event_descriptor']['event_status'] != "ACTIVE":
  80. logger.warning("The current_value for a SIMPLE event "
  81. "that is not yet active must be 0. "
  82. "This will be corrected.")
  83. event_signal['current_value'] = 0
  84. # Check that there is a valid oadrResponseRequired value for each Event
  85. for event in message_payload['events']:
  86. if 'response_required' not in event:
  87. event['response_required'] = 'always'
  88. elif event['response_required'] not in ('never', 'always'):
  89. logger.warning(f"The response_required property in an Event "
  90. f"should be 'never' or 'always', not "
  91. f"{event['response_required']}. Changing to 'always'.")
  92. event['response_required'] = 'always'
  93. # Check that each Event's event_descriptor has a created_date_time; default to the current UTC time
  94. for event in message_payload['events']:
  95. if 'created_date_time' not in event['event_descriptor'] \
  96. or not event['event_descriptor']['created_date_time']:
  97. event['event_descriptor']['created_date_time'] = datetime.now(timezone.utc)
  98. # Check that the target designations are correct and consistent
  99. for event in message_payload['events']:
  100. if 'targets' in event and 'targets_by_type' in event:
  101. if utils.group_targets_by_type(event['targets']) != event['targets_by_type']:
  102. raise ValueError("You assigned both 'targets' and 'targets_by_type' in your event, "
  103. "but the two were not consistent with each other. "
  104. f"You supplied 'targets' = {event['targets']} and "
  105. f"'targets_by_type' = {event['targets_by_type']}")
  106. elif 'targets_by_type' in event and 'targets' not in event:
  107. event['targets'] = utils.ungroup_targets_by_type(event['targets_by_type'])