report_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. from asyncio import iscoroutine
  14. from openleadr import objects, utils
  15. import logging
  16. import inspect
  17. logger = logging.getLogger('openleadr')
  18. # ╔══════════════════════════════════════════════════════════════════════════╗
  19. # ║ REPORT SERVICE ║
  20. # ╚══════════════════════════════════════════════════════════════════════════╝
  21. # ┌──────────────────────────────────────────────────────────────────────────┐
  22. # │ The VEN can register its reporting capabilities. │
  23. # │ │
  24. # │ ┌────┐ ┌────┐ │
  25. # │ │VEN │ │VTN │ │
  26. # │ └─┬──┘ └─┬──┘ │
  27. # │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
  28. # │ │ │ │
  29. # │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
  30. # │ │ │ │
  31. # │ │ │ │
  32. # │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
  33. # │ │ │ │
  34. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  35. # │ │ │ │
  36. # │ │
  37. # └──────────────────────────────────────────────────────────────────────────┘
  38. # ┌──────────────────────────────────────────────────────────────────────────┐
  39. # │ A report can also be canceled │
  40. # │ │
  41. # │ ┌────┐ ┌────┐ │
  42. # │ │VEN │ │VTN │ │
  43. # │ └─┬──┘ └─┬──┘ │
  44. # │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
  45. # │ │ │ │
  46. # │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
  47. # │ │ │ │
  48. # │ │ │ │
  49. # │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
  50. # │ │ │ │
  51. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  52. # │ │ │ │
  53. # │ │
  54. # └──────────────────────────────────────────────────────────────────────────┘
  55. @service('EiReport')
  56. class ReportService(VTNService):
  57. def __init__(self, vtn_id):
  58. super().__init__(vtn_id)
  59. self.report_callbacks = {}
  60. self.registered_reports = {}
  61. @handler('oadrRegisterReport')
  62. async def register_report(self, payload):
  63. """
  64. Handle the VENs reporting capabilities.
  65. """
  66. report_requests = []
  67. args = inspect.signature(self.on_register_report).parameters
  68. if all(['ven_id' in args, 'resource_id' in args, 'measurement' in args,
  69. 'min_sampling_interval' in args, 'max_sampling_interval' in args,
  70. 'unit' in args, 'scale' in args]):
  71. mode = 'compact'
  72. else:
  73. mode = 'full'
  74. if payload['reports'] is None:
  75. return
  76. for report in payload['reports']:
  77. if report['report_name'] == 'METADATA_TELEMETRY_STATUS':
  78. if mode == 'compact':
  79. results = [self.on_register_report(ven_id=payload['ven_id'],
  80. resource_id=rd.get('report_data_source', {}).get('resource_id'),
  81. measurement='Status',
  82. unit=None,
  83. scale=None,
  84. min_sampling_interval=rd['sampling_rate']['min_period'],
  85. max_sampling_interval=rd['sampling_rate']['max_period'])
  86. for rd in report['report_descriptions']]
  87. results = await utils.gather_if_required(results)
  88. elif mode == 'full':
  89. results = await utils.await_if_required(self.on_register_report(report))
  90. elif report['report_name'] == 'METADATA_TELEMETRY_USAGE':
  91. if mode == 'compact':
  92. results = [self.on_register_report(ven_id=payload['ven_id'],
  93. resource_id=rd.get('report_data_source', {}).get('resource_id'),
  94. measurement=rd['measurement']['description'],
  95. unit=rd['measurement']['unit'],
  96. scale=rd['measurement']['scale'],
  97. min_sampling_interval=rd['sampling_rate']['min_period'],
  98. max_sampling_interval=rd['sampling_rate']['max_period'])
  99. for rd in report['report_descriptions']]
  100. results = await utils.gather_if_required(results)
  101. elif mode == 'full':
  102. results = await utils.await_if_required(self.on_register_report(report))
  103. elif report['report_name'] in ('METADATA_HISTORY_USAGE', 'METADATA_HISTORY_GREENBUTTON'):
  104. if payload['ven_id'] not in self.registered_reports:
  105. self.registered_reports[payload['ven_id']] = []
  106. report['report_name'] = report['report_name'][9:]
  107. self.registered_reports[payload['ven_id']].append(report)
  108. report_requests.append(None)
  109. continue
  110. else:
  111. logger.warning("Reports other than TELEMETRY_USAGE, TELEMETRY_STATUS, "
  112. "HISTORY_USAGE and HISTORY_GREENBUTTON are not yet supported. "
  113. f"Skipping report with name {report['report_name']}.")
  114. report_requests.append(None)
  115. continue
  116. # Perform some rudimentary checks on the returned type
  117. if results is not None:
  118. if not isinstance(results, list):
  119. logger.error("Your on_register_report handler must return a list of tuples or None; "
  120. f"it returned '{results}' ({results.__class__.__name__}).")
  121. results = None
  122. else:
  123. for i, r in enumerate(results):
  124. if r is None:
  125. continue
  126. if not isinstance(r, tuple):
  127. if mode == 'compact':
  128. logger.error("Your on_register_report handler must return a tuple or None; "
  129. f"it returned '{r}' ({r.__class__.__name__}).")
  130. elif mode == 'full':
  131. logger.error("Your on_register_report handler must return a list of tuples or None; "
  132. f"The first item from the list was '{r}' ({r.__class__.__name__}).")
  133. results[i] = None
  134. # If we used compact mode, prepend the r_id to each result
  135. # (this is already there when using the full mode)
  136. if mode == 'compact':
  137. results = [(report['report_descriptions'][i]['r_id'], *results[i])
  138. for i in range(len(report['report_descriptions'])) if isinstance(results[i], tuple)]
  139. report_requests.append(results)
  140. utils.validate_report_request_tuples(report_requests, mode=mode)
  141. for i, report_request in enumerate(report_requests):
  142. if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request):
  143. continue
  144. # Check if all sampling rates per report_request are the same
  145. sampling_interval = min(rrq[2] for rrq in report_request if isinstance(rrq, tuple))
  146. if not all(rrq is not None and report_request[0][2] == sampling_interval for rrq in report_request):
  147. logger.error("OpenADR does not support multiple different sampling rates per "
  148. "report. OpenLEADR will set all sampling rates to "
  149. f"{sampling_interval}")
  150. # Form the report request
  151. oadr_report_requests = []
  152. for i, report_request in enumerate(report_requests):
  153. if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request):
  154. continue
  155. orig_report = payload['reports'][i]
  156. report_specifier_id = orig_report['report_specifier_id']
  157. report_request_id = utils.generate_id()
  158. specifier_payloads = []
  159. for rrq in report_request:
  160. if len(rrq) == 3:
  161. r_id, callback, sampling_interval = rrq
  162. report_interval = sampling_interval
  163. elif len(rrq) == 4:
  164. r_id, callback, sampling_interval, report_interval = rrq
  165. report_description = utils.find_by(orig_report['report_descriptions'], 'r_id', r_id)
  166. reading_type = report_description['reading_type']
  167. specifier_payloads.append(objects.SpecifierPayload(r_id=r_id,
  168. reading_type=reading_type))
  169. # Append the callback to our list of known callbacks
  170. self.report_callbacks[(report_request_id, r_id)] = callback
  171. # Add the ReportSpecifier to the ReportRequest
  172. report_specifier = objects.ReportSpecifier(report_specifier_id=report_specifier_id,
  173. granularity=sampling_interval,
  174. report_back_duration=report_interval,
  175. specifier_payloads=specifier_payloads)
  176. # Add the ReportRequest to our outgoing message
  177. oadr_report_requests.append(objects.ReportRequest(report_request_id=report_request_id,
  178. report_specifier=report_specifier))
  179. # Put the report requests back together
  180. response_type = 'oadrRegisteredReport'
  181. response_payload = {'report_requests': oadr_report_requests}
  182. return response_type, response_payload
  183. async def on_register_report(self, payload):
  184. """
  185. Pre-handler for a oadrOnRegisterReport message. This will call your own handler (if defined)
  186. to allow for requesting the offered reports.
  187. """
  188. logger.warning("You should implement and register your own on_register_report handler "
  189. "if you want to receive reports from a VEN. This handler will receive the "
  190. "following arguments: ven_id, resource_id, measurement, unit, scale, "
  191. "min_sampling_interval, max_sampling_interval and should return either "
  192. "None or (callback, sampling_interval) or "
  193. "(callback, sampling_interval, reporting_interval).")
  194. return None
  195. @handler('oadrUpdateReport')
  196. async def update_report(self, payload):
  197. """
  198. Handle a report that we received from the VEN.
  199. """
  200. for report in payload['reports']:
  201. report_request_id = report['report_request_id']
  202. if not self.report_callbacks:
  203. result = self.on_update_report(report)
  204. if iscoroutine(result):
  205. result = await result
  206. continue
  207. for r_id, values in utils.group_by(report['intervals'], 'report_payload.r_id').items():
  208. # Find the callback that was registered.
  209. if (report_request_id, r_id) in self.report_callbacks:
  210. # Collect the values
  211. values = [(ri['dtstart'], ri['report_payload']['value']) for ri in values]
  212. # Call the callback function to deliver the values
  213. result = self.report_callbacks[(report_request_id, r_id)](values)
  214. if iscoroutine(result):
  215. result = await result
  216. response_type = 'oadrUpdatedReport'
  217. response_payload = {}
  218. return response_type, response_payload
  219. async def on_update_report(self, payload):
  220. """
  221. Placeholder for the on_update_report handler.
  222. """
  223. logger.warning("You should implement and register your own on_update_report handler "
  224. "to deal with reports that your receive from the VEN. This handler will "
  225. "receive either a complete oadrReport dict, or a list of (datetime, value) "
  226. "tuples that you can then process how you see fit. You don't "
  227. "need to return anything from that handler.")
  228. return None