event_service.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. import asyncio
  14. from openleadr import objects, utils, enums
  15. import logging
  16. from datetime import datetime, timezone
  17. from functools import partial
  18. from dataclasses import asdict
  19. logger = logging.getLogger('openleadr')
  20. @service('EiEvent')
  21. class EventService(VTNService):
  22. def __init__(self, vtn_id, polling_method='internal', message_queues=None):
  23. super().__init__(vtn_id)
  24. self.polling_method = polling_method
  25. self.message_queues = message_queues
  26. self.pending_events = {} # Holds the event callbacks
  27. self.running_events = {} # Holds the event callbacks for accepted events
  28. @handler('oadrRequestEvent')
  29. async def request_event(self, payload):
  30. """
  31. The VEN requests us to send any events we have.
  32. """
  33. if self.polling_method == 'external':
  34. result = self.on_request_event(ven_id=payload['ven_id'])
  35. if asyncio.iscoroutine(result):
  36. result = await result
  37. elif payload['ven_id'] in self.message_queues:
  38. queue = self.message_queues[payload['ven_id']]
  39. result = utils.get_next_event_from_deque(queue)
  40. else:
  41. return 'oadrResponse', {}
  42. if result is None:
  43. return 'oadrResponse', {}
  44. if isinstance(result, dict) and 'event_descriptor' in result:
  45. return 'oadrDistributeEvent', {'events': [result]}
  46. elif isinstance(result, objects.Event):
  47. return 'oadrDistributeEvent', {'events': [asdict(result)]}
  48. logger.warning("Could not determine type of message "
  49. f"in response to oadrRequestEvent: {result}")
  50. return 'oadrResponse', result
  51. def on_request_event(self, ven_id):
  52. """
  53. Placeholder for the on_request_event handler.
  54. """
  55. logger.warning("You should implement and register your own on_request_event handler "
  56. "that returns the next event for a VEN. This handler will receive a "
  57. "ven_id as its only argument, and should return None (if no events are "
  58. "available), a single Event, or a list of Events.")
  59. return None
  60. @handler('oadrCreatedEvent')
  61. async def created_event(self, payload):
  62. """
  63. The VEN informs us that they created an EiEvent.
  64. """
  65. loop = asyncio.get_event_loop()
  66. ven_id = payload['ven_id']
  67. if self.polling_method == 'internal':
  68. for event_response in payload['event_responses']:
  69. event_id = event_response['event_id']
  70. opt_type = event_response['opt_type']
  71. if event_response['event_id'] in self.pending_events:
  72. event, callback = self.pending_events.pop(event_id)
  73. if asyncio.isfuture(callback):
  74. callback.set_result(opt_type)
  75. else:
  76. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  77. if asyncio.iscoroutine(result):
  78. result = await result
  79. if opt_type == 'optIn':
  80. self.running_events[event_id] = (event, callback)
  81. now = datetime.now(timezone.utc)
  82. active_period = event.active_period
  83. # Schedule status update to 'near' if applicable
  84. if active_period.ramp_up_period is not None and event.event_descriptor.event_status == 'far':
  85. ramp_up_start_delay = (active_period.dtstart - active_period.ramp_up_period) - now
  86. update_coro = partial(self._update_event_status, ven_id, event, 'near')
  87. loop.create_task(utils.delayed_call(func=update_coro, delay=ramp_up_start_delay),
  88. name=f'DelayedCall-{utils.generate_id()}')
  89. # Schedule status update to 'active'
  90. if event.event_descriptor.event_status in ('near', 'far'):
  91. active_start_delay = active_period.dtstart - now
  92. update_coro = partial(self._update_event_status, ven_id, event, 'active')
  93. loop.create_task(utils.delayed_call(func=update_coro, delay=active_start_delay),
  94. name=f'DelayedCall-{utils.generate_id()}')
  95. # Schedule status update to 'completed'
  96. if event.event_descriptor.event_status in ('near', 'far', 'active'):
  97. active_end_delay = active_period.dtstart + active_period.duration - now
  98. update_coro = partial(self._update_event_status, ven_id, event, 'completed')
  99. loop.create_task(utils.delayed_call(func=update_coro, delay=active_end_delay),
  100. name=f'DelayedCall-{utils.generate_id()}')
  101. elif event_response['event_id'] in self.running_events:
  102. event, callback = self.running_events.pop(event_id)
  103. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  104. if asyncio.iscoroutine(result):
  105. result = await result
  106. else:
  107. result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  108. if asyncio.iscoroutine(result):
  109. result = await(result)
  110. return 'oadrResponse', {}
  111. def on_created_event(self, ven_id, event_id, opt_type):
  112. """
  113. Placeholder for the on_created_event handler.
  114. """
  115. logger.warning("You should implement and register you own on_created_event handler "
  116. "to receive the opt status for an Event that you sent to the VEN. This "
  117. "handler will receive a ven_id, event_id and opt_status. "
  118. "You don't need to return anything from this handler.")
  119. return None
  120. def _update_event_status(self, ven_id, event, event_status):
  121. """
  122. Update the event to the given Status.
  123. """
  124. event.event_descriptor.event_status = event_status
  125. if event_status == enums.EVENT_STATUS.CANCELLED:
  126. event.event_descriptor.modification_number += 1
  127. self.message_queues[ven_id].append(event)