event_service.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. import asyncio
  14. from openleadr import objects, utils, enums
  15. import logging
  16. from datetime import datetime, timezone
  17. from functools import partial
  18. logger = logging.getLogger('openleadr')
  @service('EiEvent')
  class EventService(VTNService):
      """
      VTN-side service that handles the 'oadrRequestEvent' and
      'oadrCreatedEvent' messages of the EiEvent service.
      """

      def __init__(self, vtn_id, polling_method='internal', message_queues=None):
          """
          :param vtn_id: Passed on unchanged to VTNService.__init__.
          :param polling_method: When equal to 'internal', opt responses are routed
                                 to the per-event callbacks kept in pending_events
                                 and running_events. Any other value routes them to
                                 the on_created_event handler.
          :param message_queues: Indexed by ven_id; _update_event_status puts
                                 updated events on the matching queue.
          """
          super().__init__(vtn_id)
          self.polling_method = polling_method
          self.message_queues = message_queues
          # event_id -> (event, callback) for events awaiting an opt response
          self.pending_events = {}
          # event_id -> (event, callback) for events the VEN opted in to
          self.running_events = {}

      @handler('oadrRequestEvent')
      async def request_event(self, payload):
          """
          The VEN requests us to send any events we have.

          Calls on_request_event with the VEN's id (awaiting the result if it is
          a coroutine) and wraps the outcome in an 'oadrDistributeEvent' reply.

          :param payload: Message payload; must contain the key 'ven_id'.
          :return: Tuple of ('oadrDistributeEvent', {'events': [...]}).
          :raises TypeError: If the handler returns something other than None,
                             a dict, an objects.Event or a list.
          """
          result = self.on_request_event(payload['ven_id'])
          if asyncio.iscoroutine(result):
              result = await result

          if result is None:
              return 'oadrDistributeEvent', {'events': []}
          if isinstance(result, (dict, objects.Event)):
              # A single event is wrapped in a list.
              return 'oadrDistributeEvent', {'events': [result]}
          if isinstance(result, list):
              return 'oadrDistributeEvent', {'events': result}
          raise TypeError("Event handler should return None, a dict or a list")

      def on_request_event(self, ven_id):
          """
          Placeholder for the on_request_event handler.

          Logs a warning and returns None.
          """
          logger.warning("You should implement and register your own on_request_event handler "
                         "that returns the next event for a VEN. This handler will receive a "
                         "ven_id as its only argument, and should return None (if no events are "
                         "available), a single Event, or a list of Events.")
          return None

      @handler('oadrCreatedEvent')
      async def created_event(self, payload):
          """
          The VEN informs us that they created an EiEvent.

          With internal polling, each event response is delivered to the callback
          stored for that event. A pending event that is opted in to is moved to
          running_events and its status updates are scheduled. A response for a
          running event removes it from running_events. Responses for unknown
          event ids are ignored.

          With any other polling method, on_created_event is called once per
          event response. (Previously this branch used event_id and opt_type
          without ever binding them, so it could not run.)

          :param payload: Message payload; must contain 'ven_id' and
                          'event_responses', a sequence of dicts that each have
                          'event_id' and 'opt_type'.
          :return: Tuple of ('oadrResponse', {}).
          """
          ven_id = payload['ven_id']
          internal_polling = self.polling_method == 'internal'

          for event_response in payload['event_responses']:
              event_id = event_response['event_id']
              opt_type = event_response['opt_type']

              if not internal_polling:
                  await self._invoke_opt_handler(self.on_created_event,
                                                 ven_id, event_id, opt_type)
              elif event_id in self.pending_events:
                  event, callback = self.pending_events.pop(event_id)
                  await self._invoke_opt_handler(callback, ven_id, event_id, opt_type)
                  if opt_type == 'optIn':
                      self.running_events[event_id] = (event, callback)
                      self._schedule_status_updates(ven_id, event)
              elif event_id in self.running_events:
                  event, callback = self.running_events.pop(event_id)
                  await self._invoke_opt_handler(callback, ven_id, event_id, opt_type)

          return 'oadrResponse', {}

      async def _invoke_opt_handler(self, callback, ven_id, event_id, opt_type):
          """
          Call an opt-status callback with keyword arguments, awaiting the
          result if the callback returned a coroutine.
          """
          result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
          if asyncio.iscoroutine(result):
              result = await result
          return result

      def _schedule_status_updates(self, ven_id, event):
          """
          Schedule the remaining status transitions ('near', 'active',
          'completed') of an accepted event, based on its current status and
          its active period.
          """
          loop = asyncio.get_event_loop()
          now = datetime.now(timezone.utc)
          active_period = event.active_period
          status = event.event_descriptor.event_status

          def schedule(new_status, delay):
              update = partial(self._update_event_status, ven_id, event, new_status)
              loop.create_task(utils.delayed_call(func=update, delay=delay))

          # Schedule status update to 'near' if applicable
          if active_period.ramp_up_period is not None and status == 'far':
              schedule('near', (active_period.dtstart - active_period.ramp_up_period) - now)

          # Schedule status update to 'active'
          if status in ('near', 'far'):
              schedule('active', active_period.dtstart - now)

          # Schedule status update to 'completed'
          if status in ('near', 'far', 'active'):
              schedule('completed', active_period.dtstart + active_period.duration - now)

      def on_created_event(self, ven_id, event_id, opt_type):
          """
          Placeholder for the on_created_event handler.

          Logs a warning and returns None.
          """
          logger.warning("You should implement and register you own on_created_event handler "
                         "to receive the opt status for an Event that you sent to the VEN. This "
                         "handler will receive a ven_id, event_id and opt_status. "
                         "You don't need to return anything from this handler.")
          return None

      def _update_event_status(self, ven_id, event, event_status):
          """
          Update the event to the given Status.

          Sets the new status on the event descriptor, increments the
          modification number when the new status is CANCELLED, and puts the
          event on the message queue of the given VEN.
          """
          event.event_descriptor.event_status = event_status
          if event_status == enums.EVENT_STATUS.CANCELLED:
              event.event_descriptor.modification_number += 1
          # NOTE(review): message_queues defaults to None in __init__; this
          # presumably relies on queues being supplied for internal polling.
          self.message_queues[ven_id].put_nowait(event)