# event_service.py (1.3 KB)
  1. from . import api, handler, VTNService
  2. from datetime import datetime, timedelta, timezone
  3. from asyncio import iscoroutine
  4. @api.route('/OpenADR2/Simple/2.0b/EiEvent')
  5. class EventService(VTNService):
  6. @handler('oadrRequestEvent')
  7. async def request_event(self, payload):
  8. """
  9. The VEN requests us to send any events we have.
  10. """
  11. # TODO: hook into some backend here to retrieve the appropriate events for this VEN.
  12. try:
  13. result = self.on_request_event(payload['ven_id'])
  14. if iscoroutine(result):
  15. result = await result
  16. except OpenADRError as err:
  17. response_type = 'oadrResponse'
  18. response_payload = {'request_id': payload['request_id'],
  19. 'response_code': err.status,
  20. 'response_description': err.description,
  21. 'ven_id': payload['ven_id']}
  22. return response_type, response_payload
  23. else:
  24. return result
  25. @handler('oadrCreatedEvent')
  26. async def created_event(self, payload):
  27. """
  28. The VEN informs us that they created an EiEvent.
  29. """
  30. result = self.on_created_event(payload)
  31. if iscoroutine(result):
  32. result = await(result)
  33. return result