server.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. from aiohttp import web
  13. from openleadr.service import EventService, PollService, RegistrationService, ReportService, \
  14. VTNService
  15. from openleadr.messaging import create_message
  16. from openleadr import objects
  17. from openleadr import utils
  18. from functools import partial
  19. from datetime import datetime, timedelta, timezone
  20. import asyncio
  21. import inspect
  22. import logging
  23. import ssl
  24. import re
  25. import sys
  26. logger = logging.getLogger('openleadr')
  27. class OpenADRServer:
  28. _MAP = {'on_created_event': 'event_service',
  29. 'on_request_event': 'event_service',
  30. 'on_register_report': 'report_service',
  31. 'on_create_report': 'report_service',
  32. 'on_created_report': 'report_service',
  33. 'on_request_report': 'report_service',
  34. 'on_update_report': 'report_service',
  35. 'on_poll': 'poll_service',
  36. 'on_query_registration': 'registration_service',
  37. 'on_create_party_registration': 'registration_service',
  38. 'on_cancel_party_registration': 'registration_service'}
  39. def __init__(self, vtn_id, cert=None, key=None, passphrase=None, fingerprint_lookup=None,
  40. show_fingerprint=True, http_port=8080, http_host='127.0.0.1', http_cert=None,
  41. http_key=None, http_key_passphrase=None, http_path_prefix='/OpenADR2/Simple/2.0b',
  42. requested_poll_freq=timedelta(seconds=10), http_ca_file=None):
  43. """
  44. Create a new OpenADR VTN (Server).
  45. :param str vtn_id: An identifier string for this VTN. This is how you identify yourself
  46. to the VENs that talk to you.
  47. :param str cert: Path to the PEM-formatted certificate file that is used to sign outgoing
  48. messages
  49. :param str key: Path to the PEM-formatted private key file that is used to sign outgoing
  50. messages
  51. :param str passphrase: The passphrase used to decrypt the private key file
  52. :param callable fingerprint_lookup: A callable that receives a ven_id and should return the
  53. registered fingerprint for that VEN. You should receive
  54. these fingerprints outside of OpenADR and configure them
  55. manually.
  56. :param bool show_fingerprint: Whether to print the fingerprint to your stdout on startup.
  57. Defaults to True.
  58. :param int http_port: The port that the web server is exposed on (default: 8080)
  59. :param str http_host: The host or IP address to bind the server to (default: 127.0.0.1).
  60. :param str http_cert: The path to the PEM certificate for securing HTTP traffic.
  61. :param str http_key: The path to the PEM private key for securing HTTP traffic.
  62. :param str http_ca_file: The path to the CA-file that client certificates are checked against.
  63. :param str http_key_passphrase: The passphrase for the HTTP private key.
  64. """
  65. # Set up the message queues
  66. self.app = web.Application()
  67. self.services = {}
  68. self.services['event_service'] = EventService(vtn_id)
  69. self.services['report_service'] = ReportService(vtn_id)
  70. self.services['poll_service'] = PollService(vtn_id)
  71. self.services['registration_service'] = RegistrationService(vtn_id, poll_freq=requested_poll_freq)
  72. # Register the other services with the poll service
  73. self.services['poll_service'].event_service = self.services['event_service']
  74. self.services['poll_service'].report_service = self.services['report_service']
  75. # Set up the HTTP handlers for the services
  76. if http_path_prefix[-1] == "/":
  77. http_path_prefix = http_path_prefix[:-1]
  78. self.app.add_routes([web.post(f"{http_path_prefix}/{s.__service_name__}", s.handler)
  79. for s in self.services.values()])
  80. # Configure the web server
  81. self.http_port = http_port
  82. self.http_host = http_host
  83. self.http_path_prefix = http_path_prefix
  84. # Create SSL context for running the server
  85. if http_cert and http_key:
  86. self.ssl_context = ssl.create_default_context(cafile=http_ca_file,
  87. purpose=ssl.Purpose.CLIENT_AUTH)
  88. self.ssl_context.verify_mode = ssl.CERT_REQUIRED
  89. self.ssl_context.load_cert_chain(http_cert, http_key, http_key_passphrase)
  90. else:
  91. self.ssl_context = None
  92. # Configure message signing
  93. if cert and key:
  94. with open(cert, "rb") as file:
  95. cert = file.read()
  96. with open(key, "rb") as file:
  97. key = file.read()
  98. if show_fingerprint:
  99. print("")
  100. print("*" * 80)
  101. print("Your VTN Certificate Fingerprint is "
  102. f"{utils.certificate_fingerprint(cert)}".center(80))
  103. print("Please deliver this fingerprint to the VENs that connect to you.".center(80))
  104. print("You do not need to keep this a secret.".center(80))
  105. print("*" * 80)
  106. print("")
  107. VTNService._create_message = partial(create_message, cert=cert, key=key,
  108. passphrase=passphrase)
  109. VTNService.fingerprint_lookup = staticmethod(fingerprint_lookup)
  110. self.__setattr__ = self.add_handler
  111. async def run(self):
  112. """
  113. Starts the server in an already-running asyncio loop.
  114. """
  115. self.app_runner = web.AppRunner(self.app)
  116. await self.app_runner.setup()
  117. site = web.TCPSite(self.app_runner,
  118. port=self.http_port,
  119. host=self.http_host,
  120. ssl_context=self.ssl_context)
  121. await site.start()
  122. protocol = 'https' if self.ssl_context else 'http'
  123. print("")
  124. print("*" * 80)
  125. print("Your VTN Server is now running at ".center(80))
  126. print(f"{protocol}://{self.http_host}:{self.http_port}{self.http_path_prefix}".center(80))
  127. print("*" * 80)
  128. print("")
  129. async def run_async(self):
  130. await self.run()
  131. async def stop(self):
  132. if sys.version_info.minor >= 8:
  133. delayed_call_tasks = [task for task in asyncio.all_tasks()
  134. if task.get_name().startswith('DelayedCall')]
  135. for task in delayed_call_tasks:
  136. task.cancel()
  137. await self.app_runner.cleanup()
  138. def add_event(self, ven_id, signal_name, signal_type, intervals, callback=None, event_id=None,
  139. targets=None, targets_by_type=None, target=None, response_required='always',
  140. market_context="oadr://unknown.context", notification_period=None,
  141. ramp_up_period=None, recovery_period=None, signal_target_mrid=None):
  142. """
  143. Convenience method to add an event with a single signal.
  144. :param str ven_id: The ven_id to whom this event must be delivered.
  145. :param str signal_name: The OpenADR name of the signal; one of openleadr.objects.SIGNAL_NAME
  146. :param str signal_type: The OpenADR type of the signal; one of openleadr.objects.SIGNAL_TYPE
  147. :param str intervals: A list of intervals with a dtstart, duration and payload member.
  148. :param str callback: A callback function for when your event has been accepted (optIn) or refused (optOut).
  149. :param list targets: A list of Targets that this Event applies to.
  150. :param target: A single target for this event.
  151. :param dict targets_by_type: A dict of targets, grouped by type.
  152. :param str market_context: A URI for the DR program that this event belongs to.
  153. :param timedelta notification_period: The Notification period for the Event's Active Period.
  154. :param timedelta ramp_up_period: The Ramp Up period for the Event's Active Period.
  155. :param timedelta recovery_period: The Recovery period for the Event's Active Period.
  156. If you don't provide a target using any of the three arguments, the target will be set to the given ven_id.
  157. """
  158. if self.services['event_service'].polling_method == 'external':
  159. logger.error("You cannot use the add_event method after you assign your own on_poll "
  160. "handler. If you use your own on_poll handler, you are responsible for "
  161. "delivering events from that handler. If you want to use OpenLEADRs "
  162. "message queuing system, you should not assign an on_poll handler. "
  163. "Your Event will NOT be added.")
  164. return
  165. if not re.match(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", market_context):
  166. raise ValueError("The Market Context must be a valid URI.")
  167. event_id = event_id or utils.generate_id()
  168. if response_required not in ('always', 'never'):
  169. raise ValueError("'response_required' should be either 'always' or 'never'; "
  170. f"you provided '{response_required}'.")
  171. # Figure out the target for this Event
  172. if target is None and targets is None and targets_by_type is None:
  173. targets = [{'ven_id': ven_id}]
  174. elif target is not None:
  175. targets = [target]
  176. elif targets_by_type is not None:
  177. targets = utils.ungroup_targets_by_type(targets_by_type)
  178. if not isinstance(targets, list):
  179. targets = [targets]
  180. event_descriptor = objects.EventDescriptor(event_id=event_id,
  181. modification_number=0,
  182. market_context=market_context,
  183. event_status="far",
  184. created_date_time=datetime.now(timezone.utc))
  185. event_signal = objects.EventSignal(intervals=intervals,
  186. signal_name=signal_name,
  187. signal_type=signal_type,
  188. signal_id=utils.generate_id())
  189. # Make sure the intervals carry timezone-aware timestamps
  190. for interval in intervals:
  191. if utils.getmember(interval, 'dtstart').tzinfo is None:
  192. utils.setmember(interval, 'dtstart',
  193. utils.getmember(interval, 'dtstart').astimezone(timezone.utc))
  194. logger.warning("You supplied a naive datetime object to your interval's dtstart. "
  195. "This will be interpreted as a timestamp in your local timezone "
  196. "and then converted to UTC before sending. Please supply timezone-"
  197. "aware timestamps like datetime.datetime.new(timezone.utc) or "
  198. "datetime.datetime(..., tzinfo=datetime.timezone.utc)")
  199. active_period = utils.get_active_period_from_intervals(intervals, False)
  200. active_period.ramp_up_period = ramp_up_period
  201. active_period.notification_period = notification_period
  202. active_period.recovery_period = recovery_period
  203. event = objects.Event(active_period=active_period,
  204. event_descriptor=event_descriptor,
  205. event_signals=[event_signal],
  206. targets=targets,
  207. response_required=response_required)
  208. self.add_raw_event(ven_id=ven_id, event=event, callback=callback)
  209. return event_id
  210. def add_raw_event(self, ven_id, event, callback=None):
  211. """
  212. Add a new event to the queue for a specific VEN.
  213. :param str ven_id: The ven_id to which this event should be distributed.
  214. :param dict event: The event (as a dict or as a objects.Event instance)
  215. that contains the event details.
  216. :param callable callback: A callback that will receive the opt status for this event.
  217. This callback receives ven_id, event_id, opt_type as its arguments.
  218. """
  219. if utils.getmember(event, 'response_required') == 'always':
  220. if callback is None:
  221. logger.warning("You did not provide a 'callback', which means you won't know if the "
  222. "VEN will opt in or opt out of your event. You should consider adding "
  223. "a callback for this.")
  224. elif not asyncio.isfuture(callback):
  225. args = inspect.signature(callback).parameters
  226. if not all(['ven_id' in args, 'event_id' in args, 'opt_type' in args]):
  227. raise ValueError("The 'callback' must have at least the following parameters: "
  228. "'ven_id' (str), 'event_id' (str), 'opt_type' (str). Please fix "
  229. "your 'callback' handler.")
  230. event_id = utils.getmember(utils.getmember(event, 'event_descriptor'), 'event_id')
  231. # Create the event queue if it does not exist yet
  232. if ven_id not in self.events:
  233. self.events[ven_id] = []
  234. # Add event to the queue
  235. self.events[ven_id].append(event)
  236. self.events_updated[ven_id] = True
  237. # Add the callback for the response to this event
  238. if callback is not None:
  239. self.event_callbacks[event_id] = (event, callback)
  240. return event_id
  241. def add_handler(self, name, func):
  242. """
  243. Add a handler to the OpenADRServer.
  244. :param str name: The name for this handler. Should be one of: on_created_event,
  245. on_request_event, on_register_report, on_create_report,
  246. on_created_report, on_request_report, on_update_report, on_poll,
  247. on_query_registration, on_create_party_registration,
  248. on_cancel_party_registration.
  249. :param callable func: A function or coroutine that handles this type of occurrence.
  250. It receives the message, and should return the contents of a response.
  251. """
  252. logger.debug(f"Adding handler: {name} {func}")
  253. if name in self._MAP:
  254. setattr(self.services[self._MAP[name]], name, func)
  255. if name == 'on_poll':
  256. self.services['poll_service'].polling_method = 'external'
  257. self.services['event_service'].polling_method = 'external'
  258. else:
  259. raise NameError(f"""Unknown handler '{name}'. """
  260. f"""Correct handler names are: '{"', '".join(self._MAP.keys())}'.""")
  261. @property
  262. def registered_reports(self):
  263. return self.services['report_service'].registered_reports
  264. @property
  265. def events(self):
  266. return self.services['event_service'].events
  267. @property
  268. def events_updated(self):
  269. return self.services['poll_service'].events_updated
  270. @property
  271. def event_callbacks(self):
  272. return self.services['event_service'].event_callbacks