test_queues.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
  2. import pytest
  3. import asyncio
  4. import datetime
  5. from functools import partial
# Call openleadr's enable_default_logging() once at import time, before any
# test in this module runs.
enable_default_logging()
  7. def on_create_party_registration(registration_info):
  8. print("Registered party")
  9. return 'ven123', 'reg123'
  10. async def on_event(event):
  11. return 'optIn'
  12. async def on_event_opt_in(event, future):
  13. if future.done() is False:
  14. future.set_result(event)
  15. return 'optIn'
  16. async def on_update_event(event, futures):
  17. for future in futures:
  18. if future.done() is False:
  19. future.set_result(event)
  20. break
  21. return 'optIn'
  22. async def on_event_opt_out(event, futures):
  23. for future in futures:
  24. if future.done() is False:
  25. future.set_result(event)
  26. break
  27. return 'optOut'
  28. async def event_callback(ven_id, event_id, opt_type, future):
  29. if future.done() is False:
  30. future.set_result(opt_type)
  31. @pytest.mark.asyncio
  32. async def test_internal_message_queue():
  33. loop = asyncio.get_event_loop()
  34. client = OpenADRClient(ven_name='myven',
  35. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  36. client.add_handler('on_event', on_event)
  37. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  38. server.add_handler('on_create_party_registration', on_create_party_registration)
  39. event_callback_future = loop.create_future()
  40. server.add_event(ven_id='ven123',
  41. signal_name='simple',
  42. signal_type='level',
  43. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc),
  44. 'duration': datetime.timedelta(seconds=3),
  45. 'signal_payload': 1}],
  46. callback=partial(event_callback, future=event_callback_future))
  47. await server.run_async()
  48. await asyncio.sleep(1)
  49. await client.run()
  50. await asyncio.sleep(1)
  51. status = await event_callback_future
  52. assert status == 'optIn'
  53. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  54. assert message_type == 'oadrResponse'
  55. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  56. assert message_type == 'oadrResponse'
  57. await asyncio.sleep(1) # Wait for the event to be completed
  58. await client.stop()
  59. await server.stop()
  60. @pytest.mark.asyncio
  61. async def test_event_status_opt_in():
  62. loop = asyncio.get_event_loop()
  63. client = OpenADRClient(ven_name='myven',
  64. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  65. distribute_event_future = loop.create_future()
  66. event_update_futures = [loop.create_future() for i in range(2)]
  67. client.add_handler('on_event', partial(on_event_opt_in, future=distribute_event_future))
  68. client.add_handler('on_update_event', partial(on_update_event, futures=event_update_futures))
  69. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  70. server.add_handler('on_create_party_registration', on_create_party_registration)
  71. event_callback_future = loop.create_future()
  72. event_id = server.add_event(ven_id='ven123',
  73. signal_name='simple',
  74. signal_type='level',
  75. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=2),
  76. 'duration': datetime.timedelta(seconds=2),
  77. 'signal_payload': 1}],
  78. callback=partial(event_callback, future=event_callback_future))
  79. assert server.services['event_service'].pending_events[event_id][0].event_descriptor.event_status == 'far'
  80. await server.run_async()
  81. await asyncio.sleep(0.5)
  82. await client.run()
  83. await event_callback_future
  84. print("Waiting for event future 1")
  85. result = await distribute_event_future
  86. assert result['event_descriptor']['event_status'] == 'far'
  87. assert len(client.responded_events) == 1
  88. print("Watiting for event future 2")
  89. result = await event_update_futures[0]
  90. assert result['event_descriptor']['event_status'] == 'active'
  91. assert len(client.responded_events) == 1
  92. print("Watiting for event future 3")
  93. result = await event_update_futures[1]
  94. assert result['event_descriptor']['event_status'] == 'completed'
  95. assert len(client.responded_events) == 0
  96. await client.stop()
  97. await server.stop()
  98. await asyncio.sleep(0)
  99. @pytest.mark.asyncio
  100. async def test_event_status_opt_in_with_ramp_up():
  101. loop = asyncio.get_event_loop()
  102. client = OpenADRClient(ven_name='myven',
  103. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  104. distribute_event_future = loop.create_future()
  105. event_update_futures = [loop.create_future() for i in range(3)]
  106. client.add_handler('on_event', partial(on_event_opt_in, future=distribute_event_future))
  107. client.add_handler('on_update_event', partial(on_update_event, futures=event_update_futures))
  108. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  109. server.add_handler('on_create_party_registration', on_create_party_registration)
  110. event_callback_future = loop.create_future()
  111. event_id = server.add_event(ven_id='ven123',
  112. signal_name='simple',
  113. signal_type='level',
  114. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=4),
  115. 'duration': datetime.timedelta(seconds=2),
  116. 'signal_payload': 1}],
  117. ramp_up_period=datetime.timedelta(seconds=2),
  118. callback=partial(event_callback, future=event_callback_future))
  119. assert server.services['event_service'].pending_events[event_id][0].event_descriptor.event_status == 'far'
  120. await server.run_async()
  121. await asyncio.sleep(0.5)
  122. await client.run()
  123. await event_callback_future
  124. print("Waiting for event future 1")
  125. result = await distribute_event_future
  126. assert result['event_descriptor']['event_status'] == 'far'
  127. print("Watiting for event future 2")
  128. result = await event_update_futures[0]
  129. assert result['event_descriptor']['event_status'] == 'near'
  130. print("Watiting for event future 3")
  131. result = await event_update_futures[1]
  132. assert result['event_descriptor']['event_status'] == 'active'
  133. print("Watiting for event future 4")
  134. result = await event_update_futures[2]
  135. assert result['event_descriptor']['event_status'] == 'completed'
  136. await asyncio.sleep(0.5)
  137. await client.stop()
  138. await server.stop()
  139. await asyncio.sleep(1)