test_queues.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging, objects, utils
  2. import pytest
  3. import asyncio
  4. import datetime
  5. from functools import partial
  6. enable_default_logging()
  7. def on_create_party_registration(registration_info):
  8. print("Registered party")
  9. return 'ven123', 'reg123'
  10. async def on_event(event):
  11. return 'optIn'
  12. async def on_event_opt_in(event, future):
  13. if future.done() is False:
  14. future.set_result(event)
  15. return 'optIn'
  16. async def on_update_event(event, futures):
  17. for future in futures:
  18. if future.done() is False:
  19. future.set_result(event)
  20. break
  21. return 'optIn'
  22. async def on_event_opt_out(event, futures):
  23. for future in futures:
  24. if future.done() is False:
  25. future.set_result(event)
  26. break
  27. return 'optOut'
  28. async def event_callback(ven_id, event_id, opt_type, future):
  29. if future.done() is False:
  30. future.set_result(opt_type)
  31. @pytest.mark.asyncio
  32. async def test_internal_message_queue():
  33. loop = asyncio.get_event_loop()
  34. client = OpenADRClient(ven_name='myven',
  35. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  36. client.add_handler('on_event', on_event)
  37. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  38. server.add_handler('on_create_party_registration', on_create_party_registration)
  39. event_callback_future = loop.create_future()
  40. server.add_event(ven_id='ven123',
  41. signal_name='simple',
  42. signal_type='level',
  43. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc),
  44. 'duration': datetime.timedelta(seconds=3),
  45. 'signal_payload': 1}],
  46. callback=partial(event_callback, future=event_callback_future))
  47. await server.run_async()
  48. await asyncio.sleep(1)
  49. await client.run()
  50. await asyncio.sleep(1)
  51. status = await event_callback_future
  52. assert status == 'optIn'
  53. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  54. assert message_type == 'oadrResponse'
  55. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  56. assert message_type == 'oadrResponse'
  57. await asyncio.sleep(1) # Wait for the event to be completed
  58. await client.stop()
  59. await server.stop()
  60. @pytest.mark.asyncio
  61. async def test_event_status_opt_in():
  62. loop = asyncio.get_event_loop()
  63. client = OpenADRClient(ven_name='myven',
  64. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  65. distribute_event_future = loop.create_future()
  66. event_update_futures = [loop.create_future() for i in range(2)]
  67. client.add_handler('on_event', partial(on_event_opt_in, future=distribute_event_future))
  68. client.add_handler('on_update_event', partial(on_update_event, futures=event_update_futures))
  69. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  70. server.add_handler('on_create_party_registration', on_create_party_registration)
  71. event_callback_future = loop.create_future()
  72. event_id = server.add_event(ven_id='ven123',
  73. signal_name='simple',
  74. signal_type='level',
  75. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=2),
  76. 'duration': datetime.timedelta(seconds=2),
  77. 'signal_payload': 1}],
  78. callback=partial(event_callback, future=event_callback_future))
  79. assert server.services['event_service'].pending_events[event_id][0].event_descriptor.event_status == 'far'
  80. await server.run_async()
  81. await asyncio.sleep(0.5)
  82. await client.run()
  83. await event_callback_future
  84. print("Waiting for event future 1")
  85. result = await distribute_event_future
  86. assert result['event_descriptor']['event_status'] == 'far'
  87. assert len(client.responded_events) == 1
  88. print("Watiting for event future 2")
  89. result = await event_update_futures[0]
  90. assert result['event_descriptor']['event_status'] == 'active'
  91. assert len(client.responded_events) == 1
  92. print("Watiting for event future 3")
  93. result = await event_update_futures[1]
  94. assert result['event_descriptor']['event_status'] == 'completed'
  95. assert len(client.responded_events) == 0
  96. await client.stop()
  97. await server.stop()
  98. await asyncio.sleep(0)
  99. @pytest.mark.asyncio
  100. async def test_event_status_opt_in_with_ramp_up():
  101. loop = asyncio.get_event_loop()
  102. client = OpenADRClient(ven_name='myven',
  103. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  104. distribute_event_future = loop.create_future()
  105. event_update_futures = [loop.create_future() for i in range(3)]
  106. client.add_handler('on_event', partial(on_event_opt_in, future=distribute_event_future))
  107. client.add_handler('on_update_event', partial(on_update_event, futures=event_update_futures))
  108. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  109. server.add_handler('on_create_party_registration', on_create_party_registration)
  110. event_callback_future = loop.create_future()
  111. event_id = server.add_event(ven_id='ven123',
  112. signal_name='simple',
  113. signal_type='level',
  114. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=4),
  115. 'duration': datetime.timedelta(seconds=2),
  116. 'signal_payload': 1}],
  117. ramp_up_period=datetime.timedelta(seconds=2),
  118. callback=partial(event_callback, future=event_callback_future))
  119. assert server.services['event_service'].pending_events[event_id][0].event_descriptor.event_status == 'far'
  120. await server.run_async()
  121. await asyncio.sleep(0.5)
  122. await client.run()
  123. await event_callback_future
  124. print("Waiting for event future 1")
  125. result = await distribute_event_future
  126. assert result['event_descriptor']['event_status'] == 'far'
  127. print("Watiting for event future 2")
  128. result = await event_update_futures[0]
  129. assert result['event_descriptor']['event_status'] == 'near'
  130. print("Watiting for event future 3")
  131. result = await event_update_futures[1]
  132. assert result['event_descriptor']['event_status'] == 'active'
  133. print("Watiting for event future 4")
  134. result = await event_update_futures[2]
  135. assert result['event_descriptor']['event_status'] == 'completed'
  136. await asyncio.sleep(0.5)
  137. await client.stop()
  138. await server.stop()
  139. await asyncio.sleep(1)
  140. @pytest.mark.asyncio
  141. async def test_request_event():
  142. loop = asyncio.get_event_loop()
  143. client = OpenADRClient(ven_name='myven',
  144. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  145. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  146. server.add_handler('on_create_party_registration', on_create_party_registration)
  147. event_id = server.add_event(ven_id='ven123',
  148. signal_name='simple',
  149. signal_type='level',
  150. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=4),
  151. 'duration': datetime.timedelta(seconds=2),
  152. 'signal_payload': 1}],
  153. ramp_up_period=datetime.timedelta(seconds=2),
  154. callback=partial(event_callback))
  155. assert server.services['event_service'].pending_events[event_id][0].event_descriptor.event_status == 'far'
  156. await server.run_async()
  157. await client.create_party_registration()
  158. message_type, message_payload = await client.request_event()
  159. assert message_type == 'oadrDistributeEvent'
  160. message_type, message_payload = await client.request_event()
  161. assert message_type == 'oadrResponse'
  162. await client.stop()
  163. await server.stop()
  164. @pytest.mark.asyncio
  165. async def test_raw_event():
  166. now = datetime.datetime.now(datetime.timezone.utc)
  167. server = OpenADRServer(vtn_id='myvtn')
  168. server.add_handler('on_create_party_registration', on_create_party_registration)
  169. event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  170. modification_number=0,
  171. event_status='far',
  172. market_context='http://marketcontext01'),
  173. event_signals=[objects.EventSignal(signal_id='signal001',
  174. signal_type='level',
  175. signal_name='simple',
  176. intervals=[objects.Interval(dtstart=now,
  177. duration=datetime.timedelta(minutes=10),
  178. signal_payload=1)]),
  179. objects.EventSignal(signal_id='signal002',
  180. signal_type='price',
  181. signal_name='ELECTRICITY_PRICE',
  182. intervals=[objects.Interval(dtstart=now,
  183. duration=datetime.timedelta(minutes=10),
  184. signal_payload=1)])],
  185. targets=[objects.Target(ven_id='ven123')])
  186. loop = asyncio.get_event_loop()
  187. event_callback_future = loop.create_future()
  188. server.add_raw_event(ven_id='ven123', event=event, callback=partial(event_callback, future=event_callback_future))
  189. on_event_future = loop.create_future()
  190. client = OpenADRClient(ven_name='myven',
  191. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  192. client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
  193. await server.run_async()
  194. await client.run()
  195. event = await on_event_future
  196. assert len(event['event_signals']) == 2
  197. result = await event_callback_future
  198. assert result == 'optIn'
  199. await client.stop()
  200. await server.stop()
  201. @pytest.mark.asyncio
  202. async def test_create_event_with_future_as_callback():
  203. now = datetime.datetime.now(datetime.timezone.utc)
  204. server = OpenADRServer(vtn_id='myvtn')
  205. server.add_handler('on_create_party_registration', on_create_party_registration)
  206. event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  207. modification_number=0,
  208. event_status='far',
  209. market_context='http://marketcontext01'),
  210. event_signals=[objects.EventSignal(signal_id='signal001',
  211. signal_type='level',
  212. signal_name='simple',
  213. intervals=[objects.Interval(dtstart=now,
  214. duration=datetime.timedelta(minutes=10),
  215. signal_payload=1)]),
  216. objects.EventSignal(signal_id='signal002',
  217. signal_type='price',
  218. signal_name='ELECTRICITY_PRICE',
  219. intervals=[objects.Interval(dtstart=now,
  220. duration=datetime.timedelta(minutes=10),
  221. signal_payload=1)])],
  222. targets=[objects.Target(ven_id='ven123')])
  223. loop = asyncio.get_event_loop()
  224. event_callback_future = loop.create_future()
  225. server.add_raw_event(ven_id='ven123', event=event, callback=event_callback_future)
  226. on_event_future = loop.create_future()
  227. client = OpenADRClient(ven_name='myven',
  228. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  229. client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
  230. await server.run_async()
  231. await client.run()
  232. event = await on_event_future
  233. assert len(event['event_signals']) == 2
  234. result = await event_callback_future
  235. assert result == 'optIn'
  236. await client.stop()
  237. await server.stop()