report_service.py 14 KB


  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. from asyncio import iscoroutine, gather
  14. from openleadr.utils import generate_id, find_by, group_by
  15. from openleadr import objects
  16. import logging
  17. import inspect
  18. logger = logging.getLogger('openleadr')
  19. # ╔══════════════════════════════════════════════════════════════════════════╗
  20. # ║ REPORT SERVICE ║
  21. # ╚══════════════════════════════════════════════════════════════════════════╝
  22. # ┌──────────────────────────────────────────────────────────────────────────┐
  23. # │ The VEN can register its reporting capabilities. │
  24. # │ │
  25. # │ ┌────┐ ┌────┐ │
  26. # │ │VEN │ │VTN │ │
  27. # │ └─┬──┘ └─┬──┘ │
  28. # │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
  29. # │ │ │ │
  30. # │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
  31. # │ │ │ │
  32. # │ │ │ │
  33. # │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
  34. # │ │ │ │
  35. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  36. # │ │ │ │
  37. # │ │
  38. # └──────────────────────────────────────────────────────────────────────────┘
  39. # ┌──────────────────────────────────────────────────────────────────────────┐
  40. # │ A report can also be canceled (TODO: the diagram below still repeats the registration flow) │
  41. # │ │
  42. # │ ┌────┐ ┌────┐ │
  43. # │ │VEN │ │VTN │ │
  44. # │ └─┬──┘ └─┬──┘ │
  45. # │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
  46. # │ │ │ │
  47. # │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
  48. # │ │ │ │
  49. # │ │ │ │
  50. # │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
  51. # │ │ │ │
  52. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  53. # │ │ │ │
  54. # │ │
  55. # └──────────────────────────────────────────────────────────────────────────┘
  56. @service('EiReport')
  57. class ReportService(VTNService):
  58. def __init__(self, vtn_id, message_queues=None):
  59. super().__init__(vtn_id)
  60. self.report_callbacks = {}
  61. self.message_queues = message_queues
  62. @handler('oadrRegisterReport')
  63. async def register_report(self, payload):
  64. """
  65. Handle the VENs reporting capabilities.
  66. """
  67. report_requests = []
  68. args = inspect.signature(self.on_register_report).parameters
  69. if all(['ven_id' in args, 'resource_id' in args, 'measurement' in args,
  70. 'min_sampling_interval' in args, 'max_sampling_interval' in args,
  71. 'unit' in args, 'scale' in args]):
  72. for report in payload['reports']:
  73. if report['report_name'] == 'METADATA_TELEMETRY_STATUS':
  74. result = [self.on_register_report(ven_id=payload['ven_id'],
  75. resource_id=rd['report_subject']['resource_id'],
  76. measurement='Status',
  77. unit=None,
  78. scale=None,
  79. min_sampling_interval=rd['sampling_rate']['min_period'],
  80. max_sampling_interval=rd['sampling_rate']['max_period'])
  81. for rd in report['report_descriptions']]
  82. elif report['report_name'] == 'METADATA_TELEMETRY_USAGE':
  83. result = [self.on_register_report(ven_id=payload['ven_id'],
  84. resource_id=rd['report_subject']['resource_id'],
  85. measurement=rd['measurement']['description'],
  86. unit=rd['measurement']['unit'],
  87. scale=rd['measurement']['scale'],
  88. min_sampling_interval=rd['sampling_rate']['min_period'],
  89. max_sampling_interval=rd['sampling_rate']['max_period'])
  90. for rd in report['report_descriptions']]
  91. else:
  92. logger.warning("Reports other than TELEMETRY_USAGE and TELEMETRY_STATUS are "
  93. f"not yet supported. Skipping report {report['report_name']}.")
  94. report_requests.append(None)
  95. break
  96. if iscoroutine(result[0]):
  97. result = await gather(*result)
  98. result = [(report['report_descriptions'][i]['r_id'], *result[i])
  99. for i in range(len(report['report_descriptions'])) if result[i] is not None]
  100. report_requests.append(result)
  101. else:
  102. # Use the 'full' mode for openADR reporting
  103. result = [self.on_register_report(report) for report in payload['reports']]
  104. if iscoroutine(result[0]):
  105. result = await gather(*result) # Now we have r_id, callback, sampling_rate
  106. report_requests = result
  107. for i, report_request in enumerate(report_requests):
  108. if report_request is not None:
  109. if not all(len(rrq) in (3, 4) for rrq in report_request):
  110. logger.error("Your on_register_report handler did not return a valid response")
  111. # Validate the report requests
  112. for i, report_request in enumerate(report_requests):
  113. if report_request is None or len(report_request) == 0:
  114. continue
  115. # Check if all sampling rates per report_request are the same
  116. sampling_interval = min(rrq[2] for rrq in report_request if rrq is not None)
  117. if not all(rrq is not None and report_request[0][2] == sampling_interval for rrq in report_request):
  118. logger.error("OpenADR does not support multiple different sampling rates per "
  119. "report. OpenLEADR will set all sampling rates to "
  120. f"{sampling_interval}")
  121. # Form the report request
  122. oadr_report_requests = []
  123. for i, report_request in enumerate(report_requests):
  124. if report_request is None:
  125. continue
  126. orig_report = payload['reports'][i]
  127. report_specifier_id = orig_report['report_specifier_id']
  128. report_request_id = generate_id()
  129. specifier_payloads = []
  130. for rrq in report_request:
  131. if len(rrq) == 3:
  132. r_id, callback, sampling_interval = rrq
  133. report_interval = sampling_interval
  134. elif len(rrq) == 4:
  135. r_id, callback, sampling_interval, report_interval = rrq
  136. report_description = find_by(orig_report['report_descriptions'], 'r_id', r_id)
  137. reading_type = report_description['reading_type']
  138. specifier_payloads.append(objects.SpecifierPayload(r_id=r_id,
  139. reading_type=reading_type))
  140. # Append the callback to our list of known callbacks
  141. self.report_callbacks[(report_request_id, r_id)] = callback
  142. # Add the ReportSpecifier to the ReportRequest
  143. report_specifier = objects.ReportSpecifier(report_specifier_id=report_specifier_id,
  144. granularity=sampling_interval,
  145. report_back_duration=report_interval,
  146. specifier_payloads=specifier_payloads)
  147. # Add the ReportRequest to our outgoing message
  148. oadr_report_requests.append(objects.ReportRequest(report_request_id=report_request_id,
  149. report_specifier=report_specifier))
  150. # Put the report requests back together
  151. response_type = 'oadrRegisteredReport'
  152. response_payload = {'report_requests': oadr_report_requests}
  153. return response_type, response_payload
  154. async def on_register_report(self, payload):
  155. """
  156. Pre-handler for a oadrOnRegisterReport message. This will call your own handler (if defined)
  157. to allow for requesting the offered reports.
  158. """
  159. logger.warning("You should implement and register your own on_register_report handler "
  160. "if you want to receive reports from a VEN. This handler will receive the "
  161. "following arguments: ven_id, resource_id, measurement, unit, scale, "
  162. "min_sampling_interval, max_sampling_interval and should return either "
  163. "None or (callback, sampling_interval) or "
  164. "(callback, sampling_interval, reporting_interval).")
  165. return None
  166. @handler('oadrUpdateReport')
  167. async def update_report(self, payload):
  168. """
  169. Handle a report that we received from the VEN.
  170. """
  171. for report in payload['reports']:
  172. report_request_id = report['report_request_id']
  173. if not self.report_callbacks:
  174. result = self.on_update_report(report)
  175. if iscoroutine(result):
  176. result = await result
  177. continue
  178. for r_id, values in group_by(report['intervals'], 'report_payload.r_id').items():
  179. # Find the callback that was registered.
  180. if (report_request_id, r_id) in self.report_callbacks:
  181. # Collect the values
  182. values = [(ri['dtstart'], ri['report_payload']['value']) for ri in values]
  183. # Call the callback function to deliver the values
  184. result = self.report_callbacks[(report_request_id, r_id)](values)
  185. if iscoroutine(result):
  186. result = await result
  187. response_type = 'oadrUpdatedReport'
  188. response_payload = {}
  189. return response_type, response_payload
  190. async def on_update_report(self, payload):
  191. """
  192. Placeholder for the on_update_report handler.
  193. """
  194. logger.warning("You should implement and register your own on_update_report handler "
  195. "to deal with reports that your receive from the VEN. This handler will "
  196. "receive either a complete oadrReport dict, or a list of (datetime, value) "
  197. "tuples that you can then process how you see fit. You don't "
  198. "need to return anything from that handler.")
  199. return None