|
@@ -15,7 +15,7 @@
|
|
|
# limitations under the License.
|
|
|
|
|
|
from . import service, handler, VTNService
|
|
|
-from asyncio import iscoroutine, gather
|
|
|
+from asyncio import iscoroutine
|
|
|
from openleadr import objects, utils
|
|
|
import logging
|
|
|
import inspect
|
|
@@ -63,10 +63,9 @@ logger = logging.getLogger('openleadr')
|
|
|
@service('EiReport')
|
|
|
class ReportService(VTNService):
|
|
|
|
|
|
- def __init__(self, vtn_id, message_queues=None):
|
|
|
+ def __init__(self, vtn_id):
|
|
|
super().__init__(vtn_id)
|
|
|
self.report_callbacks = {}
|
|
|
- self.message_queues = message_queues
|
|
|
self.registered_reports = {}
|
|
|
|
|
|
@handler('oadrRegisterReport')
|
|
@@ -79,63 +78,79 @@ class ReportService(VTNService):
|
|
|
if all(['ven_id' in args, 'resource_id' in args, 'measurement' in args,
|
|
|
'min_sampling_interval' in args, 'max_sampling_interval' in args,
|
|
|
'unit' in args, 'scale' in args]):
|
|
|
- for report in payload['reports']:
|
|
|
- if report['report_name'] == 'METADATA_TELEMETRY_STATUS':
|
|
|
- result = [self.on_register_report(ven_id=payload['ven_id'],
|
|
|
- resource_id=rd.get('report_data_source', {}).get('resource_id'),
|
|
|
- measurement='Status',
|
|
|
- unit=None,
|
|
|
- scale=None,
|
|
|
- min_sampling_interval=rd['sampling_rate']['min_period'],
|
|
|
- max_sampling_interval=rd['sampling_rate']['max_period'])
|
|
|
- for rd in report['report_descriptions']]
|
|
|
- elif report['report_name'] == 'METADATA_TELEMETRY_USAGE':
|
|
|
- result = [self.on_register_report(ven_id=payload['ven_id'],
|
|
|
- resource_id=rd.get('report_data_source', {}).get('resource_id'),
|
|
|
- measurement=rd['measurement']['description'],
|
|
|
- unit=rd['measurement']['unit'],
|
|
|
- scale=rd['measurement']['scale'],
|
|
|
- min_sampling_interval=rd['sampling_rate']['min_period'],
|
|
|
- max_sampling_interval=rd['sampling_rate']['max_period'])
|
|
|
- for rd in report['report_descriptions']]
|
|
|
- elif report['report_name'] in ('METADATA_HISTORY_USAGE', 'METADATA_HISTORY_GREENBUTTON'):
|
|
|
- if payload['ven_id'] not in self.available_reports:
|
|
|
- self.available_reports[payload['ven_id']] = []
|
|
|
- self.registered_reports[payload['ven_id']].append(report)
|
|
|
- else:
|
|
|
- logger.warning("Reports other than TELEMETRY_USAGE, TELEMETRY_STATUS, "
|
|
|
- "HISTORY_USAGE and HISTORY_GREENBUTTON are not yet supported. "
|
|
|
- f"Skipping report with name {report['report_name']}.")
|
|
|
- report_requests.append(None)
|
|
|
- continue
|
|
|
-
|
|
|
- if iscoroutine(result[0]):
|
|
|
- result = await gather(*result)
|
|
|
- for i, r in enumerate(result):
|
|
|
- if r is None:
|
|
|
- continue
|
|
|
- if not isinstance(r, tuple):
|
|
|
- logger.error("Your on_register_report handler must return a tuple; "
|
|
|
- f"it returned '{r}' ({r.__class__.__name__}).")
|
|
|
- result[i] = None
|
|
|
- result = [(report['report_descriptions'][i]['r_id'], *result[i])
|
|
|
- for i in range(len(report['report_descriptions'])) if isinstance(result[i], tuple)]
|
|
|
- report_requests.append(result)
|
|
|
- utils.validate_report_request_tuples(report_requests)
|
|
|
+ mode = 'compact'
|
|
|
else:
|
|
|
- # Use the 'full' mode for openADR reporting
|
|
|
- result = [self.on_register_report(report) for report in payload['reports']]
|
|
|
- if iscoroutine(result[0]):
|
|
|
- result = await gather(*result) # Now we have r_id, callback, sampling_rate
|
|
|
- for i, r in enumerate(result):
|
|
|
- if r is None:
|
|
|
- continue
|
|
|
- if not isinstance(r, list):
|
|
|
- logger.error("Your on_register_report handler must return a list of tuples. "
|
|
|
- f"It returned '{r}' ({r.__class__.__name__}).")
|
|
|
- result[i] = None
|
|
|
- report_requests = result
|
|
|
- utils.validate_report_request_tuples(report_requests, full_mode=True)
|
|
|
+ mode = 'full'
|
|
|
+
|
|
|
+ if payload['reports'] is None:
|
|
|
+ return
|
|
|
+
|
|
|
+ for report in payload['reports']:
|
|
|
+ if report['report_name'] == 'METADATA_TELEMETRY_STATUS':
|
|
|
+ if mode == 'compact':
|
|
|
+ results = [self.on_register_report(ven_id=payload['ven_id'],
|
|
|
+ resource_id=rd.get('report_data_source', {}).get('resource_id'),
|
|
|
+ measurement='Status',
|
|
|
+ unit=None,
|
|
|
+ scale=None,
|
|
|
+ min_sampling_interval=rd['sampling_rate']['min_period'],
|
|
|
+ max_sampling_interval=rd['sampling_rate']['max_period'])
|
|
|
+ for rd in report['report_descriptions']]
|
|
|
+ results = await utils.gather_if_required(results)
|
|
|
+ elif mode == 'full':
|
|
|
+ results = await utils.await_if_required(self.on_register_report(report))
|
|
|
+ elif report['report_name'] == 'METADATA_TELEMETRY_USAGE':
|
|
|
+ if mode == 'compact':
|
|
|
+ results = [self.on_register_report(ven_id=payload['ven_id'],
|
|
|
+ resource_id=rd.get('report_data_source', {}).get('resource_id'),
|
|
|
+ measurement=rd['measurement']['description'],
|
|
|
+ unit=rd['measurement']['unit'],
|
|
|
+ scale=rd['measurement']['scale'],
|
|
|
+ min_sampling_interval=rd['sampling_rate']['min_period'],
|
|
|
+ max_sampling_interval=rd['sampling_rate']['max_period'])
|
|
|
+ for rd in report['report_descriptions']]
|
|
|
+ results = await utils.gather_if_required(results)
|
|
|
+ elif mode == 'full':
|
|
|
+ results = await utils.await_if_required(self.on_register_report(report))
|
|
|
+ elif report['report_name'] in ('METADATA_HISTORY_USAGE', 'METADATA_HISTORY_GREENBUTTON'):
|
|
|
+ if payload['ven_id'] not in self.registered_reports:
|
|
|
+ self.registered_reports[payload['ven_id']] = []
|
|
|
+ report['report_name'] = report['report_name'][9:]
|
|
|
+ self.registered_reports[payload['ven_id']].append(report)
|
|
|
+ report_requests.append(None)
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ logger.warning("Reports other than TELEMETRY_USAGE, TELEMETRY_STATUS, "
|
|
|
+ "HISTORY_USAGE and HISTORY_GREENBUTTON are not yet supported. "
|
|
|
+ f"Skipping report with name {report['report_name']}.")
|
|
|
+ report_requests.append(None)
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Perform some rudimentary checks on the returned type
|
|
|
+ if results is not None:
|
|
|
+ if not isinstance(results, list):
|
|
|
+ logger.error("Your on_register_report handler must return a list of tuples or None; "
|
|
|
+ f"it returned '{results}' ({results.__class__.__name__}).")
|
|
|
+ results = None
|
|
|
+ else:
|
|
|
+ for i, r in enumerate(results):
|
|
|
+ if r is None:
|
|
|
+ continue
|
|
|
+ if not isinstance(r, tuple):
|
|
|
+ if mode == 'compact':
|
|
|
+ logger.error("Your on_register_report handler must return a tuple or None; "
|
|
|
+ f"it returned '{r}' ({r.__class__.__name__}).")
|
|
|
+ elif mode == 'full':
|
|
|
+ logger.error("Your on_register_report handler must return a list of tuples or None; "
|
|
|
+ f"The first item from the list was '{r}' ({r.__class__.__name__}).")
|
|
|
+ results[i] = None
|
|
|
+ # If we used compact mode, prepend the r_id to each result
|
|
|
+ # (this is already there when using the full mode)
|
|
|
+ if mode == 'compact':
|
|
|
+ results = [(report['report_descriptions'][i]['r_id'], *results[i])
|
|
|
+ for i in range(len(report['report_descriptions'])) if isinstance(results[i], tuple)]
|
|
|
+ report_requests.append(results)
|
|
|
+ utils.validate_report_request_tuples(report_requests, mode=mode)
|
|
|
|
|
|
for i, report_request in enumerate(report_requests):
|
|
|
if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request):
|