Browse Source

Add support for using a Future instead of a function or coroutine as event status callback

This makes it much easier to send and Event to a client and wait for the response in-line, instead of passing the event ID around in your own application and having a separate callback listen for the status.

Signed-off-by: Stan Janssen <stan.janssen@elaad.nl>
Stan Janssen 4 years ago
parent
commit
cce5acf34b
3 changed files with 184 additions and 23 deletions
  1. 136 20
      docs/server.rst
  2. 6 3
      openleadr/service/event_service.py
  3. 42 0
      test/test_queues.py

+ 136 - 20
docs/server.rst

@@ -6,6 +6,72 @@ Server
 
 
 If you are implementing an OpenADR Server ("Virtual Top Node") using OpenLEADR, read this page.
 If you are implementing an OpenADR Server ("Virtual Top Node") using OpenLEADR, read this page.
 
 
+.. _server_example:
+
+1-minute VTN example
+====================
+
+Here's an example of a server that accepts registrations from a VEN named
+'ven_123', requests all reports that it offers, and creates an Event for this
+VEN.
+
+.. code-block:: python3
+
+    import asyncio
+    from datetime import datetime, timezone, timedelta
+    from openleadr import OpenADRServer
+
+    async def on_create_party_registration(registration_info):
+        """
+        Inspect the registration info and return a ven_id and registration_id.
+        """
+        if registration_info['ven_name'] == 'ven123':
+            ven_id = 'ven_id_123'
+            registration_id = 'reg_id_123'
+            return ven_id, registration_id
+        else:
+            return False
+
+    async def on_register_report(ven_id, resource_id, measurement, unit, scale,
+                                 min_sampling_interval, max_sampling_interval):
+        """
+        Inspect a report offering from the VEN and return a callback and sampling interval for receiving the reports.
+        """
+        callback = partial(on_update_report, ven_id=ven_id, resource_id=resource_id, measurement=measurement)
+        sampling_interval = min_sampling_interval
+        return callback, sampling_interval
+
+    async def on_update_report(data, ven_id, resource_id, measurement):
+        """
+        Callback that receives report data from the VEN and handles it.
+        """
+        for time, value in data:
+            print(f"Ven {ven_id} reported {measurement} = {value} at time {time} for resource {resource_id}")
+
+    # Create the server object
+    server = OpenADRServer(vtn_id='myvtn')
+
+    # Add the handler for client (VEN) registrations
+    server.add_handler('on_create_party_registration', on_create_party_registration)
+
+    # Add the handler for report registrations from the VEN
+    server.add_handler('on_register_report', on_register_report)
+
+    # Add a prepared event for a VEN that will be picked up when it polls for new messages.
+    server.add_event(ven_id='ven_id_123',
+                     event_name='simple',
+                     event_type='level',
+                     intervals=[{'dtstart': datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+                                 'duration': timedelta(minutes=10),
+                                 'signal_payload': 1}])
+
+    # Run the server on the asyncio event loop
+    loop = asyncio.get_event_loop()
+    loop.create_task(server.run())
+    loop.run_forever()
+
+Read on for more details!
+
 .. _server_registration:
 .. _server_registration:
 
 
 Registration
 Registration
@@ -76,16 +142,15 @@ The VEN can decide whether to opt in or opt out of the event. To be notified of
         print(f"VEN {ven_id} responded {opt_status} to event {event_id}")
         print(f"VEN {ven_id} responded {opt_status} to event {event_id}")
 
 
     server = OpenADRServer(vtn_id='myvtn')
     server = OpenADRServer(vtn_id='myvtn')
