test_reports.py 19 KB


  1. from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
  2. import asyncio
  3. import pytest
  4. from datetime import datetime, timedelta
  5. from functools import partial
  6. import logging
  7. from random import random
  8. import time
  9. from openleadr.messaging import create_message
# Module-level setup shared by every test in this file: fetch the event loop,
# put it in asyncio debug mode and call openleadr's enable_default_logging().
loop = asyncio.get_event_loop()
loop.set_debug(True)
enable_default_logging()
  13. async def collect_data(future=None):
  14. print("Collect Data")
  15. value = 100 * random()
  16. if future:
  17. future.set_result(value)
  18. return value
  19. async def lookup_ven(ven_name=None, ven_id=None):
  20. """
  21. Look up a ven by its name or ID
  22. """
  23. return {'ven_id': '1234'}
  24. async def receive_data(data, future=None):
  25. if future:
  26. future.set_result(data)
  27. pass
  28. async def on_update_report(report, futures=None):
  29. if futures:
  30. for future in futures:
  31. if future.done() is False:
  32. future.set_result(report)
  33. break
  34. pass
  35. async def on_register_report(ven_id, resource_id, measurement, unit, scale,
  36. min_sampling_interval, max_sampling_interval, bundling=1, futures=None, receive_futures=None):
  37. """
  38. Deal with this report.
  39. """
  40. print(f"Called on register report {ven_id}, {resource_id}, {measurement}, {unit}, {scale}, {min_sampling_interval}, {max_sampling_interval}")
  41. if futures:
  42. futures.pop(0).set_result(True)
  43. if receive_futures:
  44. callback = partial(receive_data, future=receive_futures.pop(0))
  45. else:
  46. callback = receive_data
  47. if bundling > 1:
  48. print(f"Returning from on register report {callback}, {min_sampling_interval}, {bundling * min_sampling_interval}")
  49. return callback, min_sampling_interval, bundling * min_sampling_interval
  50. print(f"Returning from on register report {callback}, {min_sampling_interval}")
  51. return callback, min_sampling_interval
  52. async def on_register_report_full(report, futures=None):
  53. """
  54. Deal with this report.
  55. """
  56. if futures:
  57. futures.pop().set_result(True)
  58. granularity = min(*[rd['sampling_rate']['min_period'] for rd in report['report_descriptions']])
  59. report_requests = [(rd['r_id'], on_update_report, granularity) for rd in report['report_descriptions'] if report['report_name'] == 'METADATA_TELEMETRY_USAGE']
  60. return report_requests
  61. async def on_create_party_registration(ven_name, future=None):
  62. if future:
  63. future.set_result(True)
  64. ven_id = '1234'
  65. registration_id = 'abcd'
  66. return ven_id, registration_id
  67. @pytest.mark.asyncio
  68. async def test_report_registration():
  69. """
  70. Test the registration of two reports with two r_ids each.
  71. """
  72. # Create a server
  73. logger = logging.getLogger('openleadr')
  74. logger.setLevel(logging.DEBUG)
  75. server = OpenADRServer(vtn_id='testvtn')
  76. server.add_handler('on_register_report', on_register_report)
  77. server.add_handler('on_create_party_registration', on_create_party_registration)
  78. # Create a client
  79. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
  80. # Add 4 reports
  81. client.add_report(callback=collect_data,
  82. report_specifier_id='CurrentReport',
  83. resource_id='Device001',
  84. measurement='current',
  85. unit='A')
  86. client.add_report(callback=collect_data,
  87. report_specifier_id='CurrentReport',
  88. resource_id='Device002',
  89. measurement='current',
  90. unit='A')
  91. client.add_report(callback=collect_data,
  92. report_specifier_id='VoltageReport',
  93. resource_id='Device001',
  94. measurement='voltage',
  95. unit='V')
  96. client.add_report(callback=collect_data,
  97. report_specifier_id='VoltageReport',
  98. resource_id='Device002',
  99. measurement='voltage',
  100. unit='V')
  101. asyncio.create_task(server.run_async())
  102. await asyncio.sleep(1)
  103. # Register the client
  104. await client.create_party_registration()
  105. # Register the reports
  106. await client.register_reports(client.reports)
  107. assert len(client.report_requests) == 2
  108. assert len(server.services['report_service'].report_callbacks) == 4
  109. await client.stop()
  110. await server.stop()
  111. async def collect_status():
  112. return 1
  113. @pytest.mark.asyncio
  114. async def test_report_registration_with_status_report():
  115. """
  116. Test the registration of two reports with two r_ids each.
  117. """
  118. # Create a server
  119. logger = logging.getLogger('openleadr')
  120. logger.setLevel(logging.DEBUG)
  121. server = OpenADRServer(vtn_id='testvtn')
  122. server.add_handler('on_register_report', on_register_report)
  123. server.add_handler('on_create_party_registration', on_create_party_registration)
  124. # Create a client
  125. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
  126. # Add 4 reports
  127. client.add_report(callback=collect_data,
  128. report_specifier_id='CurrentReport',
  129. resource_id='Device001',
  130. measurement='current',
  131. unit='A')
  132. client.add_report(callback=collect_data,
  133. report_specifier_id='CurrentReport',
  134. resource_id='Device002',
  135. measurement='current',
  136. unit='A')
  137. client.add_report(callback=collect_data,
  138. report_specifier_id='VoltageReport',
  139. resource_id='Device001',
  140. measurement='voltage',
  141. unit='V')
  142. client.add_report(callback=collect_data,
  143. report_specifier_id='VoltageReport',
  144. resource_id='Device002',
  145. measurement='voltage',
  146. unit='V')
  147. client.add_report(callback=collect_status,
  148. report_name='TELEMETRY_STATUS',
  149. report_specifier_id='StatusReport',
  150. resource_id='Device001')
  151. asyncio.create_task(server.run_async())
  152. await asyncio.sleep(1)
  153. # Register the client
  154. await client.create_party_registration()
  155. # Register the reports
  156. await client.register_reports(client.reports)
  157. assert len(client.report_requests) == 3
  158. assert len(server.services['report_service'].report_callbacks) == 5
  159. await client.stop()
  160. await server.stop()
  161. @pytest.mark.asyncio
  162. async def test_report_registration_full():
  163. """
  164. Test the registration of two reports with two r_ids each.
  165. """
  166. # Create a server
  167. logger = logging.getLogger('openleadr')
  168. logger.setLevel(logging.DEBUG)
  169. server = OpenADRServer(vtn_id='testvtn')
  170. server.add_handler('on_register_report', on_register_report_full)
  171. server.add_handler('on_create_party_registration', on_create_party_registration)
  172. # Create a client
  173. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  174. # Add 4 reports
  175. client.add_report(callback=collect_data,
  176. report_specifier_id='PowerReport',
  177. resource_id='Device001',
  178. measurement='power_real',
  179. unit='W')
  180. client.add_report(callback=collect_data,
  181. report_specifier_id='PowerReport',
  182. resource_id='Device002',
  183. measurement='power_real',
  184. unit='W')
  185. client.add_report(callback=collect_data,
  186. report_specifier_id='VoltageReport',
  187. resource_id='Device001',
  188. measurement='voltage',
  189. unit='V')
  190. client.add_report(callback=collect_data,
  191. report_specifier_id='VoltageReport',
  192. resource_id='Device002',
  193. measurement='voltage',
  194. unit='V')
  195. await server.run_async()
  196. await asyncio.sleep(0.1)
  197. # Register the client
  198. await client.create_party_registration()
  199. # Register the reports
  200. await client.register_reports(client.reports)
  201. assert len(client.report_requests) == 2
  202. assert len(server.services['report_service'].report_callbacks) == 4
  203. await client.stop()
  204. await server.stop()
  205. @pytest.mark.asyncio
  206. async def test_update_reports():
  207. """
  208. Tests the timely delivery of requested reports
  209. """
  210. # Create a server
  211. logger = logging.getLogger('openleadr')
  212. logger.setLevel(logging.DEBUG)
  213. loop = asyncio.get_event_loop()
  214. server = OpenADRServer(vtn_id='testvtn')
  215. register_report_future_1 = loop.create_future()
  216. register_report_future_2 = loop.create_future()
  217. register_report_futures = [register_report_future_1, register_report_future_2]
  218. receive_report_future_1 = loop.create_future()
  219. receive_report_future_2 = loop.create_future()
  220. receive_report_future_3 = loop.create_future()
  221. receive_report_future_4 = loop.create_future()
  222. receive_report_futures = [receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4]
  223. server.add_handler('on_register_report', partial(on_register_report, futures=register_report_futures, receive_futures=receive_report_futures))
  224. party_future = loop.create_future()
  225. server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_future))
  226. # Create a client
  227. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  228. # Add 4 reports
  229. future_1 = loop.create_future()
  230. client.add_report(callback=partial(collect_data, future=future_1),
  231. report_specifier_id='PowerReport',
  232. resource_id='Device001',
  233. measurement='power_real',
  234. sampling_rate=timedelta(seconds=2),
  235. unit='W')
  236. future_2 = loop.create_future()
  237. client.add_report(callback=partial(collect_data, future=future_2),
  238. report_specifier_id='PowerReport',
  239. resource_id='Device002',
  240. measurement='power_real',
  241. sampling_rate=timedelta(seconds=2),
  242. unit='W')
  243. future_3 = loop.create_future()
  244. client.add_report(callback=partial(collect_data, future=future_3),
  245. report_specifier_id='VoltageReport',
  246. resource_id='Device001',
  247. measurement='voltage',
  248. sampling_rate=timedelta(seconds=2),
  249. unit='V')
  250. future_4 = loop.create_future()
  251. client.add_report(callback=partial(collect_data, future=future_4),
  252. report_specifier_id='VoltageReport',
  253. resource_id='Device002',
  254. measurement='voltage',
  255. sampling_rate=timedelta(seconds=2),
  256. unit='V')
  257. assert len(client.reports) == 2
  258. asyncio.create_task(server.run_async())
  259. await asyncio.sleep(1)
  260. # Run the client asynchronously
  261. print("Running the client")
  262. asyncio.create_task(client.run())
  263. print("Awaiting party future")
  264. await party_future
  265. print("Awaiting report futures")
  266. await asyncio.gather(register_report_future_1, register_report_future_2)
  267. await asyncio.sleep(0.1)
  268. assert len(server.services['report_service'].report_callbacks) == 4
  269. print("Awaiting data collection futures")
  270. await future_1
  271. await future_2
  272. await future_3
  273. await future_4
  274. print("Awaiting update report futures")
  275. await asyncio.gather(receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4)
  276. print("Done gathering")
  277. assert receive_report_future_1.result()[0][1] == future_1.result()
  278. assert receive_report_future_2.result()[0][1] == future_2.result()
  279. assert receive_report_future_3.result()[0][1] == future_3.result()
  280. assert receive_report_future_4.result()[0][1] == future_4.result()
  281. await client.stop()
  282. await server.stop()
  283. async def get_historic_data(date_from, date_to):
  284. pass
  285. async def collect_data_multi(futures=None):
  286. print("Data Collected")
  287. if futures:
  288. for i, future in enumerate(futures):
  289. if future.done() is False:
  290. print(f"Marking future {i} as done")
  291. future.set_result(True)
  292. break
  293. return 3.14
  294. @pytest.mark.asyncio
  295. async def test_incremental_reports():
  296. loop = asyncio.get_event_loop()
  297. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  298. collect_futures = [loop.create_future() for i in range(2)]
  299. client.add_report(callback=partial(collect_data_multi, futures=collect_futures),
  300. report_specifier_id='myhistory',
  301. measurement='voltage',
  302. resource_id='mydevice',
  303. sampling_rate=timedelta(seconds=2))
  304. server = OpenADRServer(vtn_id='myvtn')
  305. register_report_future = loop.create_future()
  306. update_report_future = loop.create_future()
  307. server.add_handler('on_register_report', partial(on_register_report,
  308. bundling=2,
  309. futures=[register_report_future],
  310. receive_futures=[update_report_future]))
  311. party_future = loop.create_future()
  312. server.add_handler('on_create_party_registration',
  313. partial(on_create_party_registration, future=party_future))
  314. loop.create_task(server.run_async())
  315. await asyncio.sleep(1)
  316. await client.run()
  317. print("Awaiting party future")
  318. await party_future
  319. print("Awaiting register report future")
  320. await register_report_future
  321. print("Awaiting first data collection future... ", end="")
  322. await collect_futures[0]
  323. print("check")
  324. await asyncio.sleep(1)
  325. print("Checking that the report was not yet sent... ", end="")
  326. assert update_report_future.done() is False
  327. print("check")
  328. print("Awaiting data collection second future... ", end="")
  329. await collect_futures[1]
  330. print("check")
  331. print("Awaiting report update future")
  332. result = await update_report_future
  333. assert len(result) == 2
  334. await server.stop()
  335. await client.stop()
  336. await asyncio.sleep(0)
  337. async def collect_data_history(date_from, date_to, sampling_interval, futures):
  338. data = [(date_from, 1.0), (date_to, 2.0)]
  339. if futures:
  340. for future in futures:
  341. if future.done() is False:
  342. future.set_result(data)
  343. break
  344. return data
  345. @pytest.mark.asyncio
  346. async def test_update_report_data_collection_mode_full():
  347. loop = asyncio.get_event_loop()
  348. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  349. data_collection_future = loop.create_future()
  350. client.add_report(callback=partial(collect_data_history, futures=[data_collection_future]),
  351. resource_id='Device001',
  352. measurement='power_real',
  353. data_collection_mode='full',
  354. sampling_rate=timedelta(seconds=1),
  355. unit='W')
  356. report_register_future = loop.create_future()
  357. report_received_future = loop.create_future()
  358. party_registration_future = loop.create_future()
  359. server = OpenADRServer(vtn_id='myvtn')
  360. server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_registration_future))
  361. server.add_handler('on_register_report', partial(on_register_report,
  362. bundling=2,
  363. futures=[report_register_future],
  364. receive_futures=[report_received_future]))
  365. await server.run_async()
  366. await asyncio.sleep(0.1)
  367. print(f"The time is now {datetime.now()}")
  368. t = time.time()
  369. wait_for = int(t/2) * 2 + 2 - t
  370. await asyncio.sleep(wait_for)
  371. print(f"The time is now {datetime.now()}, running client")
  372. await client.run()
  373. await party_registration_future
  374. await report_register_future
  375. await asyncio.sleep(1)
  376. print(f"The time is now {datetime.now()}, checking if report was triggered")
  377. assert data_collection_future.done() is False
  378. print("Waiting for the data collection to occur")
  379. await data_collection_future
  380. print("Waiting for the report to be received")
  381. await report_received_future
  382. print("Done")
  383. await server.stop()
  384. await client.stop()
  385. def test_add_report_invalid_unit(caplog):
  386. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  387. client.add_report(callback=print,
  388. report_specifier_id='myreport',
  389. measurement='voltage',
  390. resource_id='mydevice',
  391. sampling_rate=timedelta(seconds=10),
  392. unit='A')
  393. assert caplog.record_tuples == [("openleadr", logging.WARNING, f"The supplied unit A for measurement voltage will be ignored, V will be used instead. Allowed units for this measurement are: V")]
  394. def test_add_report_invalid_scale():
  395. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  396. with pytest.raises(ValueError):
  397. client.add_report(callback=print,
  398. report_specifier_id='myreport',
  399. measurement='power_real',
  400. resource_id='mydevice',
  401. sampling_rate=timedelta(seconds=10),
  402. unit='W',
  403. scale='xxx')
  404. def test_add_report_invalid_description(caplog):
  405. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  406. client.add_report(callback=print,
  407. report_specifier_id='myreport',
  408. measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
  409. resource_id='mydevice',
  410. sampling_rate=timedelta(seconds=10))
  411. msg = create_message('oadrRegisterReport', reports=client.reports)
  412. def test_add_report_invalid_description(caplog):
  413. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  414. with pytest.raises(ValueError):
  415. client.add_report(callback=print,
  416. report_specifier_id='myreport',
  417. measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
  418. resource_id='mydevice',
  419. sampling_rate=timedelta(seconds=10))
  420. def test_add_report_non_standard_measurement():
  421. client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
  422. client.add_report(callback=print,
  423. report_specifier_id='myreport',
  424. measurement='rainbows',
  425. resource_id='mydevice',
  426. sampling_rate=timedelta(seconds=10),
  427. unit='A')
  428. assert len(client.reports) == 1
  429. assert client.reports[0].report_descriptions[0].measurement.name == 'customUnit'
  430. assert client.reports[0].report_descriptions[0].measurement.description == 'rainbows'
# When executed as a script (rather than collected by pytest), run only the
# update-reports test on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(test_update_reports())