event_service.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. import asyncio
  14. from openleadr import objects, utils, enums
  15. import logging
  16. import sys
  17. from datetime import datetime, timezone
  18. from functools import partial
  19. from dataclasses import asdict
  20. logger = logging.getLogger('openleadr')
  21. @service('EiEvent')
  22. class EventService(VTNService):
  23. def __init__(self, vtn_id, polling_method='internal', message_queues=None):
  24. super().__init__(vtn_id)
  25. self.polling_method = polling_method
  26. self.message_queues = message_queues
  27. self.pending_events = {} # Holds the event callbacks
  28. self.running_events = {} # Holds the event callbacks for accepted events
  29. @handler('oadrRequestEvent')
  30. async def request_event(self, payload):
  31. """
  32. The VEN requests us to send any events we have.
  33. """
  34. if self.polling_method == 'external':
  35. result = self.on_request_event(ven_id=payload['ven_id'])
  36. if asyncio.iscoroutine(result):
  37. result = await result
  38. elif payload['ven_id'] in self.message_queues:
  39. queue = self.message_queues[payload['ven_id']]
  40. result = utils.get_next_event_from_deque(queue)
  41. else:
  42. return 'oadrResponse', {}
  43. if result is None:
  44. return 'oadrResponse', {}
  45. if isinstance(result, dict) and 'event_descriptor' in result:
  46. return 'oadrDistributeEvent', {'events': [result]}
  47. elif isinstance(result, objects.Event):
  48. return 'oadrDistributeEvent', {'events': [asdict(result)]}
  49. logger.warning("Could not determine type of message "
  50. f"in response to oadrRequestEvent: {result}")
  51. return 'oadrResponse', result
  52. def on_request_event(self, ven_id):
  53. """
  54. Placeholder for the on_request_event handler.
  55. """
  56. logger.warning("You should implement and register your own on_request_event handler "
  57. "that returns the next event for a VEN. This handler will receive a "
  58. "ven_id as its only argument, and should return None (if no events are "
  59. "available), a single Event, or a list of Events.")
  60. return None
  61. @handler('oadrCreatedEvent')
  62. async def created_event(self, payload):
  63. """
  64. The VEN informs us that they created an EiEvent.
  65. """
  66. ven_id = payload['ven_id']
  67. if self.polling_method == 'internal':
  68. for event_response in payload['event_responses']:
  69. event_id = event_response['event_id']
  70. opt_type = event_response['opt_type']
  71. if event_response['event_id'] in self.pending_events:
  72. event, callback = self.pending_events.pop(event_id)
  73. if isinstance(callback, asyncio.Future):
  74. callback.set_result(opt_type)
  75. else:
  76. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  77. if asyncio.iscoroutine(result):
  78. result = await result
  79. if opt_type == 'optIn':
  80. self.running_events[event_id] = (event, callback)
  81. self.schedule_event_updates(ven_id, event)
  82. elif event_response['event_id'] in self.running_events:
  83. event, callback = self.running_events.pop(event_id)
  84. if isinstance(callback, asyncio.Future):
  85. logger.warning(f"Got a second response '{opt_type}' from ven '{ven_id}' "
  86. f"to event '{event_id}', which we cannot use because the "
  87. "callback future you provided was already completed during "
  88. "the first response.")
  89. else:
  90. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  91. if asyncio.iscoroutine(result):
  92. result = await result
  93. else:
  94. result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  95. if asyncio.iscoroutine(result):
  96. result = await(result)
  97. return 'oadrResponse', {}
  98. def on_created_event(self, ven_id, event_id, opt_type):
  99. """
  100. Placeholder for the on_created_event handler.
  101. """
  102. logger.warning("You should implement and register you own on_created_event handler "
  103. "to receive the opt status for an Event that you sent to the VEN. This "
  104. "handler will receive a ven_id, event_id and opt_status. "
  105. "You don't need to return anything from this handler.")
  106. return None
  107. def _update_event_status(self, ven_id, event, event_status):
  108. """
  109. Update the event to the given Status.
  110. """
  111. event.event_descriptor.event_status = event_status
  112. if event_status == enums.EVENT_STATUS.CANCELLED:
  113. event.event_descriptor.modification_number += 1
  114. self.message_queues[ven_id].append(event)
  115. def schedule_event_updates(self, ven_id, event):
  116. """
  117. Schedules the event updates.
  118. """
  119. loop = asyncio.get_event_loop()
  120. now = datetime.now(timezone.utc)
  121. active_period = event.active_period
  122. # Named tasks is only supported in Python 3.8+
  123. if sys.version_info.minor >= 8:
  124. named_tasks = True
  125. else:
  126. named_tasks = False
  127. name = {}
  128. # Schedule status update to 'near' if applicable
  129. if active_period.ramp_up_period is not None and event.event_descriptor.event_status == 'far':
  130. ramp_up_start_delay = (active_period.dtstart - active_period.ramp_up_period) - now
  131. update_coro = partial(self._update_event_status, ven_id, event, 'near')
  132. if named_tasks:
  133. name = {'name': f'DelayedCall-EventStatusToNear-{event.event_descriptor.event_id}'}
  134. loop.create_task(utils.delayed_call(func=update_coro, delay=ramp_up_start_delay), **name)
  135. # Schedule status update to 'active'
  136. if event.event_descriptor.event_status in ('near', 'far'):
  137. active_start_delay = active_period.dtstart - now
  138. update_coro = partial(self._update_event_status, ven_id, event, 'active')
  139. if named_tasks:
  140. name = {'name': f'DelayedCall-EventStatusToActive-{event.event_descriptor.event_id}'}
  141. loop.create_task(utils.delayed_call(func=update_coro, delay=active_start_delay), **name)
  142. # Schedule status update to 'completed'
  143. if event.event_descriptor.event_status in ('near', 'far', 'active'):
  144. active_end_delay = active_period.dtstart + active_period.duration - now
  145. update_coro = partial(self._update_event_status, ven_id, event, 'completed')
  146. if named_tasks:
  147. name = {'name': f'DelayedCall-EventStatusToActive-{event.event_descriptor.event_id}'}
  148. loop.create_task(utils.delayed_call(func=update_coro, delay=active_end_delay), **name)