Procházet zdrojové kódy

Return response if no message is available

This solves a bug in the internal message queue implementation, which would hold on to the request as long as there was no queued message available when using the internal message queue.

Signed-off-by: Stan Janssen <stan.janssen@elaad.nl>
Stan Janssen před 4 roky
rodič
revize
a1fec1b824
2 změnil soubory, kde provedl 57 přidání a 4 odebrání
  1. 7 4
      openleadr/service/poll_service.py
  2. 50 0
      test/test_queues.py

+ 7 - 4
openleadr/service/poll_service.py

@@ -16,7 +16,7 @@
 
 from openleadr.service import service, handler, VTNService
 from openleadr import objects
-from asyncio import iscoroutine
+import asyncio
 from dataclasses import asdict
 import logging
 logger = logging.getLogger('openleadr')
@@ -116,13 +116,16 @@ class PollService(VTNService):
         if self.polling_method == 'external':
             result = self.on_poll(ven_id=payload['ven_id'])
         elif payload['ven_id'] in self.message_queues:
-            result = await self.message_queues[payload['ven_id']].get()
+            try:
+                result = self.message_queues[payload['ven_id']].get_nowait()
+            except asyncio.QueueEmpty:
+                return 'oadrResponse', {}
         else:
             return 'oadrResponse', {}
-        if iscoroutine(result):
+        if asyncio.iscoroutine(result):
             result = await result
         if result is None:
-            return result
+            return 'oadrResponse', {}
         if isinstance(result, tuple):
             return result
         if isinstance(result, list):

+ 50 - 0
test/test_queues.py

@@ -0,0 +1,50 @@
+from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
+import pytest
+import asyncio
+import datetime
+from functools import partial
+
+enable_default_logging()
+
+def on_create_party_registration(registration_info):
+    print("Registered party")
+    return 'ven123', 'reg123'
+
+async def on_event(event):
+    return 'optIn'
+
+async def event_callback(ven_id, event_id, opt_type, future):
+    future.set_result(opt_type)
+
+@pytest.mark.asyncio
+async def test_internal_message_queue():
+    loop = asyncio.get_event_loop()
+    client = OpenADRClient(ven_name='myven',
+                           vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
+    client.add_handler('on_event', on_event)
+    server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
+    server.add_handler('on_create_party_registration', on_create_party_registration)
+    event_callback_future = loop.create_future()
+    server.add_event(ven_id='ven123',
+                     signal_name='simple',
+                     signal_type='level',
+                     intervals=[{'dtstart': datetime.datetime.now(),
+                                 'duration': datetime.timedelta(minutes=5),
+                                 'signal_payload': 1}],
+                     callback=partial(event_callback, future=event_callback_future))
+
+    await server.run_async()
+    await asyncio.sleep(1)
+    await client.run()
+    await asyncio.sleep(1)
+    status = await event_callback_future
+    assert status == 'optIn'
+
+    message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
+    assert message_type == 'oadrResponse'
+
+    message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
+    assert message_type == 'oadrResponse'
+
+    await client.stop()
+    await server.stop()