test_reports.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
  2. import asyncio
  3. import pytest
  4. import aiohttp
  5. from datetime import datetime, timedelta
  6. from functools import partial
  7. import logging
  8. from random import random
  9. import time
  10. from openleadr.messaging import create_message
  11. loop = asyncio.get_event_loop()
  12. loop.set_debug(True)
  13. enable_default_logging()
  14. async def collect_data(future=None):
  15. print("Collect Data")
  16. value = 100 * random()
  17. if future:
  18. future.set_result(value)
  19. return value
  20. async def lookup_ven(ven_name=None, ven_id=None):
  21. """
  22. Look up a ven by its name or ID
  23. """
  24. return {'ven_id': 'ven1234'}
  25. async def receive_data(data, future=None):
  26. if future:
  27. future.set_result(data)
  28. pass
  29. async def on_update_report(report, futures=None):
  30. if futures:
  31. for future in futures:
  32. if future.done() is False:
  33. future.set_result(report)
  34. break
  35. pass
  36. async def on_register_report(ven_id, resource_id, measurement, unit, scale,
  37. min_sampling_interval, max_sampling_interval, bundling=1, futures=None, receive_futures=None):
  38. """
  39. Deal with this report.
  40. """
  41. print(f"Called on register report {ven_id}, {resource_id}, {measurement}, {unit}, {scale}, {min_sampling_interval}, {max_sampling_interval}")
  42. assert resource_id in ('Device001', 'Device002')
  43. if futures:
  44. futures.pop(0).set_result(True)
  45. if receive_futures:
  46. callback = partial(receive_data, future=receive_futures.pop(0))
  47. else:
  48. callback = receive_data
  49. if bundling > 1:
  50. print(f"Returning from on register report {callback}, {min_sampling_interval}, {bundling * min_sampling_interval}")
  51. return callback, min_sampling_interval, bundling * min_sampling_interval
  52. print(f"Returning from on register report {callback}, {min_sampling_interval}")
  53. return callback, min_sampling_interval
  54. async def on_register_report_full(report, futures=None):
  55. """
  56. Deal with this report.
  57. """
  58. if futures:
  59. futures.pop().set_result(True)
  60. granularity = min(*[rd['sampling_rate']['min_period'] for rd in report['report_descriptions']])
  61. report_requests = [(rd['r_id'], on_update_report, granularity) for rd in report['report_descriptions'] if report['report_name'] == 'METADATA_TELEMETRY_USAGE']
  62. return report_requests
  63. async def on_create_party_registration(ven_name, future=None):
  64. if future:
  65. future.set_result(True)
  66. ven_id = '1234'
  67. registration_id = 'abcd'
  68. return ven_id, registration_id
  69. @pytest.mark.asyncio
  70. async def test_report_registration():
  71. """
  72. Test the registration of two reports with two r_ids each.
  73. """
  74. # Create a server
  75. logger = logging.getLogger('openleadr')
  76. logger.setLevel(logging.DEBUG)
  77. server = OpenADRServer(vtn_id='testvtn')
  78. server.add_handler('on_register_report', on_register_report)
  79. server.add_handler('on_create_party_registration', on_create_party_registration)
  80. # Create a client
  81. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
  82. # Add 4 reports
  83. client.add_report(callback=collect_data,
  84. report_specifier_id='CurrentReport',
  85. resource_id='Device001',
  86. measurement='current',
  87. unit='A')
  88. client.add_report(callback=collect_data,
  89. report_specifier_id='CurrentReport',
  90. resource_id='Device002',
  91. measurement='current',
  92. unit='A')
  93. client.add_report(callback=collect_data,
  94. report_specifier_id='VoltageReport',
  95. resource_id='Device001',
  96. measurement='voltage',
  97. unit='V')
  98. client.add_report(callback=collect_data,
  99. report_specifier_id='VoltageReport',
  100. resource_id='Device002',
  101. measurement='voltage',
  102. unit='V')
  103. asyncio.create_task(server.run_async())
  104. #await asyncio.sleep(1)
  105. # Register the client
  106. await client.create_party_registration()
  107. # Register the reports
  108. await client.register_reports(client.reports)
  109. assert len(client.report_requests) == 2
  110. assert len(server.services['report_service'].report_callbacks) == 4
  111. await client.stop()
  112. await server.stop()
  113. async def collect_status():
  114. return 1
  115. @pytest.mark.asyncio
  116. async def test_report_registration_with_status_report():
  117. """
  118. Test the registration of two reports with two r_ids each.
  119. """
  120. # Create a server
  121. logger = logging.getLogger('openleadr')
  122. logger.setLevel(logging.DEBUG)
  123. server = OpenADRServer(vtn_id='testvtn')
  124. server.add_handler('on_register_report', on_register_report)
  125. server.add_handler('on_create_party_registration', on_create_party_registration)
  126. # Create a client
  127. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
  128. # Add 4 reports
  129. client.add_report(callback=collect_data,
  130. report_specifier_id='CurrentReport',
  131. resource_id='Device001',
  132. measurement='current',
  133. unit='A')
  134. client.add_report(callback=collect_data,
  135. report_specifier_id='CurrentReport',
  136. resource_id='Device002',
  137. measurement='current',
  138. unit='A')
  139. client.add_report(callback=collect_data,
  140. report_specifier_id='VoltageReport',
  141. resource_id='Device001',
  142. measurement='voltage',
  143. unit='V')
  144. client.add_report(callback=collect_data,
  145. report_specifier_id='VoltageReport',
  146. resource_id='Device002',
  147. measurement='voltage',
  148. unit='V')
  149. client.add_report(callback=collect_status,
  150. report_name='TELEMETRY_STATUS',
  151. report_specifier_id='StatusReport',
  152. resource_id='Device001')
  153. asyncio.create_task(server.run_async())
  154. # await asyncio.sleep(1)
  155. # Register the client
  156. await client.create_party_registration()
  157. # Register the reports
  158. await client.register_reports(client.reports)
  159. assert len(client.report_requests) == 3
  160. assert len(server.services['report_service'].report_callbacks) == 5
  161. await client.stop()
  162. await server.stop()
  163. @pytest.mark.asyncio
  164. async def test_report_registration_full():
  165. """
  166. Test the registration of two reports with two r_ids each.
  167. """
  168. # Create a server
  169. logger = logging.getLogger('openleadr')
  170. logger.setLevel(logging.DEBUG)
  171. server = OpenADRServer(vtn_id='testvtn')
  172. server.add_handler('on_register_report', on_register_report_full)
  173. server.add_handler('on_create_party_registration', on_create_party_registration)
  174. # Create a client
  175. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  176. # Add 4 reports
  177. client.add_report(callback=collect_data,
  178. report_specifier_id='PowerReport',
  179. resource_id='Device001',
  180. measurement='power_real',
  181. unit='W')
  182. client.add_report(callback=collect_data,
  183. report_specifier_id='PowerReport',
  184. resource_id='Device002',
  185. measurement='power_real',
  186. unit='W')
  187. client.add_report(callback=collect_data,
  188. report_specifier_id='VoltageReport',
  189. resource_id='Device001',
  190. measurement='voltage',
  191. unit='V')
  192. client.add_report(callback=collect_data,
  193. report_specifier_id='VoltageReport',
  194. resource_id='Device002',
  195. measurement='voltage',
  196. unit='V')
  197. await server.run_async()
  198. # await asyncio.sleep(0.1)
  199. # Register the client
  200. await client.create_party_registration()
  201. # Register the reports
  202. await client.register_reports(client.reports)
  203. assert len(client.report_requests) == 2
  204. assert len(server.services['report_service'].report_callbacks) == 4
  205. await client.stop()
  206. await server.stop()
  207. @pytest.mark.asyncio
  208. async def test_update_reports():
  209. """
  210. Tests the timely delivery of requested reports
  211. """
  212. # Create a server
  213. logger = logging.getLogger('openleadr')
  214. logger.setLevel(logging.DEBUG)
  215. loop = asyncio.get_event_loop()
  216. server = OpenADRServer(vtn_id='testvtn')
  217. register_report_future_1 = loop.create_future()
  218. register_report_future_2 = loop.create_future()
  219. register_report_futures = [register_report_future_1, register_report_future_2]
  220. receive_report_future_1 = loop.create_future()
  221. receive_report_future_2 = loop.create_future()
  222. receive_report_future_3 = loop.create_future()
  223. receive_report_future_4 = loop.create_future()
  224. receive_report_futures = [receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4]
  225. server.add_handler('on_register_report', partial(on_register_report, futures=register_report_futures, receive_futures=receive_report_futures))
  226. party_future = loop.create_future()
  227. server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_future))
  228. # Create a client
  229. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  230. # Add 4 reports
  231. future_1 = loop.create_future()
  232. client.add_report(callback=partial(collect_data, future=future_1),
  233. report_specifier_id='PowerReport',
  234. resource_id='Device001',
  235. measurement='power_real',
  236. sampling_rate=timedelta(seconds=2),
  237. unit='W')
  238. future_2 = loop.create_future()
  239. client.add_report(callback=partial(collect_data, future=future_2),
  240. report_specifier_id='PowerReport',
  241. resource_id='Device002',
  242. measurement='power_real',
  243. sampling_rate=timedelta(seconds=2),
  244. unit='W')
  245. future_3 = loop.create_future()
  246. client.add_report(callback=partial(collect_data, future=future_3),
  247. report_specifier_id='VoltageReport',
  248. resource_id='Device001',
  249. measurement='voltage',
  250. sampling_rate=timedelta(seconds=2),
  251. unit='V')
  252. future_4 = loop.create_future()
  253. client.add_report(callback=partial(collect_data, future=future_4),
  254. report_specifier_id='VoltageReport',
  255. resource_id='Device002',
  256. measurement='voltage',
  257. sampling_rate=timedelta(seconds=2),
  258. unit='V')
  259. assert len(client.reports) == 2
  260. asyncio.create_task(server.run_async())
  261. # await asyncio.sleep(1)
  262. # Run the client asynchronously
  263. print("Running the client")
  264. asyncio.create_task(client.run())
  265. print("Awaiting party future")
  266. await party_future
  267. print("Awaiting report futures")
  268. await asyncio.gather(register_report_future_1, register_report_future_2)
  269. await asyncio.sleep(0.1)
  270. assert len(server.services['report_service'].report_callbacks) == 4
  271. print("Awaiting data collection futures")
  272. await future_1
  273. await future_2
  274. await future_3
  275. await future_4
  276. print("Awaiting update report futures")
  277. await asyncio.gather(receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4)
  278. print("Done gathering")
  279. assert receive_report_future_1.result()[0][1] == future_1.result()
  280. assert receive_report_future_2.result()[0][1] == future_2.result()
  281. assert receive_report_future_3.result()[0][1] == future_3.result()
  282. assert receive_report_future_4.result()[0][1] == future_4.result()
  283. await client.stop()
  284. await server.stop()
  285. async def get_historic_data(date_from, date_to):
  286. pass
  287. async def collect_data_multi(futures=None):
  288. print("Data Collected")
  289. if futures:
  290. for i, future in enumerate(futures):
  291. if future.done() is False:
  292. print(f"Marking future {i} as done")
  293. future.set_result(True)
  294. break
  295. return 3.14
  296. @pytest.mark.asyncio
  297. async def test_incremental_reports():
  298. loop = asyncio.get_event_loop()
  299. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  300. collect_futures = [loop.create_future() for i in range(2)]
  301. client.add_report(callback=partial(collect_data_multi, futures=collect_futures),
  302. report_specifier_id='myhistory',
  303. measurement='voltage',
  304. resource_id='Device001',
  305. sampling_rate=timedelta(seconds=2))
  306. server = OpenADRServer(vtn_id='myvtn')
  307. register_report_future = loop.create_future()
  308. update_report_future = loop.create_future()
  309. server.add_handler('on_register_report', partial(on_register_report,
  310. bundling=2,
  311. futures=[register_report_future],
  312. receive_futures=[update_report_future]))
  313. party_future = loop.create_future()
  314. server.add_handler('on_create_party_registration',
  315. partial(on_create_party_registration, future=party_future))
  316. loop.create_task(server.run_async())
  317. # await asyncio.sleep(1)
  318. await client.run()
  319. print("Awaiting party future")
  320. await party_future
  321. print("Awaiting register report future")
  322. await register_report_future
  323. print("Awaiting first data collection future... ", end="")
  324. await collect_futures[0]
  325. print("check")
  326. # await asyncio.sleep(1)
  327. print("Checking that the report was not yet sent... ", end="")
  328. assert update_report_future.done() is False
  329. print("check")
  330. print("Awaiting data collection second future... ", end="")
  331. await collect_futures[1]
  332. print("check")
  333. print("Awaiting report update future")
  334. result = await update_report_future
  335. assert len(result) == 2
  336. await server.stop()
  337. await client.stop()
  338. # await asyncio.sleep(0)
  339. async def collect_data_history(date_from, date_to, sampling_interval, futures):
  340. data = [(date_from, 1.0), (date_to, 2.0)]
  341. if futures:
  342. for future in futures:
  343. if future.done() is False:
  344. future.set_result(data)
  345. break
  346. return data
  347. @pytest.mark.asyncio
  348. async def test_update_report_data_collection_mode_full():
  349. loop = asyncio.get_event_loop()
  350. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  351. data_collection_future = loop.create_future()
  352. client.add_report(callback=partial(collect_data_history, futures=[data_collection_future]),
  353. resource_id='Device001',
  354. measurement='power_real',
  355. data_collection_mode='full',
  356. sampling_rate=timedelta(seconds=1),
  357. unit='W')
  358. report_register_future = loop.create_future()
  359. report_received_future = loop.create_future()
  360. party_registration_future = loop.create_future()
  361. server = OpenADRServer(vtn_id='myvtn')
  362. server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_registration_future))
  363. server.add_handler('on_register_report', partial(on_register_report,
  364. bundling=2,
  365. futures=[report_register_future],
  366. receive_futures=[report_received_future]))
  367. await server.run_async()
  368. # await asyncio.sleep(0.1)
  369. print(f"The time is now {datetime.now()}")
  370. t = time.time()
  371. wait_for = int(t/2) * 2 + 2 - t
  372. # await asyncio.sleep(wait_for)
  373. print(f"The time is now {datetime.now()}, running client")
  374. await client.run()
  375. await party_registration_future
  376. await report_register_future
  377. # await asyncio.sleep(1)
  378. print(f"The time is now {datetime.now()}, checking if report was triggered")
  379. assert data_collection_future.done() is False
  380. print("Waiting for the data collection to occur")
  381. await data_collection_future
  382. print("Waiting for the report to be received")
  383. await report_received_future
  384. print("Done")
  385. await server.stop()
  386. await client.stop()
  387. def test_add_report_invalid_unit(caplog):
  388. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  389. client.add_report(callback=print,
  390. report_specifier_id='myreport',
  391. measurement='voltage',
  392. resource_id='Device001',
  393. sampling_rate=timedelta(seconds=10),
  394. unit='A')
  395. assert caplog.record_tuples == [("openleadr", logging.WARNING, f"The supplied unit A for measurement voltage will be ignored, V will be used instead. Allowed units for this measurement are: V")]
  396. def test_add_report_invalid_scale():
  397. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  398. with pytest.raises(ValueError):
  399. client.add_report(callback=print,
  400. report_specifier_id='myreport',
  401. measurement='power_real',
  402. resource_id='Device001',
  403. sampling_rate=timedelta(seconds=10),
  404. unit='W',
  405. scale='xxx')
  406. def test_add_report_invalid_description(caplog):
  407. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  408. client.add_report(callback=print,
  409. report_specifier_id='myreport',
  410. measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
  411. resource_id='Device001',
  412. sampling_rate=timedelta(seconds=10))
  413. msg = create_message('oadrRegisterReport', reports=client.reports)
  414. def test_add_report_invalid_description(caplog):
  415. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  416. with pytest.raises(ValueError):
  417. client.add_report(callback=print,
  418. report_specifier_id='myreport',
  419. measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
  420. resource_id='Device001',
  421. sampling_rate=timedelta(seconds=10))
  422. def test_add_report_non_standard_measurement():
  423. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  424. client.add_report(callback=print,
  425. report_specifier_id='myreport',
  426. measurement='rainbows',
  427. resource_id='Device001',
  428. sampling_rate=timedelta(seconds=10),
  429. unit='A')
  430. assert len(client.reports) == 1
  431. assert client.reports[0].report_descriptions[0].measurement.name == 'customUnit'
  432. assert client.reports[0].report_descriptions[0].measurement.description == 'rainbows'
  433. @pytest.mark.asyncio
  434. async def test_different_on_register_report_handlers(caplog):
  435. def on_create_party_registration(registration_info):
  436. return 'ven123', 'reg123'
  437. def get_value():
  438. return 123.456
  439. def report_callback(data):
  440. pass
  441. def on_register_report_returning_none(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  442. return None
  443. def on_register_report_returning_string(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  444. return "Hello There"
  445. def on_register_report_returning_uncallable_first_element(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  446. return ("Hello", "There")
  447. def on_register_report_returning_non_datetime_second_element(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  448. return (report_callback, "Hello There")
  449. def on_register_report_returning_non_datetime_third_element(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  450. return (report_callback, timedelta(minutes=10), "Hello There")
  451. def on_register_report_returning_too_long_tuple(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  452. return (report_callback, timedelta(minutes=10), timedelta(minutes=10), "Hello")
  453. def on_register_report_full_returning_string(report):
  454. return "Hello There"
  455. def on_register_report_full_returning_list_of_strings(report):
  456. return ["Hello", "There"]
  457. def on_register_report_full_returning_list_of_tuples_of_wrong_length(report):
  458. return [("Hello", "There")]
  459. def on_register_report_full_returning_list_of_tuples_with_no_callable(report):
  460. return [("Hello", "There", "World")]
  461. def on_register_report_full_returning_list_of_tuples_with_no_timedelta(report):
  462. return [(report_callback, "Hello There")]
  463. server = OpenADRServer(vtn_id='myvtn')
  464. server.add_handler('on_create_party_registration', on_create_party_registration)
  465. client = OpenADRClient(ven_name='myven',
  466. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  467. client.add_report(resource_id='Device001',
  468. measurement='voltage',
  469. sampling_rate=timedelta(minutes=10),
  470. callback=get_value)
  471. await server.run()
  472. await client.create_party_registration()
  473. assert client.ven_id == 'ven123'
  474. await client.register_reports(client.reports)
  475. assert len(client.report_requests) == 0
  476. messages = [rec.message for rec in caplog.records if rec.levelno == logging.ERROR]
  477. assert len(messages) == 0
  478. caplog.clear()
  479. server.add_handler('on_register_report', on_register_report_returning_none)
  480. await client.register_reports(client.reports)
  481. assert len(client.report_requests) == 0
  482. messages = [rec.message for rec in caplog.records if rec.levelno == logging.ERROR]
  483. assert len(messages) == 0
  484. caplog.clear()
  485. server.add_handler('on_register_report', on_register_report_returning_string)
  486. await client.register_reports(client.reports)
  487. assert len(client.report_requests) == 0
  488. assert "Your on_register_report handler must return a tuple or None; it returned 'Hello There' (str)." in caplog.messages
  489. caplog.clear()
  490. server.add_handler('on_register_report', on_register_report_returning_uncallable_first_element)
  491. await client.register_reports(client.reports)
  492. assert len(client.report_requests) == 0
  493. assert(f"Your on_register_report handler did not return the correct tuple. "
  494. "It should return a (callback, sampling_interval) or "
  495. "(callback, sampling_interval, reporting_interval) tuple, where "
  496. "the callback is a callable function or coroutine, and "
  497. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  498. "It returned: '('Hello', 'There')'. The first element was not callable.") in caplog.messages
  499. caplog.clear()
  500. server.add_handler('on_register_report', on_register_report_returning_non_datetime_second_element)
  501. await client.register_reports(client.reports)
  502. assert len(client.report_requests) == 0
  503. assert (f"Your on_register_report handler did not return the correct tuple. "
  504. "It should return a (callback, sampling_interval) or "
  505. "(callback, sampling_interval, reporting_interval) tuple, where "
  506. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  507. f"It returned: '{(report_callback, 'Hello There')}'. The second element was not of type timedelta.") in caplog.messages
  508. caplog.clear()
  509. server.add_handler('on_register_report', on_register_report_returning_non_datetime_third_element)
  510. await client.register_reports(client.reports)
  511. assert len(client.report_requests) == 0
  512. assert ("Your on_register_report handler did not return the correct tuple. "
  513. "It should return a (callback, sampling_interval) or "
  514. "(callback, sampling_interval, reporting_interval) tuple, where "
  515. "sampling_interval and reporting_interval are of type datetime.timedelta. "
  516. f"It returned: '{(report_callback, timedelta(minutes=10), 'Hello There')}'. The third element was not of type timedelta.") in caplog.messages
  517. caplog.clear()
  518. server.add_handler('on_register_report', on_register_report_returning_too_long_tuple)
  519. await client.register_reports(client.reports)
  520. assert len(client.report_requests) == 0
  521. assert ("Your on_register_report handler returned a tuple of the wrong length. "
  522. "It should be 2 or 3. "
  523. f"It returned: '{(report_callback, timedelta(minutes=10), timedelta(minutes=10), 'Hello')}'.") in caplog.messages
  524. caplog.clear()
  525. server.add_handler('on_register_report', on_register_report_full_returning_string)
  526. await client.register_reports(client.reports)
  527. assert len(client.report_requests) == 0
  528. assert "Your on_register_report handler must return a list of tuples or None; it returned 'Hello There' (str)." in caplog.messages
  529. caplog.clear()
  530. server.add_handler('on_register_report', on_register_report_full_returning_list_of_strings)
  531. await client.register_reports(client.reports)
  532. assert len(client.report_requests) == 0
  533. assert ("Your on_register_report handler must return a list of tuples or None; "
  534. f"The first item from the list was 'Hello' (str).") in caplog.messages
  535. caplog.clear()
  536. server.add_handler('on_register_report', on_register_report_full_returning_list_of_tuples_of_wrong_length)
  537. await client.register_reports(client.reports)
  538. assert len(client.report_requests) == 0
  539. assert ("Your on_register_report handler returned tuples of the wrong length. "
  540. "It should be 3 or 4. It returned: '('Hello', 'There')'.") in caplog.messages
  541. caplog.clear()
  542. server.add_handler('on_register_report', on_register_report_full_returning_list_of_tuples_with_no_callable)
  543. await client.register_reports(client.reports)
  544. assert len(client.report_requests) == 0
  545. assert ("Your on_register_report handler did not return the correct tuple. "
  546. "It should return a list of (r_id, callback, sampling_interval) or "
  547. "(r_id, callback, sampling_interval, reporting_interval) tuples, "
  548. "where the r_id is a string, callback is a callable function or "
  549. "coroutine, and sampling_interval and reporting_interval are of "
  550. "type datetime.timedelta. It returned: '('Hello', 'There', 'World')'. "
  551. "The second element was not callable.") in caplog.messages
  552. caplog.clear()
  553. server.add_handler('on_register_report', on_register_report_full_returning_list_of_tuples_with_no_timedelta)
  554. await client.register_reports(client.reports)
  555. assert len(client.report_requests) == 0
  556. assert ("Your on_register_report handler returned tuples of the wrong length. "
  557. f"It should be 3 or 4. It returned: '({report_callback}, 'Hello There')'.") in caplog.messages
  558. await server.stop()
  559. await client.stop()
  560. @pytest.mark.asyncio
  561. async def test_report_registration_broken_handlers_raw_message(caplog):
  562. msg = """<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
  563. <p1:oadrPayload xmlns:p1="http://openadr.org/oadr-2.0b/2012/07">
  564. <p1:oadrSignedObject>
  565. <p1:oadrRegisterReport xmlns:p3="http://docs.oasis-open.org/ns/energyinterop/201110" p3:schemaVersion="2.0b" xmlns:p2="http://docs.oasis-open.org/ns/energyinterop/201110/payloads">
  566. <p2:requestID>B8A6E0D2D4</p2:requestID>
  567. <p1:oadrReport xmlns:p3="urn:ietf:params:xml:ns:icalendar-2.0" xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110">
  568. <p3:duration>
  569. <p3:duration>PT120M</p3:duration>
  570. </p3:duration>
  571. <p1:oadrReportDescription xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110" xmlns:p5="http://docs.oasis-open.org/ns/emix/2011/06/power" xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06">
  572. <p4:rID>rid_energy_4184bb93</p4:rID>
  573. <p4:reportDataSource>
  574. <p4:resourceID>DEVICE1</p4:resourceID>
  575. </p4:reportDataSource>
  576. <p4:reportType>reading</p4:reportType>
  577. <p5:energyReal xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06/siscale">
  578. <p5:itemDescription/>
  579. <p5:itemUnits>Wh</p5:itemUnits>
  580. <p6:siScaleCode>none</p6:siScaleCode>
  581. </p5:energyReal>
  582. <p4:readingType>Direct Read</p4:readingType>
  583. <p6:marketContext/>
  584. <p1:oadrSamplingRate>
  585. <p1:oadrMinPeriod>PT1M</p1:oadrMinPeriod>
  586. <p1:oadrMaxPeriod>PT1M</p1:oadrMaxPeriod>
  587. <p1:oadrOnChange>false</p1:oadrOnChange>
  588. </p1:oadrSamplingRate>
  589. </p1:oadrReportDescription>
  590. <p1:oadrReportDescription xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110" xmlns:p5="http://docs.oasis-open.org/ns/emix/2011/06/power" xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06">
  591. <p4:rID>rid_power_4184bb93</p4:rID>
  592. <p4:reportDataSource>
  593. <p4:resourceID>DEVICE1</p4:resourceID>
  594. </p4:reportDataSource>
  595. <p4:reportType>reading</p4:reportType>
  596. <p5:powerReal xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06/siscale">
  597. <p5:itemDescription/>
  598. <p5:itemUnits>W</p5:itemUnits>
  599. <p6:siScaleCode>none</p6:siScaleCode>
  600. <p5:powerAttributes>
  601. <p5:hertz>60</p5:hertz>
  602. <p5:voltage>120</p5:voltage>
  603. <p5:ac>true</p5:ac>
  604. </p5:powerAttributes>
  605. </p5:powerReal>
  606. <p4:readingType>Direct Read</p4:readingType>
  607. <p6:marketContext/>
  608. <p1:oadrSamplingRate>
  609. <p1:oadrMinPeriod>PT1M</p1:oadrMinPeriod>
  610. <p1:oadrMaxPeriod>PT1M</p1:oadrMaxPeriod>
  611. <p1:oadrOnChange>false</p1:oadrOnChange>
  612. </p1:oadrSamplingRate>
  613. </p1:oadrReportDescription>
  614. <p4:reportRequestID>0</p4:reportRequestID>
  615. <p4:reportSpecifierID>DEMO_TELEMETRY_USAGE</p4:reportSpecifierID>
  616. <p4:reportName>METADATA_TELEMETRY_USAGE</p4:reportName>
  617. <p4:createdDateTime>2020-12-15T14:10:32Z</p4:createdDateTime>
  618. </p1:oadrReport>
  619. <p3:venID>ven_id</p3:venID>
  620. </p1:oadrRegisterReport>
  621. </p1:oadrSignedObject>
  622. </p1:oadrPayload>"""
  623. server = OpenADRServer(vtn_id='myvtn')
  624. await server.run()
  625. # Test with no configured callbacks
  626. from aiohttp import ClientSession
  627. async with ClientSession() as session:
  628. async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
  629. headers={'content-type': 'Application/XML'},
  630. data=msg.encode('utf-8')) as resp:
  631. assert resp.status == 200
  632. # Test with a working callback
  633. def report_callback(data):
  634. print(data)
  635. def working_on_register_report(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  636. return report_callback, min_sampling_interval
  637. server.add_handler('on_register_report', working_on_register_report)
  638. async with ClientSession() as session:
  639. async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
  640. headers={'content-type': 'Application/XML'},
  641. data=msg.encode('utf-8')) as resp:
  642. assert resp.status == 200
  643. # Test with a broken callback
  644. def broken_on_register_report(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
  645. return "Hello There"
  646. server.add_handler('on_register_report', broken_on_register_report)
  647. async with ClientSession() as session:
  648. async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
  649. headers={'content-type': 'Application/XML'},
  650. data=msg.encode('utf-8')) as resp:
  651. assert resp.status == 200
  652. # assert "Your on_register_report handler must return a tuple; it returned 'Hello There' (str)." in caplog.messages
  653. # Test with a broken full callback
  654. def broken_on_register_report_full(report):
  655. return "Hello There Again"
  656. server.add_handler('on_register_report', broken_on_register_report_full)
  657. async with ClientSession() as session:
  658. async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
  659. headers={'content-type': 'Application/XML'},
  660. data=msg.encode('utf-8')) as resp:
  661. assert resp.status == 200
  662. assert f"Your on_register_report handler must return a list of tuples or None; it returned 'Hello There Again' (str)." in caplog.messages
  663. await server.stop()
  664. @pytest.mark.asyncio
  665. async def test_register_historic_report():
  666. client = OpenADRClient(ven_name='myven',
  667. vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  668. client.add_report(report_name='HISTORY_USAGE',
  669. callback=get_historic_data,
  670. measurement='voltage',
  671. resource_id='Device001',
  672. sampling_rate=timedelta(seconds=1))
  673. server = OpenADRServer(vtn_id='myvtn')
  674. server.add_handler('on_create_party_registration', on_create_party_registration)
  675. # server.add_handler('on_register_report', on_register_report_historic)
  676. await server.run()
  677. await client.run()
  678. assert len(server.registered_reports) == 1
  679. await client.stop()
  680. await server.stop()