poll_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from openleadr.service import service, handler, VTNService
  13. from openleadr import objects
  14. import asyncio
  15. from dataclasses import asdict
  16. import logging
  17. logger = logging.getLogger('openleadr')
  18. # ╔══════════════════════════════════════════════════════════════════════════╗
  19. # ║ POLLING SERVICE ║
  20. # ╚══════════════════════════════════════════════════════════════════════════╝
  21. #
  22. # oadrPoll is a service independent polling mechanism used by VENs in a PULL
  23. # model to request pending service operations from the VTN. The VEN queries
  24. # the poll endpoint and the VTN responds with the same message that it would
  25. # have “pushed” had it been a PUSH VEN. If there are multiple messages pending
  26. # a “push,” the VEN will continue to query the poll endpoint until there are
  27. # no new messages and the VTN responds with an eiResponse payload.
  28. #
  29. # ┌──────────────────────────────────────────────────────────────────────────┐
  30. # │ The VEN can poll for any messages that we have for them. If we have no │
  31. # │ (more) messages, we send a generic oadrResponse: │
  32. # │ ┌────┐ ┌────┐ │
  33. # │ │VEN │ │VTN │ │
  34. # │ └─┬──┘ └─┬──┘ │
  35. # │ │───────────────────────────oadrPoll()───────────────────────────▶│ │
  36. # │ │ │ │
  37. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─oadrResponse() ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  38. # │ │ │ │
  39. # │ │
  40. # └──────────────────────────────────────────────────────────────────────────┘
  41. # ┌──────────────────────────────────────────────────────────────────────────┐
  42. # │ If we have an Event, we expect the following: │
  43. # │ │
  44. # │ ┌────┐ ┌────┐ │
  45. # │ │VEN │ │VTN │ │
  46. # │ └─┬──┘ └─┬──┘ │
  47. # │ │───────────────────────────oadrPoll()───────────────────────────▶│ │
  48. # │ │ │ │
  49. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrDistributeEvent() ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  50. # │ │ │ │
  51. # │ │───────────────────────oadrCreatedEvent()───────────────────────▶│ │
  52. # │ │ │ │
  53. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─oadrResponse() ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  54. # │ │ │ │
  55. # │ │
  56. # └──────────────────────────────────────────────────────────────────────────┘
  57. # ┌──────────────────────────────────────────────────────────────────────────┐
  58. # │ For Reports: │
  59. # │ │
  60. # │ ┌────┐ ┌────┐ │
  61. # │ │VEN │ │VTN │ │
  62. # │ └─┬──┘ └─┬──┘ │
  63. # │ │───────────────────────────oadrPoll()───────────────────────────▶│ │
  64. # │ │ │ │
  65. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─oadrCreateReport() ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  66. # │ │ │ │
  67. # │ │───────────────────────oadrCreatedReport()──────────────────────▶│ │
  68. # │ │ │ │
  69. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─oadrResponse() ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  70. # │ │ │ │
  71. # │ │
  72. # └──────────────────────────────────────────────────────────────────────────┘
  73. # ┌──────────────────────────────────────────────────────────────────────────┐
  74. # │ If re-registration is necessary: │
  75. # │ │
  76. # │ ┌────┐ ┌────┐ │
  77. # │ │VEN │ │VTN │ │
  78. # │ └─┬──┘ └─┬──┘ │
  79. # │ │───────────────────────────oadrPoll()───────────────────────────▶│ │
  80. # │ │ │ │
  81. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─oadrRequestReregistration()─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  82. # │ │ │ │
  83. # │ │─────────────────────────oadrResponse()─────────────────────────▶│ │
  84. # │ │ │ │
  85. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ HTTP 200─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  86. # │ │ │ │
  87. # │ │
  88. # │ │──────────────────oadrCreatePartyRegistration()─────────────────▶│ │
  89. # │ │ │ │
  90. # │ │◀ ─ ─ ─ ─ ─ ─ ─ ─oadrCreatedPartyRegistration() ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
  91. # │ │ │ │
  92. # │ │
  93. # └──────────────────────────────────────────────────────────────────────────┘
  @service('OadrPoll')
  class PollService(VTNService):
      """
      VTN service that answers oadrPoll requests.

      Depending on ``polling_method``, the next message for a VEN is either obtained
      from the ``on_poll`` handler ('external') or taken from that VEN's entry in
      ``message_queues`` (any other value; 'internal' by default).
      """

      def __init__(self, vtn_id, polling_method='internal', message_queues=None):
          """
          :param vtn_id: Identifier of this VTN; forwarded to ``VTNService.__init__``.
          :param str polling_method: 'external' makes ``poll`` call ``on_poll``; any other
                                     value makes ``poll`` read from ``message_queues``.
          :param message_queues: Mapping of ven_id to a queue offering ``get_nowait()``
                                 (presumably ``asyncio.Queue``, since ``poll`` catches
                                 ``asyncio.QueueEmpty``). May be None, in which case
                                 internal polling never has a message to deliver.
          """
          super().__init__(vtn_id)
          self.polling_method = polling_method
          self.message_queues = message_queues

      @staticmethod
      def _event_to_dict(event):
          """
          Return ``event`` as a dict: ``objects.Event`` instances are converted with
          ``asdict``, anything else is passed through unchanged.
          """
          if isinstance(event, objects.Event):
              return asdict(event)
          return event

      @handler('oadrPoll')
      async def poll(self, payload):
          """
          Handle the request to the oadrPoll service. This either calls a previously registered
          `on_poll` handler, or it retrieves the next message from the internal queue.

          :param dict payload: The oadrPoll payload; its 'ven_id' entry selects the VEN.
          :return: A ``(message_type, message_payload)`` tuple. An ``('oadrResponse', {})``
                   tuple is returned when there is nothing to deliver.
          """
          empty_response = ('oadrResponse', {})
          ven_id = payload['ven_id']

          if self.polling_method == 'external':
              result = self.on_poll(ven_id=ven_id)
          else:
              queues = self.message_queues
              # message_queues defaults to None; treat that the same as "no queue for
              # this VEN" instead of failing on the membership test.
              if queues is None or ven_id not in queues:
                  return empty_response
              try:
                  result = queues[ven_id].get_nowait()
              except asyncio.QueueEmpty:
                  return empty_response

          # The on_poll handler may be a coroutine function; resolve it before inspecting.
          if asyncio.iscoroutine(result):
              result = await result

          if result is None:
              return empty_response

          # A tuple is taken to be a ready-made (message_type, message_payload) pair.
          if isinstance(result, tuple):
              return result

          # A list is treated as a list of events. Wrap it in the same {'events': [...]}
          # payload shape that the single-event branch below produces, converting any
          # Event objects to dicts along the way.
          if isinstance(result, list):
              events = [self._event_to_dict(event) for event in result]
              return 'oadrDistributeEvent', {'events': events}

          is_event_dict = isinstance(result, dict) and 'event_descriptor' in result
          if is_event_dict or isinstance(result, objects.Event):
              return 'oadrDistributeEvent', {'events': [self._event_to_dict(result)]}

          logger.warning("Could not determine type of message in response to oadrPoll: %s", result)
          return 'oadrResponse', result

      def on_poll(self, ven_id):
          """
          Placeholder for the on_poll handler.

          Logs a warning asking for a real handler to be registered and returns None,
          which ``poll`` turns into an empty oadrResponse.

          :param ven_id: The ID of the VEN that is polling (unused by this placeholder).
          """
          logger.warning("You should implement and register your own on_poll handler that "
                         "returns the next message for the VEN. This handler receives the "
                         "ven_id as its argument, and should return None (if no messages "
                         "are available), an Event or list of Events, a RequestReregistration "
                         " or RequestReport.")
          return None