# test_queues.py
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
  2. import pytest
  3. import asyncio
  4. import datetime
  5. from functools import partial
  6. enable_default_logging()
  7. def on_create_party_registration(registration_info):
  8. print("Registered party")
  9. return 'ven123', 'reg123'
  10. async def on_event(event):
  11. return 'optIn'
  12. async def event_callback(ven_id, event_id, opt_type, future):
  13. future.set_result(opt_type)
  14. @pytest.mark.asyncio
  15. async def test_internal_message_queue():
  16. loop = asyncio.get_event_loop()
  17. client = OpenADRClient(ven_name='myven',
  18. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  19. client.add_handler('on_event', on_event)
  20. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  21. server.add_handler('on_create_party_registration', on_create_party_registration)
  22. event_callback_future = loop.create_future()
  23. server.add_event(ven_id='ven123',
  24. signal_name='simple',
  25. signal_type='level',
  26. intervals=[{'dtstart': datetime.datetime.now(),
  27. 'duration': datetime.timedelta(minutes=5),
  28. 'signal_payload': 1}],
  29. callback=partial(event_callback, future=event_callback_future))
  30. await server.run_async()
  31. await asyncio.sleep(1)
  32. await client.run()
  33. await asyncio.sleep(1)
  34. status = await event_callback_future
  35. assert status == 'optIn'
  36. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  37. assert message_type == 'oadrResponse'
  38. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  39. assert message_type == 'oadrResponse'
  40. await client.stop()
  41. await server.stop()