# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 Contributors to OpenLEADR
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from aiohttp import web
from openleadr.service import EventService, PollService, RegistrationService, ReportService, \
    OptService, VTNService
from openleadr.messaging import create_message
from openleadr import objects
from openleadr import utils
from functools import partial
from datetime import datetime, timedelta, timezone
import logging
import ssl
import re

# Module-level logger; uses the fixed logger name 'openleadr'.
logger = logging.getLogger('openleadr')
  25. class OpenADRServer:
  26. _MAP = {'on_created_event': 'event_service',
  27. 'on_request_event': 'event_service',
  28. 'on_register_report': 'report_service',
  29. 'on_create_report': 'report_service',
  30. 'on_created_report': 'report_service',
  31. 'on_request_report': 'report_service',
  32. 'on_update_report': 'report_service',
  33. 'on_poll': 'poll_service',
  34. 'on_query_registration': 'registration_service',
  35. 'on_create_party_registration': 'registration_service',
  36. 'on_cancel_party_registration': 'registration_service'}
  37. def __init__(self, vtn_id, cert=None, key=None, passphrase=None, fingerprint_lookup=None,
  38. show_fingerprint=True, http_port=8080, http_host='127.0.0.1', http_cert=None,
  39. http_key=None, http_key_passphrase=None, http_path_prefix='/OpenADR2/Simple/2.0b',
  40. requested_poll_freq=timedelta(seconds=10), http_ca_file=None):
  41. """
  42. Create a new OpenADR VTN (Server).
  43. :param vtn_id string: An identifier string for this VTN. This is how you identify yourself
  44. to the VENs that talk to you.
  45. :param cert string: Path to the PEM-formatted certificate file that is used to sign outgoing
  46. messages
  47. :param key string: Path to the PEM-formatted private key file that is used to sign outgoing
  48. messages
  49. :param passphrase string: The passphrase used to decrypt the private key file
  50. :param fingerprint_lookup callable: A callable that receives a ven_id and should return the
  51. registered fingerprint for that VEN. You should receive
  52. these fingerprints outside of OpenADR and configure them
  53. manually.
  54. :param show_fingerprint boolean: Whether to print the fingerprint to your stdout on startup.
  55. Defaults to True.
  56. :param http_port integer: The port that the web server is exposed on (default: 8080)
  57. :param http_host str: The host or IP address to bind the server to (default: 127.0.0.1).
  58. :param http_cert str: The path to the PEM certificate for securing HTTP traffic.
  59. :param http_key str: The path to the PEM private key for securing HTTP traffic.
  60. :param http_ca_file str: The path to the CA-file that client certificates are checked against.
  61. :param http_key_passphrase str: The passphrase for the HTTP private key.
  62. """
  63. # Set up the message queues
  64. self.message_queues = {}
  65. self.app = web.Application()
  66. self.services = {'event_service': EventService(vtn_id, message_queues=self.message_queues),
  67. 'report_service': ReportService(vtn_id, message_queues=self.message_queues),
  68. 'poll_service': PollService(vtn_id, message_queues=self.message_queues),
  69. 'opt_service': OptService(vtn_id),
  70. 'registration_service': RegistrationService(vtn_id,
  71. poll_freq=requested_poll_freq)}
  72. if http_path_prefix[-1] == "/":
  73. http_path_prefix = http_path_prefix[:-1]
  74. self.app.add_routes([web.post(f"{http_path_prefix}/{s.__service_name__}", s.handler)
  75. for s in self.services.values()])
  76. self.http_port = http_port
  77. self.http_host = http_host
  78. self.http_path_prefix = http_path_prefix
  79. # Create SSL context for running the server
  80. if http_cert and http_key:
  81. self.ssl_context = ssl.create_default_context(cafile=http_ca_file,
  82. purpose=ssl.Purpose.CLIENT_AUTH)
  83. self.ssl_context.verify_mode = ssl.CERT_REQUIRED
  84. self.ssl_context.load_cert_chain(http_cert, http_key, http_key_passphrase)
  85. else:
  86. self.ssl_context = None
  87. # Configure message signing
  88. if cert and key:
  89. with open(cert, "rb") as file:
  90. cert = file.read()
  91. with open(key, "rb") as file:
  92. key = file.read()
  93. if show_fingerprint:
  94. print("")
  95. print("*" * 80)
  96. print("Your VTN Certificate Fingerprint is "
  97. f"{utils.certificate_fingerprint(cert)}".center(80))
  98. print("Please deliver this fingerprint to the VENs that connect to you.".center(80))
  99. print("You do not need to keep this a secret.".center(80))
  100. print("*" * 80)
  101. print("")
  102. VTNService._create_message = partial(create_message, cert=cert, key=key,
  103. passphrase=passphrase)
  104. VTNService.fingerprint_lookup = staticmethod(fingerprint_lookup)
  105. self.__setattr__ = self.add_handler
  106. def run(self):
  107. """
  108. Starts the asyncio-loop and runs the server in it. This function is
  109. blocking. For other ways to run the server in a more flexible context,
  110. please refer to the `aiohttp documentation
  111. <https://docs.aiohttp.org/en/stable/web_advanced.html#aiohttp-web-app-runners>`_.
  112. """
  113. web.run_app(self.app)
  114. async def run_async(self):
  115. """
  116. Starts the server in an already-running asyncio loop.
  117. """
  118. self.app_runner = web.AppRunner(self.app)
  119. await self.app_runner.setup()
  120. site = web.TCPSite(self.app_runner,
  121. port=self.http_port,
  122. host=self.http_host,
  123. ssl_context=self.ssl_context)
  124. await site.start()
  125. protocol = 'https' if self.ssl_context else 'http'
  126. print("")
  127. print("*" * 80)
  128. print("Your VTN Server is now running at ".center(80))
  129. print(f"{protocol}://{self.http_host}:{self.http_port}{self.http_path_prefix}".center(80))
  130. print("*" * 80)
  131. print("")
  132. async def stop(self):
  133. await self.app_runner.cleanup()
  134. def add_event(self, ven_id, signal_name, signal_type, intervals, callback, targets=None,
  135. targets_by_type=None, target=None, market_context="oadr://unknown.context"):
  136. """
  137. Convenience method to add an event with a single signal.
  138. :param str ven_id: The ven_id to whom this event must be delivered.
  139. :param str signal_name: The OpenADR name of the signal; one of openleadr.objects.SIGNAL_NAME
  140. :param str signal_type: The OpenADR type of the signal; one of openleadr.objects.SIGNAL_TYPE
  141. :param str intervals: A list of intervals with a dtstart, duration and payload member.
  142. :param str callback: A callback function for when your event has been accepted (optIn) or refused (optOut).
  143. :param list targets: A list of Targets that this Event applies to.
  144. :param target: A single target for this event.
  145. :param dict targets_by_type: A dict of targets, grouped by type.
  146. :param str market_context: A URI for the DR program that this event belongs to.
  147. If you don't provide a target using any of the three arguments, the target will be set to the given ven_id.
  148. """
  149. if self.services['event_service'].polling_method == 'external':
  150. logger.error("You cannot use the add_event method after you assign your own on_poll "
  151. "handler. If you use your own on_poll handler, you are responsible for "
  152. "delivering events from that handler. If you want to use OpenLEADRs "
  153. "message queuing system, you should not assign an on_poll handler. "
  154. "Your Event will NOT be added.")
  155. return
  156. if not re.match(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", market_context):
  157. raise ValueError("The Market Context must be a valid URI.")
  158. event_id = utils.generate_id()
  159. # Figure out the target for this Event
  160. if target is None and targets is None and targets_by_type is None:
  161. targets = [{'ven_id': ven_id}]
  162. elif target is not None:
  163. targets = [target]
  164. elif targets_by_type is not None:
  165. targets = utils.ungroup_targets_by_type(targets_by_type)
  166. if not isinstance(targets, list):
  167. targets = [targets]
  168. event_descriptor = objects.EventDescriptor(event_id=event_id,
  169. modification_number=0,
  170. market_context=market_context,
  171. event_status="near",
  172. created_date_time=datetime.now(timezone.utc))
  173. event_signal = objects.EventSignal(intervals=intervals,
  174. signal_name=signal_name,
  175. signal_type=signal_type,
  176. signal_id=utils.generate_id(),
  177. targets=targets)
  178. event = objects.Event(event_descriptor=event_descriptor,
  179. event_signals=[event_signal],
  180. targets=targets)
  181. if ven_id not in self.message_queues:
  182. self.message_queues[ven_id] = asyncio.Queue()
  183. self.message_queues[ven_id].put_nowait(event)
  184. self.services['event_service'].pending_events[event_id] = callback
  185. def add_raw_event(self, ven_id, event):
  186. """
  187. Add a new event to the queue for a specific VEN.
  188. :param str ven_id: The ven_id to which this event should be distributed.
  189. :param dict event: The event (as a dict or as a objects.Event instance)
  190. that contains the event details.
  191. """
  192. if ven_id not in self.message_queues:
  193. self.message_queues[ven_id] = asyncio.Queue()
  194. self.message_queues[ven_id].put_nowait(event)
  195. async def request_report(self):
  196. """
  197. Request a report from the client.
  198. """
  199. def add_handler(self, name, func):
  200. """
  201. Add a handler to the OpenADRServer.
  202. :param name string: The name for this handler. Should be one of: on_created_event,
  203. on_request_event, on_register_report, on_create_report,
  204. on_created_report, on_request_report, on_update_report, on_poll,
  205. on_query_registration, on_create_party_registration,
  206. on_cancel_party_registration.
  207. :param func coroutine: A coroutine that handles this event. It receives the message, and
  208. should return the contents of a response.
  209. """
  210. logger.debug(f"Adding handler: {name} {func}")
  211. if name in self._MAP:
  212. setattr(self.services[self._MAP[name]], name, func)
  213. if name == 'on_poll':
  214. self.services['poll_service'].polling_method = 'external'
  215. self.services['event_service'].polling_method = 'external'
  216. else:
  217. raise NameError(f"Unknown handler {name}. "
  218. f"Correct handler names are: {self._MAP.keys()}")