Kaynağa Gözat

Add optional, default-enabled jitter to polling and report sending

This will prevent polling overload at the VTN due to the VENs using synchronized polling.

Signed-off-by: Stan Janssen <stan.janssen@elaad.nl>
Stan Janssen 4 yıl önce
ebeveyn
işleme
c53ccd0b5f
6 değiştirilmiş dosya ile 74 ekleme ve 22 silme
  1. 12 0
      docs/client.rst
  2. 19 0
      docs/roadmap.rst
  3. 29 8
      openleadr/client.py
  4. 9 10
      openleadr/utils.py
  5. 1 1
      test/test_certificates.py
  6. 4 3
      test/test_utils.py

+ 12 - 0
docs/client.rst

@@ -172,3 +172,15 @@ You can validate incoming messages against a public key.
 This will automatically validate check that incoming messages are signed by the private key that belongs to the provided (public) certificate. If validation fails, you will see a Warning emitted, but the message will not be delivered to your handlers, protecting you from malicious messages being processed by your system. The sending party will see an error message returned.
 
 You should use both of the previous examples combined to secure both the incoming and the outgoing messages.
+
+
+.. _client_polling_jitter:
+
+A word on polling
+=================
+
+The OpenADR polling mechanism is very robust; there is very little chance that the client misses an important message. The downside is that there is some wasted bandwith (from polling when no relevant message is available from the VTN), and there is the risk of unnecessary VTN overload if all VENs poll synchronously.
+
+To mitigate the last point, the OpenLEADR VEN will, by default, 'jitter' the pollings by up to +/- 10% or +/- 5 minutes (whichever is smallest). The same goes for delivering the reports (the data collection will still happen on synchronized moments).
+
+If you don't want to jitter the polling requests on your VEN, you can disable this by passing ``allow_jitter=False`` to your ``OpenADRClient`` constructor.

+ 19 - 0
docs/roadmap.rst

@@ -20,6 +20,25 @@ Version Main features                      Target timeframe
 Changelog
 ---------
 
+openleadr 0.5.12
+~~~~~~~~~~~~~~~~
+
+Released: 10 december 2020
+
+New features:
+
+- Events now cycle through the correct 'far', 'near', 'active', 'completed'.
+- The Client now implements the ``on_update_event handler``, so that you can catch these event updates separately from the regular event messages.
+- Added support for the ramp_up_duration parameter on the ``server.add_event`` method.
+
+Bug fixes:
+
+- The OpenADRServer would block ``oadrPoll`` requests when no internal messages were available. This has been corrected.
+- Some left-over ``print()`` statements have been removed.
+- Nonce caching was badly broken in a previous version, this has now been fixed.
+
+
+
 openleadr 0.5.11
 ~~~~~~~~~~~~~~~~
 

+ 29 - 8
openleadr/client.py

@@ -22,6 +22,7 @@ import asyncio
 import inspect
 import logging
 import ssl
+import random
 from datetime import datetime, timedelta, timezone
 from functools import partial
 from http import HTTPStatus
@@ -44,7 +45,8 @@ class OpenADRClient:
     you can always choose to call them manually.
     """
     def __init__(self, ven_name, vtn_url, debug=False, cert=None, key=None,
-                 passphrase=None, vtn_fingerprint=None, show_fingerprint=True, ca_file=None):
+                 passphrase=None, vtn_fingerprint=None, show_fingerprint=True, ca_file=None,
+                 allow_jitter=True):
         """
         Initializes a new OpenADR Client (Virtual End Node)
 
@@ -87,6 +89,7 @@ class OpenADRClient:
         self.key_path = key
         self.passphrase = passphrase
         self.ca_file = ca_file
+        self.allow_jitter = allow_jitter
 
         if cert and key:
             with open(cert, 'rb') as file:
@@ -114,7 +117,7 @@ class OpenADRClient:
         """
         # if not hasattr(self, 'on_event'):
         #     raise NotImplementedError("You must implement on_event.")
-
+        self.loop = asyncio.get_event_loop()
         await self.create_party_registration()
 
         if not self.ven_id:
@@ -124,8 +127,7 @@ class OpenADRClient:
 
         if self.reports:
             await self.register_reports(self.reports)
-            loop = asyncio.get_event_loop()
-            self.report_queue_task = loop.create_task(self._report_queue_worker())
+            self.report_queue_task = self.loop.create_task(self._report_queue_worker())
 
         await self._poll()
 
@@ -134,7 +136,7 @@ class OpenADRClient:
             logger.warning("Polling with intervals of more than 24 hours is not supported. "
                            "Will use 24 hours as the logging interval.")
             self.poll_frequency = timedelta(hours=24)
-        cron_config = utils.cron_config(self.poll_frequency, randomize_seconds=True)
+        cron_config = utils.cron_config(self.poll_frequency, randomize_seconds=self.allow_jitter)
 
         self.scheduler.add_job(self._poll,
                                trigger='cron',
@@ -526,6 +528,11 @@ class OpenADRClient:
                                      'granularity': granularity,
                                      'job': job})
 
+    async def create_single_report(self, report_request):
+        """
+        Create a single report in response to a request from the VTN.
+        """
+
     async def update_report(self, report_request_id):
         """
         Call the previously registered report callback and send the result as a message to the VTN.
