event_service.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. import asyncio
  14. from openleadr import utils, errors
  15. import logging
  16. logger = logging.getLogger('openleadr')
  17. @service('EiEvent')
  18. class EventService(VTNService):
  19. def __init__(self, vtn_id, polling_method='internal'):
  20. super().__init__(vtn_id)
  21. self.polling_method = polling_method
  22. self.events = {}
  23. self.completed_event_ids = {} # Holds the ids of completed events
  24. self.event_callbacks = {}
  25. self.event_opt_types = {}
  26. @handler('oadrRequestEvent')
  27. async def request_event(self, payload):
  28. """
  29. The VEN requests us to send any events we have.
  30. """
  31. ven_id = payload['ven_id']
  32. if self.polling_method == 'internal':
  33. if ven_id in self.events and self.events[ven_id]:
  34. events = utils.order_events(self.events[ven_id])
  35. for event in events:
  36. event_status = utils.getmember(utils.getmember(event, 'event_descriptor'), 'event_status')
  37. # Pop the event from the events so that this is the last time it is communicated
  38. if event_status == 'completed':
  39. self.events[ven_id].pop(self.events[ven_id].index(event))
  40. else:
  41. events = None
  42. else:
  43. result = self.on_request_event(ven_id=payload['ven_id'])
  44. if asyncio.iscoroutine(result):
  45. result = await result
  46. if result is None:
  47. events = None
  48. else:
  49. events = utils.order_events(result)
  50. if events is None:
  51. return 'oadrResponse', {}
  52. else:
  53. return 'oadrDistributeEvent', {'events': events}
  54. return 'oadrResponse', result
  55. def on_request_event(self, ven_id):
  56. """
  57. Placeholder for the on_request_event handler.
  58. """
  59. logger.warning("You should implement and register your own on_request_event handler "
  60. "that returns the next event for a VEN. This handler will receive a "
  61. "ven_id as its only argument, and should return None (if no events are "
  62. "available), a single Event, or a list of Events.")
  63. return None
  64. @handler('oadrCreatedEvent')
  65. async def created_event(self, payload):
  66. """
  67. The VEN informs us that they created an EiEvent.
  68. """
  69. ven_id = payload['ven_id']
  70. if self.polling_method == 'internal':
  71. for event_response in payload['event_responses']:
  72. event_id = event_response['event_id']
  73. opt_type = event_response['opt_type']
  74. if event_id not in [utils.getmember(utils.getmember(event, 'event_descriptor'), 'event_id')
  75. for event in self.events[ven_id]] + self.completed_event_ids.get(ven_id, []):
  76. raise errors.InvalidIdError
  77. if event_response['event_id'] in self.event_callbacks:
  78. event, callback = self.event_callbacks.pop(event_id)
  79. if isinstance(callback, asyncio.Future):
  80. if callback.done():
  81. logger.warning(f"Got a second response '{opt_type}' from ven '{ven_id}' "
  82. f"to event '{event_id}', which we cannot use because the "
  83. "callback future you provided was already completed during "
  84. "the first response.")
  85. else:
  86. callback.set_result(opt_type)
  87. else:
  88. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  89. if asyncio.iscoroutine(result):
  90. result = await result
  91. else:
  92. result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  93. if asyncio.iscoroutine(result):
  94. result = await(result)
  95. return 'oadrResponse', {}
  96. def on_created_event(self, ven_id, event_id, opt_type):
  97. """
  98. Placeholder for the on_created_event handler.
  99. """
  100. logger.warning("You should implement and register you own on_created_event handler "
  101. "to receive the opt status for an Event that you sent to the VEN. This "
  102. "handler will receive a ven_id, event_id and opt_status. "
  103. "You don't need to return anything from this handler.")
  104. return None