client.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. #!/Users/stan/Development/ElaadNL/pyopenadr/.python/bin/python3
  2. import xmltodict
  3. import random
  4. import requests
  5. from jinja2 import Environment, PackageLoader, select_autoescape
  6. from pyopenadr.utils import parse_message, create_message, new_request_id, peek
  7. from http import HTTPStatus
  8. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  9. import asyncio
  10. from asyncio import iscoroutine
  11. class OpenADRClient:
  12. def __init__(self, ven_name, vtn_url):
  13. self.ven_name = ven_name
  14. self.vtn_url = vtn_url
  15. self.ven_id = None
  16. self.poll_frequency = None
  17. def run(self):
  18. """
  19. Run the client in full-auto mode.
  20. """
  21. if not hasattr(self, 'on_event') or not hasattr(self, 'on_report'):
  22. raise NotImplementedError("You must implement both the on_event and and_report functions or coroutines.")
  23. self.create_party_registration()
  24. # Set up automatic polling
  25. self.scheduler = AsyncIOScheduler()
  26. if self.poll_frequency.total_seconds() < 60:
  27. cron_second = f"*/{self.poll_frequency.seconds}"
  28. cron_minute = "*"
  29. cron_hour = "*"
  30. elif self.poll_frequency.total_seconds() < 3600:
  31. cron_second = "0"
  32. cron_minute = f'*/{int(self.poll_frequency.total_seconds() / 60)}'
  33. cron_hour = "*"
  34. elif self.poll_frequency.total_seconds() < 86400:
  35. cron_second = "0"
  36. cron_minute = "0"
  37. cron_hour = f'*/{int(self.poll_frequency.total_seconds() / 3600)}'
  38. elif self.poll_frequency.total_seconds() > 86400:
  39. print("Polling with intervals of more than 24 hours is not supported.")
  40. return
  41. self.scheduler.add_job(self._poll, trigger='cron', second=cron_second, minute=cron_minute, hour=cron_hour)
  42. self.scheduler.start()
  43. def query_registration(self):
  44. """
  45. Request information about the VTN.
  46. """
  47. request_id = new_request_id()
  48. service = 'EiRegisterParty'
  49. message = create_message('oadrQueryRegistration', request_id=request_id)
  50. response_type, response_payload = self._perform_request(service, message)
  51. return response_type, response_payload
  52. def create_party_registration(self, http_pull_model=True, xml_signature=False,
  53. report_only=False, profile_name='2.0b',
  54. transport_name='simpleHttp', transport_address=None, ven_id=None):
  55. request_id = new_request_id()
  56. service = 'EiRegisterParty'
  57. payload = {'ven_name': self.ven_name,
  58. 'http_pull_model': http_pull_model,
  59. 'xml_signature': xml_signature,
  60. 'report_only': report_only,
  61. 'profile_name': profile_name,
  62. 'transport_name': transport_name,
  63. 'transport_address': transport_address}
  64. if ven_id:
  65. payload['ven_id'] = ven_id
  66. message = create_message('oadrCreatePartyRegistration', request_id=new_request_id(), **payload)
  67. response_type, response_payload = self._perform_request(service, message)
  68. self.ven_id = response_payload['ven_id']
  69. self.poll_frequency = response_payload['requested_oadr_poll_freq']
  70. print(f"VEN is now registered with ID {self.ven_id}")
  71. print(f"The polling frequency is {self.poll_frequency}")
  72. return response_type, response_payload
  73. def cancel_party_registration(self):
  74. raise NotImplementedError("Cancel Registration is not yet implemented")
  75. def request_event(self, reply_limit=1):
  76. """
  77. Request the next Event from the VTN, if it has any.
  78. """
  79. payload = {'request_id': new_request_id(),
  80. 'ven_id': self.ven_id,
  81. 'reply_limit': reply_limit}
  82. message = create_message('oadrRequestEvent', **payload)
  83. service = 'EiEvent'
  84. response_type, response_payload = self._perform_request(service, message)
  85. return response_type, response_payload
  86. def created_event(self, request_id, event_id, opt_type, modification_number=1):
  87. """
  88. Inform the VTN that we created an event.
  89. """
  90. service = 'EiEvent'
  91. payload = {'ven_id': self.ven_id,
  92. 'response': {'response_code': 200,
  93. 'response_description': 'OK',
  94. 'request_id': request_id},
  95. 'event_responses': [{'response_code': 200,
  96. 'response_description': 'OK',
  97. 'request_id': request_id,
  98. 'qualified_event_id': {'event_id': event_id,
  99. 'modification_number': modification_number},
  100. 'opt_type': opt_type}]}
  101. message = create_message('oadrCreatedEvent', **payload)
  102. response_type, response_payload = self._perform_request(service, message)
  103. return response_type, response_payload
  104. def register_report(self):
  105. """
  106. Tell the VTN about our reporting capabilities.
  107. """
  108. raise NotImplementedError("Reporting is not yet implemented")
  109. def poll(self):
  110. service = 'OadrPoll'
  111. message = create_message('OadrPoll', ven_id=self.ven_id)
  112. response_type, response_payload = self._perform_request(service, message)
  113. return response_type, response_payload
  114. def _perform_request(self, service, message):
  115. print(f"Sending {message}")
  116. url = f"{self.vtn_url}/{service}"
  117. r = requests.post(url,
  118. data=message)
  119. if r.status_code != HTTPStatus.OK:
  120. raise Exception(f"Received non-OK status in request: {r.status_code}")
  121. print(r.content.decode('utf-8'))
  122. return parse_message(r.content)
  123. async def _on_event(self, message):
  124. print("ON_EVENT")
  125. result = self.on_event(message)
  126. if iscoroutine(result):
  127. result = await result
  128. print(f"Now responding with {result}")
  129. request_id = message['request_id']
  130. event_id = message['events'][0]['event_descriptor']['event_id']
  131. self.created_event(request_id, event_id, result)
  132. return
  133. async def _on_report(self, message):
  134. result = self.on_report(message)
  135. if iscoroutine(result):
  136. result = await result
  137. return result
  138. async def _poll(self):
  139. response_type, response_payload = self.poll()
  140. if response_type == 'oadrResponse':
  141. print("No events or reports available")
  142. return
  143. if response_type == 'oadrRequestReregistration':
  144. result = self.create_party_registration()
  145. if response_type == 'oadrDistributeEvent':
  146. result = await self._on_event(response_payload)
  147. elif response_type == 'oadrUpdateReport':
  148. result = await self._on_report(response_payload)
  149. else:
  150. print(f"No handler implemented for message type {response_type}, ignoring.")
  151. await self._poll()