@@ -590,13 +597,23 @@ class OpenADRClient:
             expected_len = len(report_request['r_ids']) * int(report_interval / sampling_interval)
             if len(outgoing_report.intervals) == expected_len:
                 logger.info("The report is now complete with all the values. Will queue for sending.")
-                await self.pending_reports.put(self.incomplete_reports.pop(report_request_id))
+                if self.allow_jitter:
+                    delay = random.uniform(0, min(30, report_interval / 2))
+                    self.loop.create_task(utils.delayed_call(func=self.pending_reports.put(outgoing_report),
+                                                             delay=delay))
+                else:
+                    await self.pending_reports.put(self.incomplete_reports.pop(report_request_id))
             else:
                 logger.debug("The report is not yet complete, will hold until it is.")
                 self.incomplete_reports[report_request_id] = outgoing_report
         else:
             logger.info("Report will be sent now.")
-            await self.pending_reports.put(outgoing_report)
+            if self.allow_jitter:
+                delay = random.uniform(0, min(30, granularity.total_seconds() / 2))
+                self.loop.create_task(utils.delayed_call(func=self.pending_reports.put(outgoing_report),
+                                                         delay=delay))
+            else:
+                await self.pending_reports.put(outgoing_report)
 
     async def cancel_report(self, payload):
         """
@@ -610,7 +627,6 @@ class OpenADRClient:
 
         while True:
             report = await self.pending_reports.get()
-
             service = 'EiReport'
             message = self._create_message('oadrUpdateReport', reports=[report])
 
@@ -770,6 +786,11 @@ class OpenADRClient:
         elif response_type == 'oadrUpdateReport':
             await self._on_report(response_payload)
 
+        elif response_type == 'oadrCreateReport':
+            if 'report_requests' in response_payload:
+                for report_request in response_payload['report_requests']:
+                    await self.create_report(report_request)
+
         else:
             logger.warning(f"No handler implemented for incoming message "
                            f"of type {response_type}, ignoring.")

+ 9 - 10
openleadr/utils.py

@@ -18,7 +18,6 @@ from datetime import datetime, timedelta, timezone
 from dataclasses import is_dataclass, asdict
 from collections import OrderedDict
 from openleadr import enums
-from random import randint
 import asyncio
 import itertools
 import re
@@ -477,27 +476,27 @@ def cron_config(interval, randomize_seconds=False):
     """
     Returns a dict with cron settings for the given interval
     """
-    if randomize_seconds:
-        seconds_offset = min(60, randint(0, interval.total_seconds()))
-    else:
-        seconds_offset = "*"
     if interval < timedelta(minutes=1):
-        second = f"{seconds_offset}/{interval.seconds}"
+        second = f"*/{interval.seconds}"
         minute = "*"
         hour = "*"
     elif interval < timedelta(hours=1):
-        second = f"{seconds_offset}" if randomize_seconds else "0"
+        second = "0"
         minute = f"*/{int(interval.total_seconds()/60)}"
         hour = "*"
     elif interval < timedelta(hours=24):
-        second = f"{seconds_offset}" if randomize_seconds else "0"
+        second = "0"
         minute = "0"
         hour = f"*/{int(interval.total_seconds()/3600)}"
     else:
-        second = f"{seconds_offset}" if randomize_seconds else "0"
+        second = "0"
         minute = "0"
         hour = "0"
-    return {"second": second, "minute": minute, "hour": hour}
+    cron_config = {"second": second, "minute": minute, "hour": hour}
+    if randomize_seconds:
+        jitter = min(int(interval.total_seconds() / 10), 300)
+        cron_config['jitter'] = jitter
+    return cron_config
 
 
 def get_cert_fingerprint_from_request(request):

+ 1 - 1
test/test_certificates.py

@@ -96,4 +96,4 @@ async def test_ssl_certificates_wrong_cert():
 
     await client.stop()
     await server.stop()
-    await asyncio.sleep(0)
+    await asyncio.sleep(0)

+ 4 - 3
test/test_utils.py

@@ -105,7 +105,8 @@ def test_cron_config():
     assert utils.cron_config(timedelta(hours=1)) == {'second': '0', 'minute': '0', 'hour': '*/1'}
     assert utils.cron_config(timedelta(hours=2)) == {'second': '0', 'minute': '0', 'hour': '*/2'}
     assert utils.cron_config(timedelta(hours=25)) == {'second': '0', 'minute': '0', 'hour': '0'}
-
-    cron_config = utils.cron_config(timedelta(seconds=5), randomize_seconds=True)
-    assert int(cron_config['second'][0]) <= 5
+    assert utils.cron_config(timedelta(seconds=10), randomize_seconds=True) == {'second': '*/10',
+                                                                                'minute': '*',
+                                                                                'hour': '*',
+                                                                                'jitter': 1}