Pārlūkot izejas kodu

Add support for events that don't require a response

Signed-off-by: Stan Janssen <stan.janssen@elaad.nl>
Stan Janssen 4 gadi atpakaļ
vecāks
revīzija
b4b0e388d7
3 mainītis faili ar 41 papildinājumiem un 28 dzēšanām
  1. 1 0
      openleadr/objects.py
  2. 13 7
      openleadr/server.py
  3. 27 21
      openleadr/service/event_service.py

+ 1 - 0
openleadr/objects.py

@@ -196,6 +196,7 @@ class Event:
     targets: List[Target] = None
     targets_by_type: Dict = None
     active_period: ActivePeriod = None
+    response_required: str = 'always'
 
     def __post_init__(self):
         if self.active_period is None:

+ 13 - 7
openleadr/server.py

@@ -222,14 +222,12 @@ class OpenADRServer:
         event = objects.Event(active_period=active_period,
                               event_descriptor=event_descriptor,
                               event_signals=[event_signal],
-                              targets=targets)
-        if ven_id not in self.message_queues:
-            self.message_queues[ven_id] = deque()
-        self.message_queues[ven_id].append(event)
-        self.services['event_service'].pending_events[event_id] = (event, callback)
+                              targets=targets,
+                              response_required=response_required)
+        self.add_raw_event(ven_id=ven_id, event=event, callback=callback)
         return event_id
 
-    def add_raw_event(self, ven_id, event, callback):
+    def add_raw_event(self, ven_id, event, callback=None):
         """
         Add a new event to the queue for a specific VEN.
         :param str ven_id: The ven_id to which this event should be distributed.
@@ -238,11 +236,19 @@ class OpenADRServer:
         :param callable callback: A callback that will receive the opt status for this event.
                                   This callback receives ven_id, event_id, opt_type as its arguments.
         """
+        if utils.getmember(event, 'response_required') == 'always':
+            if callback is None:
+                logger.warning("You did not provide a 'callback', which means you won't know if the "
+                               "VEN will opt in or opt out of your event. You should consider adding "
+                               "a callback for this.")
         if ven_id not in self.message_queues:
             self.message_queues[ven_id] = deque()
         event_id = utils.getmember(utils.getmember(event, 'event_descriptor'), 'event_id')
         self.message_queues[ven_id].append(event)
-        self.services['event_service'].pending_events[event_id] = (event, callback)
+        if callback is not None:
+            self.services['event_service'].pending_events[event_id] = (event, callback)
+        if utils.getmember(event, 'response_required') == 'never':
+            self.services['event_service'].schedule_event_updates(ven_id, event)
         return event_id
 
     def add_handler(self, name, func):

+ 27 - 21
openleadr/service/event_service.py

@@ -75,7 +75,6 @@ class EventService(VTNService):
         """
         The VEN informs us that they created an EiEvent.
         """
-        loop = asyncio.get_event_loop()
         ven_id = payload['ven_id']
         if self.polling_method == 'internal':
             for event_response in payload['event_responses']:
@@ -91,26 +90,7 @@ class EventService(VTNService):
                             result = await result
                     if opt_type == 'optIn':
                         self.running_events[event_id] = (event, callback)
-                        now = datetime.now(timezone.utc)
-                        active_period = event.active_period
-                        # Schedule status update to 'near' if applicable
-                        if active_period.ramp_up_period is not None and event.event_descriptor.event_status == 'far':
-                            ramp_up_start_delay = (active_period.dtstart - active_period.ramp_up_period) - now
-                            update_coro = partial(self._update_event_status, ven_id, event, 'near')
-                            loop.create_task(utils.delayed_call(func=update_coro, delay=ramp_up_start_delay),
-                                             name=f'DelayedCall-{utils.generate_id()}')
-                        # Schedule status update to 'active'
-                        if event.event_descriptor.event_status in ('near', 'far'):
-                            active_start_delay = active_period.dtstart - now
-                            update_coro = partial(self._update_event_status, ven_id, event, 'active')
-                            loop.create_task(utils.delayed_call(func=update_coro, delay=active_start_delay),
-                                             name=f'DelayedCall-{utils.generate_id()}')
-                        # Schedule status update to 'completed'
-                        if event.event_descriptor.event_status in ('near', 'far', 'active'):
-                            active_end_delay = active_period.dtstart + active_period.duration - now
-                            update_coro = partial(self._update_event_status, ven_id, event, 'completed')
-                            loop.create_task(utils.delayed_call(func=update_coro, delay=active_end_delay),
-                                             name=f'DelayedCall-{utils.generate_id()}')
+                        self.schedule_event_updates(ven_id, event)
                 elif event_response['event_id'] in self.running_events:
                     event, callback = self.running_events.pop(event_id)
                     result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
@@ -140,3 +120,29 @@ class EventService(VTNService):
         if event_status == enums.EVENT_STATUS.CANCELLED:
             event.event_descriptor.modification_number += 1
         self.message_queues[ven_id].append(event)
+
+    def schedule_event_updates(self, ven_id, event):
+        """
+        Schedules the event updates.
+        """
+        loop = asyncio.get_event_loop()
+        now = datetime.now(timezone.utc)
+        active_period = event.active_period
+        # Schedule status update to 'near' if applicable
+        if active_period.ramp_up_period is not None and event.event_descriptor.event_status == 'far':
+            ramp_up_start_delay = (active_period.dtstart - active_period.ramp_up_period) - now
+            update_coro = partial(self._update_event_status, ven_id, event, 'near')
+            loop.create_task(utils.delayed_call(func=update_coro, delay=ramp_up_start_delay),
+                             name=f'DelayedCall-EventStatusToNear-{event.event_descriptor.event_id}')
+        # Schedule status update to 'active'
+        if event.event_descriptor.event_status in ('near', 'far'):
+            active_start_delay = active_period.dtstart - now
+            update_coro = partial(self._update_event_status, ven_id, event, 'active')
+            loop.create_task(utils.delayed_call(func=update_coro, delay=active_start_delay),
+                             name=f'DelayedCall-EventStatusToActive-{event.event_descriptor.event_id}')
+        # Schedule status update to 'completed'
+        if event.event_descriptor.event_status in ('near', 'far', 'active'):
+            active_end_delay = active_period.dtstart + active_period.duration - now
+            update_coro = partial(self._update_event_status, ven_id, event, 'completed')
+            loop.create_task(utils.delayed_call(func=update_coro, delay=active_end_delay),
+                             name=f'DelayedCall-EventStatusToCompleted-{event.event_descriptor.event_id}')