event_service.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from . import service, handler, VTNService
  13. from asyncio import iscoroutine
  14. from .. import objects
  15. import logging
# Module-level logger obtained under the fixed name 'openleadr' (not
# __name__); the placeholder handlers below use it to emit their warnings.
logger = logging.getLogger('openleadr')
  17. @service('EiEvent')
  18. class EventService(VTNService):
  19. def __init__(self, vtn_id, polling_method='internal', message_queues=None):
  20. super().__init__(vtn_id)
  21. self.polling_method = polling_method
  22. self.message_queues = message_queues
  23. self.pending_events = {} # Holds the event callbacks
  24. @handler('oadrRequestEvent')
  25. async def request_event(self, payload):
  26. """
  27. The VEN requests us to send any events we have.
  28. """
  29. result = self.on_request_event(payload['ven_id'])
  30. if iscoroutine(result):
  31. result = await result
  32. if result is None:
  33. return 'oadrDistributeEvent', {'events': []}
  34. if isinstance(result, dict):
  35. return 'oadrDistributeEvent', {'events': [result]}
  36. if isinstance(result, objects.Event):
  37. return 'oadrDistributeEvent', {'events': [result]}
  38. if isinstance(result, list):
  39. return 'oadrDistributeEvent', {'events': result}
  40. else:
  41. raise TypeError("Event handler should return None, a dict or a list")
  42. def on_request_event(self, ven_id):
  43. """
  44. Placeholder for the on_request_event handler.
  45. """
  46. logger.warning("You should implement and register your own on_request_event handler "
  47. "that returns the next event for a VEN. This handler will receive a "
  48. "ven_id as its only argument, and should return None (if no events are "
  49. "available), a single Event, or a list of Events.")
  50. return None
  51. @handler('oadrCreatedEvent')
  52. async def created_event(self, payload):
  53. """
  54. The VEN informs us that they created an EiEvent.
  55. """
  56. if self.polling_method == 'internal':
  57. for event_response in payload['event_responses']:
  58. if event_response['event_id'] in self.pending_events:
  59. ven_id = payload['ven_id']
  60. event_id = event_response['event_id']
  61. opt_type = event_response['opt_type']
  62. callback = self.pending_events.pop(event_id)
  63. result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  64. if iscoroutine(result):
  65. result = await result
  66. else:
  67. result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
  68. if iscoroutine(result):
  69. result = await(result)
  70. return 'oadrResponse', {}
  71. def on_created_event(self, ven_id, event_id, opt_type):
  72. """
  73. Placeholder for the on_created_event handler.
  74. """
  75. logger.warning("You should implement and register you own on_created_event handler "
  76. "to receive the opt status for an Event that you sent to the VEN. This "
  77. "handler will receive a ven_id, event_id and opt_status. "
  78. "You don't need to return anything from this handler.")
  79. return None