test_event_distribution.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging, objects, utils
  2. import pytest
  3. import asyncio
  4. import datetime
  5. from functools import partial
# Runs once at import time, before any test is collected.
# NOTE(review): presumably this makes openleadr's log output visible while the
# tests run — confirm against the openleadr documentation.
enable_default_logging()
  7. def on_create_party_registration(registration_info):
  8. print("Registered party")
  9. return 'ven123', 'reg123'
  10. async def on_event(event):
  11. return 'optIn'
  12. async def on_event_opt_in(event, future=None):
  13. if future and future.done() is False:
  14. future.set_result(event)
  15. return 'optIn'
  16. async def on_update_event(event, futures):
  17. for future in futures:
  18. if future.done() is False:
  19. future.set_result(event)
  20. break
  21. return 'optIn'
  22. async def on_event_opt_out(event, futures):
  23. for future in futures:
  24. if future.done() is False:
  25. future.set_result(event)
  26. break
  27. return 'optOut'
  28. async def event_callback(ven_id, event_id, opt_type, future):
  29. if future.done() is False:
  30. future.set_result(opt_type)
  31. @pytest.mark.asyncio
  32. async def test_internal_message_queue():
  33. loop = asyncio.get_event_loop()
  34. client = OpenADRClient(ven_name='myven',
  35. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  36. client.add_handler('on_event', on_event)
  37. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  38. server.add_handler('on_create_party_registration', on_create_party_registration)
  39. event_callback_future = loop.create_future()
  40. server.add_event(ven_id='ven123',
  41. signal_name='simple',
  42. signal_type='level',
  43. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc),
  44. 'duration': datetime.timedelta(seconds=3),
  45. 'signal_payload': 1}],
  46. callback=partial(event_callback, future=event_callback_future))
  47. await server.run_async()
  48. #await asyncio.sleep(1)
  49. await client.run()
  50. #await asyncio.sleep(1)
  51. status = await event_callback_future
  52. assert status == 'optIn'
  53. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  54. assert message_type == 'oadrResponse'
  55. message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
  56. assert message_type == 'oadrResponse'
  57. #await asyncio.sleep(1) # Wait for the event to be completed
  58. await client.stop()
  59. await server.stop()
  60. @pytest.mark.asyncio
  61. async def test_request_event():
  62. loop = asyncio.get_event_loop()
  63. client = OpenADRClient(ven_name='myven',
  64. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  65. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
  66. server.add_handler('on_create_party_registration', on_create_party_registration)
  67. event_id = server.add_event(ven_id='ven123',
  68. signal_name='simple',
  69. signal_type='level',
  70. intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=4),
  71. 'duration': datetime.timedelta(seconds=2),
  72. 'signal_payload': 1}],
  73. ramp_up_period=datetime.timedelta(seconds=2),
  74. callback=partial(event_callback))
  75. assert server.events['ven123'][0].event_descriptor.event_status == 'far'
  76. await server.run_async()
  77. await client.create_party_registration()
  78. message_type, message_payload = await client.request_event()
  79. assert message_type == 'oadrDistributeEvent'
  80. message_type, message_payload = await client.request_event()
  81. assert message_type == 'oadrDistributeEvent'
  82. await client.stop()
  83. await server.stop()
  84. @pytest.mark.asyncio
  85. async def test_raw_event():
  86. now = datetime.datetime.now(datetime.timezone.utc)
  87. server = OpenADRServer(vtn_id='myvtn')
  88. server.add_handler('on_create_party_registration', on_create_party_registration)
  89. event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  90. modification_number=0,
  91. event_status='far',
  92. market_context='http://marketcontext01'),
  93. event_signals=[objects.EventSignal(signal_id='signal001',
  94. signal_type='level',
  95. signal_name='simple',
  96. intervals=[objects.Interval(dtstart=now,
  97. duration=datetime.timedelta(minutes=10),
  98. signal_payload=1)]),
  99. objects.EventSignal(signal_id='signal002',
  100. signal_type='price',
  101. signal_name='ELECTRICITY_PRICE',
  102. intervals=[objects.Interval(dtstart=now,
  103. duration=datetime.timedelta(minutes=10),
  104. signal_payload=1)])],
  105. targets=[objects.Target(ven_id='ven123')])
  106. loop = asyncio.get_event_loop()
  107. event_callback_future = loop.create_future()
  108. server.add_raw_event(ven_id='ven123', event=event, callback=partial(event_callback, future=event_callback_future))
  109. on_event_future = loop.create_future()
  110. client = OpenADRClient(ven_name='myven',
  111. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  112. client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
  113. await server.run_async()
  114. await client.run()
  115. event = await on_event_future
  116. assert len(event['event_signals']) == 2
  117. result = await event_callback_future
  118. assert result == 'optIn'
  119. await client.stop()
  120. await server.stop()
  121. @pytest.mark.asyncio
  122. async def test_create_event_with_future_as_callback():
  123. now = datetime.datetime.now(datetime.timezone.utc)
  124. server = OpenADRServer(vtn_id='myvtn')
  125. server.add_handler('on_create_party_registration', on_create_party_registration)
  126. event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  127. modification_number=0,
  128. event_status='far',
  129. market_context='http://marketcontext01'),
  130. event_signals=[objects.EventSignal(signal_id='signal001',
  131. signal_type='level',
  132. signal_name='simple',
  133. intervals=[objects.Interval(dtstart=now,
  134. duration=datetime.timedelta(minutes=10),
  135. signal_payload=1)]),
  136. objects.EventSignal(signal_id='signal002',
  137. signal_type='price',
  138. signal_name='ELECTRICITY_PRICE',
  139. intervals=[objects.Interval(dtstart=now,
  140. duration=datetime.timedelta(minutes=10),
  141. signal_payload=1)])],
  142. targets=[objects.Target(ven_id='ven123')])
  143. loop = asyncio.get_event_loop()
  144. event_callback_future = loop.create_future()
  145. server.add_raw_event(ven_id='ven123', event=event, callback=event_callback_future)
  146. on_event_future = loop.create_future()
  147. client = OpenADRClient(ven_name='myven',
  148. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  149. client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
  150. await server.run_async()
  151. await client.run()
  152. event = await on_event_future
  153. assert len(event['event_signals']) == 2
  154. result = await event_callback_future
  155. assert result == 'optIn'
  156. await client.stop()
  157. await server.stop()
  158. @pytest.mark.asyncio
  159. async def test_multiple_events_in_queue():
  160. now = datetime.datetime.now(datetime.timezone.utc)
  161. server = OpenADRServer(vtn_id='myvtn')
  162. server.add_handler('on_create_party_registration', on_create_party_registration)
  163. loop = asyncio.get_event_loop()
  164. event_1_callback_future = loop.create_future()
  165. event_2_callback_future = loop.create_future()
  166. server.add_event(ven_id='ven123',
  167. signal_name='simple',
  168. signal_type='level',
  169. intervals=[objects.Interval(dtstart=now,
  170. duration=datetime.timedelta(seconds=1),
  171. signal_payload=1)],
  172. callback=event_1_callback_future)
  173. await server.run()
  174. on_event_future = loop.create_future()
  175. client = OpenADRClient(ven_name='ven123',
  176. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  177. await client.create_party_registration()
  178. response_type, response_payload = await client.request_event()
  179. assert response_type == 'oadrDistributeEvent'
  180. events = response_payload['events']
  181. assert len(events) == 1
  182. event_id = events[0]['event_descriptor']['event_id']
  183. request_id = response_payload['request_id']
  184. await client.created_event(request_id=request_id,
  185. event_id=event_id,
  186. opt_type='optIn',
  187. modification_number=0)
  188. server.add_event(ven_id='ven123',
  189. signal_name='simple',
  190. signal_type='level',
  191. intervals=[objects.Interval(dtstart=now + datetime.timedelta(seconds=1),
  192. duration=datetime.timedelta(seconds=1),
  193. signal_payload=1)],
  194. callback=event_2_callback_future)
  195. response_type, response_payload = await client.request_event()
  196. assert response_type == 'oadrDistributeEvent'
  197. events = response_payload['events']
  198. # Assert that we still have two events in the response
  199. assert len(events) == 2
  200. # Wait one second and retrieve the events again
  201. await asyncio.sleep(1)
  202. response_type, response_payload = await client.request_event()
  203. assert response_type == 'oadrDistributeEvent'
  204. events = response_payload['events']
  205. assert len(events) == 2
  206. assert events[1]['event_descriptor']['event_status'] == 'completed'
  207. response_type, response_payload = await client.request_event()
  208. assert response_type == 'oadrDistributeEvent'
  209. events = response_payload['events']
  210. assert len(events) == 1
  211. await asyncio.sleep(1)
  212. response_type, response_payload = await client.request_event()
  213. assert response_type == 'oadrDistributeEvent'
  214. response_type, response_payload = await client.request_event()
  215. assert response_type == 'oadrResponse'
  216. await server.stop()
  217. @pytest.mark.asyncio
  218. async def test_client_event_cleanup():
  219. now = datetime.datetime.now(datetime.timezone.utc)
  220. server = OpenADRServer(vtn_id='myvtn')
  221. server.add_handler('on_create_party_registration', on_create_party_registration)
  222. loop = asyncio.get_event_loop()
  223. event_1_callback_future = loop.create_future()
  224. event_2_callback_future = loop.create_future()
  225. server.add_event(ven_id='ven123',
  226. signal_name='simple',
  227. signal_type='level',
  228. intervals=[objects.Interval(dtstart=now,
  229. duration=datetime.timedelta(seconds=1),
  230. signal_payload=1)],
  231. callback=event_1_callback_future)
  232. await server.run()
  233. client = OpenADRClient(ven_name='ven123',
  234. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  235. client.add_handler('on_event', on_event_opt_in)
  236. await client.run()
  237. await asyncio.sleep(0.5)
  238. assert len(client.received_events) == 1
  239. await asyncio.sleep(0.5)
  240. await client._event_cleanup()
  241. assert len(client.received_events) == 0
  242. await server.stop()
  243. await client.stop()