test_event_warnings_errors.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging, utils
  2. import pytest
  3. from functools import partial
  4. import asyncio
  5. from datetime import datetime, timedelta, timezone
  6. import logging
  7. async def on_create_party_registration(ven_name):
  8. return 'venid', 'regid'
  9. async def on_event_accepted(ven_id, event_id, opt_type, future=None):
  10. if future and future.done() is False:
  11. future.set_result(opt_type)
  12. async def good_on_event(event):
  13. return 'optIn'
  14. async def faulty_on_event(event):
  15. return None
  16. async def broken_on_event(event):
  17. raise KeyError("BOOM")
  18. @pytest.mark.asyncio
  19. async def test_client_no_event_handler(caplog):
  20. caplog.set_level(logging.WARNING)
  21. enable_default_logging()
  22. logger = logging.getLogger('openleadr')
  23. logger.setLevel(logging.DEBUG)
  24. client = OpenADRClient(ven_name='myven',
  25. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  26. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  27. server.add_handler('on_create_party_registration', on_create_party_registration)
  28. print("Running server")
  29. await server.run_async()
  30. await asyncio.sleep(0.1)
  31. print("Running client")
  32. await client.run()
  33. event_confirm_future = asyncio.get_event_loop().create_future()
  34. print("Adding event")
  35. server.add_event(ven_id='venid',
  36. signal_name='simple',
  37. signal_type='level',
  38. intervals=[{'dtstart': datetime.now(timezone.utc),
  39. 'duration': timedelta(seconds=1),
  40. 'signal_payload': 1.1}],
  41. target={'ven_id': 'venid'},
  42. callback=partial(on_event_accepted, future=event_confirm_future))
  43. print("Waiting for a response to the event")
  44. result = await event_confirm_future
  45. assert result == 'optOut'
  46. assert ("You should implement your own on_event handler. This handler receives "
  47. "an Event dict and should return either 'optIn' or 'optOut' based on your "
  48. "choice. Will opt out of the event for now.") in [rec.message for rec in caplog.records]
  49. await client.stop()
  50. await server.stop()
  51. await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
  52. @pytest.mark.asyncio
  53. async def test_client_faulty_event_handler(caplog):
  54. caplog.set_level(logging.WARNING)
  55. enable_default_logging()
  56. logger = logging.getLogger('openleadr')
  57. logger.setLevel(logging.DEBUG)
  58. client = OpenADRClient(ven_name='myven',
  59. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  60. client.add_handler('on_event', faulty_on_event)
  61. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  62. server.add_handler('on_create_party_registration', on_create_party_registration)
  63. print("Running server")
  64. await server.run_async()
  65. await asyncio.sleep(0.1)
  66. print("Running client")
  67. await client.run()
  68. event_confirm_future = asyncio.get_event_loop().create_future()
  69. print("Adding event")
  70. server.add_event(ven_id='venid',
  71. signal_name='simple',
  72. signal_type='level',
  73. intervals=[{'dtstart': datetime.now(timezone.utc),
  74. 'duration': timedelta(seconds=1),
  75. 'signal_payload': 1.1}],
  76. target={'ven_id': 'venid'},
  77. callback=partial(on_event_accepted, future=event_confirm_future))
  78. print("Waiting for a response to the event")
  79. result = await event_confirm_future
  80. assert result == 'optOut'
  81. assert ("Your on_event or on_update_event handler must return 'optIn' or 'optOut'; "
  82. f"you supplied {None}. Please fix your on_event handler.") in [rec.message for rec in caplog.records]
  83. await client.stop()
  84. await server.stop()
  85. await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
  86. @pytest.mark.asyncio
  87. async def test_client_exception_event_handler(caplog):
  88. caplog.set_level(logging.WARNING)
  89. enable_default_logging()
  90. logger = logging.getLogger('openleadr')
  91. logger.setLevel(logging.DEBUG)
  92. client = OpenADRClient(ven_name='myven',
  93. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  94. client.add_handler('on_event', broken_on_event)
  95. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  96. server.add_handler('on_create_party_registration', on_create_party_registration)
  97. print("Running server")
  98. await server.run_async()
  99. await asyncio.sleep(0.1)
  100. print("Running client")
  101. await client.run()
  102. event_confirm_future = asyncio.get_event_loop().create_future()
  103. print("Adding event")
  104. server.add_event(ven_id='venid',
  105. signal_name='simple',
  106. signal_type='level',
  107. intervals=[{'dtstart': datetime.now(timezone.utc),
  108. 'duration': timedelta(seconds=1),
  109. 'signal_payload': 1.1}],
  110. target={'ven_id': 'venid'},
  111. callback=partial(on_event_accepted, future=event_confirm_future))
  112. print("Waiting for a response to the event")
  113. result = await event_confirm_future
  114. assert result == 'optOut'
  115. err = KeyError("BOOM")
  116. assert ("Your on_event handler encountered an error. Will Opt Out of the event. "
  117. f"The error was {err.__class__.__name__}: {str(err)}") in [rec.message for rec in caplog.records]
  118. await client.stop()
  119. await server.stop()
  120. await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
  121. @pytest.mark.asyncio
  122. async def test_client_good_event_handler(caplog):
  123. caplog.set_level(logging.WARNING)
  124. enable_default_logging()
  125. logger = logging.getLogger('openleadr')
  126. logger.setLevel(logging.DEBUG)
  127. client = OpenADRClient(ven_name='myven',
  128. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  129. client.add_handler('on_event', good_on_event)
  130. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  131. server.add_handler('on_create_party_registration', on_create_party_registration)
  132. print("Running server")
  133. await server.run_async()
  134. await asyncio.sleep(0.1)
  135. print("Running client")
  136. await client.run()
  137. event_confirm_future = asyncio.get_event_loop().create_future()
  138. print("Adding event")
  139. server.add_event(ven_id='venid',
  140. signal_name='simple',
  141. signal_type='level',
  142. intervals=[{'dtstart': datetime.now(timezone.utc),
  143. 'duration': timedelta(seconds=1),
  144. 'signal_payload': 1.1}],
  145. target={'ven_id': 'venid'},
  146. callback=partial(on_event_accepted, future=event_confirm_future))
  147. print("Waiting for a response to the event")
  148. result = await event_confirm_future
  149. assert result == 'optIn'
  150. assert len(caplog.records) == 0
  151. await client.stop()
  152. await server.stop()
  153. await asyncio.sleep(1)
  154. @pytest.mark.asyncio
  155. async def test_server_warning_conflicting_poll_methods(caplog):
  156. caplog.set_level(logging.WARNING)
  157. enable_default_logging()
  158. logger = logging.getLogger('openleadr')
  159. logger.setLevel(logging.DEBUG)
  160. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  161. server.add_handler('on_poll', print)
  162. server.add_event(ven_id='venid',
  163. signal_name='simple',
  164. signal_type='level',
  165. intervals=[{'dtstart': datetime.now(timezone.utc),
  166. 'duration': timedelta(seconds=1),
  167. 'signal_payload': 1.1}],
  168. target={'ven_id': 'venid'},
  169. callback=on_event_accepted)
  170. assert ("You cannot use the add_event method after you assign your own on_poll "
  171. "handler. If you use your own on_poll handler, you are responsible for "
  172. "delivering events from that handler. If you want to use OpenLEADRs "
  173. "message queuing system, you should not assign an on_poll handler. "
  174. "Your Event will NOT be added.") in [record.msg for record in caplog.records]
  175. @pytest.mark.asyncio
  176. async def test_server_warning_naive_datetimes_in_event(caplog):
  177. caplog.set_level(logging.WARNING)
  178. enable_default_logging()
  179. logger = logging.getLogger('openleadr')
  180. logger.setLevel(logging.DEBUG)
  181. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  182. server.add_event(ven_id='venid',
  183. signal_name='simple',
  184. signal_type='level',
  185. intervals=[{'dtstart': datetime.now(),
  186. 'duration': timedelta(seconds=1),
  187. 'signal_payload': 1.1}],
  188. target={'ven_id': 'venid'},
  189. callback=on_event_accepted)
  190. assert ("You supplied a naive datetime object to your interval's dtstart. "
  191. "This will be interpreted as a timestamp in your local timezone "
  192. "and then converted to UTC before sending. Please supply timezone-"
  193. "aware timestamps like datetime.datetime.new(timezone.utc) or "
  194. "datetime.datetime(..., tzinfo=datetime.timezone.utc)") in [record.msg for record in caplog.records]
  195. @pytest.mark.asyncio
  196. async def test_client_warning_no_update_event_handler(caplog):
  197. caplog.set_level(logging.WARNING)
  198. enable_default_logging()
  199. logger = logging.getLogger('openleadr')
  200. logger.setLevel(logging.DEBUG)
  201. server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
  202. server.add_handler('on_create_party_registration', on_create_party_registration)
  203. server.add_event(ven_id='venid',
  204. signal_name='simple',
  205. signal_type='level',
  206. intervals=[{'dtstart': datetime.now(timezone.utc),
  207. 'duration': timedelta(seconds=1),
  208. 'signal_payload': 1.1}],
  209. target={'ven_id': 'venid'},
  210. callback=on_event_accepted)
  211. client = OpenADRClient(ven_name='myven',
  212. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  213. client.add_handler('on_event', good_on_event)
  214. await server.run_async()
  215. await asyncio.sleep(0.5)
  216. await client.run()
  217. await asyncio.sleep(2)
  218. assert ("You should implement your own on_update_event handler. This handler receives "
  219. "an Event dict and should return either 'optIn' or 'optOut' based on your "
  220. "choice. Will re-use the previous opt status for this event_id for now") in [record.msg for record in caplog.records]
  221. await client.stop()
  222. await server.stop()
  223. await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])