-    server.add_event(ven_id='ven123',
-                     event_id='event123',
-                     signal_name='simple',
-                     signal_type='level',
-                     intervals=[{'dtstart': datetime(2020. 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-                                 'signal_payload': 1},
-                                 {'dtstart': datetime(2020. 1, 1, 12, 15, 0, tzinfo=timezone.utc),
-                                 'signal_payload': 0}],
-                     target=[{'resource_id': 'Device001'}],
-                     callback=partial(event_callback, ven_id='ven123', event_id='event123'))
+    event_id = server.add_event(ven_id='ven123',
+                                signal_name='simple',
+                                signal_type='level',
+                                intervals=[{'dtstart': datetime(2020. 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+                                            'signal_payload': 1},
+                                            {'dtstart': datetime(2020. 1, 1, 12, 15, 0, tzinfo=timezone.utc),
+                                            'signal_payload': 0}],
+                                target=[{'resource_id': 'Device001'}],
+                                callback=event_callback)
 
 
 
 
 Alternatively, you can use the handy constructors in ``openleadr.objects`` to format parts of the event:
 Alternatively, you can use the handy constructors in ``openleadr.objects`` to format parts of the event:
@@ -97,17 +162,68 @@ Alternatively, you can use the handy constructors in ``openleadr.objects`` to fo
     from datetime import datetime, timezone
     from datetime import datetime, timezone
     from functools import partial
     from functools import partial
 
 
+    async def event_callback(ven_id, event_id, opt_status):
+        print(f"VEN {ven_id} responded {opt_status} to event {event_id}")
+
     server = OpenADRServer(vtn_id='myvtn')
     server = OpenADRServer(vtn_id='myvtn')
-    server.add_event(ven_id='ven123',
-                     event_id='event123',
-                     signal_name='simple',
-                     signal_type='level',
-                     intervals=[Interval(dtstart=datetime(2020, 1, 1, 12, 15, 0, tzinfo=timezone.utc),
-                                         signal_payload=0),
-                                Interval(dtstart=datetime(2020, 1, 1, 12, 15, 0, tzinfo=timezone.utc),
-                                         signal_payload=1)]
-                     target=[Target(resource_id='Device001')],
-                     callback=partial(event_callback, ven_id='ven123', event_id='event123'))
+    event_id = server.add_event(ven_id='ven123',
+                                signal_name='simple',
+                                signal_type='level',
+                                intervals=[Interval(dtstart=datetime(2020, 1, 1, 12, 15, 0, tzinfo=timezone.utc),
+                                                    signal_payload=0),
+                                           Interval(dtstart=datetime(2020, 1, 1, 12, 15, 0, tzinfo=timezone.utc),
+                                                    signal_payload=1)]
+                                target=[Target(resource_id='Device001')],
+                                callback=event_callback)
+
+If you want to add a "raw" event directly, you can use this example as a guid:
+
+.. code-block:: python3
+
+    from openleadr import OpenADRServer
+    from openleadr.objects import Event, EventDescriptor, EventSignal, Target, Interval
+    from datetime import datetime, timezone
+    from functools import partial
+
+    async def event_callback(ven_id, event_id, opt_status):
+        print(f"VEN {ven_id} responded {opt_status} to event {event_id}")
+
+    server = OpenADRServer(vtn_id='myvtn')
+    event = Event(event_descriptor=EventDescriptor(event_id='event001',
+                                                   modification_number=0,
+                                                   event_status='far',
+                                                   market_context='http://marketcontext01'),
+                  event_signals=[EventSignal(signal_id='signal001',
+                                             signal_type='level',
+                                             signal_name='simple',
+                                             intervals=[Interval(dtstart=now,
+                                                                 duration=datetime.timedelta(minutes=10),
+                                                                 signal_payload=1)]),
+                                 EventSignal(signal_id='signal002',
+                                             signal_type='price',
+                                             signal_name='ELECTRICITY_PRICE',
+                                             intervals=[Interval(dtstart=now,
+                                                                 duration=datetime.timedelta(minutes=10),
+                                                                 signal_payload=1)])],
+                  targets=[objects.Target(ven_id='ven123')])
+
+    server.add_raw_event(ven_id='ven123', event=event, callback=event_callback)
+
+If you want to add an event and wait for the response in a single coroutine, you can pass an asyncio Future instead of a function or coroutine as the callback argument:
+
+.. code-block:: python3
+
+    import asyncio
+
+    ...
+
+    async def generate_event():
+        loop = asyncio.get_event_loop()
+        opt_status_future = loop.create_future()
+        server.add_event(..., callback=opt_status_future)
+        opt_status = await opt_status_future
+        print(f"The opt status for this event is {opt_status}")
+
 
 
 A word on event targets
 A word on event targets
 ~~~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~~~

+ 6 - 3
openleadr/service/event_service.py

@@ -83,9 +83,12 @@ class EventService(VTNService):
                 opt_type = event_response['opt_type']
                 opt_type = event_response['opt_type']
                 if event_response['event_id'] in self.pending_events:
                 if event_response['event_id'] in self.pending_events:
                     event, callback = self.pending_events.pop(event_id)
                     event, callback = self.pending_events.pop(event_id)
-                    result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
-                    if asyncio.iscoroutine(result):
-                        result = await result
+                    if asyncio.isfuture(callback):
+                        callback.set_result(opt_type)
+                    else:
+                        result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
+                        if asyncio.iscoroutine(result):
+                            result = await result
                     if opt_type == 'optIn':
                     if opt_type == 'optIn':
                         self.running_events[event_id] = (event, callback)
                         self.running_events[event_id] = (event, callback)
                         now = datetime.now(timezone.utc)
                         now = datetime.now(timezone.utc)

+ 42 - 0
test/test_queues.py

@@ -241,3 +241,45 @@ async def test_raw_event():
     await client.stop()
     await client.stop()
     await server.stop()
     await server.stop()
 
 
+
+@pytest.mark.asyncio
+async def test_create_event_with_future_as_callback():
+    now = datetime.datetime.now(datetime.timezone.utc)
+    server = OpenADRServer(vtn_id='myvtn')
+    server.add_handler('on_create_party_registration', on_create_party_registration)
+    event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
+                                                                   modification_number=0,
+                                                                   event_status='far',
+                                                                   market_context='http://marketcontext01'),
+                          event_signals=[objects.EventSignal(signal_id='signal001',
+                                                             signal_type='level',
+                                                             signal_name='simple',
+                                                             intervals=[objects.Interval(dtstart=now,
+                                                                                         duration=datetime.timedelta(minutes=10),
+                                                                                         signal_payload=1)]),
+                                        objects.EventSignal(signal_id='signal002',
+                                                            signal_type='price',
+                                                            signal_name='ELECTRICITY_PRICE',
+                                                            intervals=[objects.Interval(dtstart=now,
+                                                                                        duration=datetime.timedelta(minutes=10),
+                                                                                        signal_payload=1)])],
+                          targets=[objects.Target(ven_id='ven123')])
+    loop = asyncio.get_event_loop()
+    event_callback_future = loop.create_future()
+    server.add_raw_event(ven_id='ven123', event=event, callback=event_callback_future)
+
+    on_event_future = loop.create_future()
+    client = OpenADRClient(ven_name='myven',
+                           vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
+    client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
+
+    await server.run_async()
+    await client.run()
+    event = await on_event_future
+    assert len(event['event_signals']) == 2
+
+    result = await event_callback_future
+    assert result == 'optIn'
+
+    await client.stop()
+    await server.stop()