test_utils.py 23 KB


  1. from openleadr import utils, objects
  2. from dataclasses import dataclass, asdict
  3. import pytest
  4. from datetime import datetime, timezone, timedelta
  5. from collections import deque
  6. @dataclass
  7. class dc:
  8. a: int = 2
  9. def test_hasmember():
  10. obj = {'a': 1}
  11. assert utils.hasmember(obj, 'a') == True
  12. assert utils.hasmember(obj, 'b') == False
  13. obj = dc()
  14. assert utils.hasmember(obj, 'a') == True
  15. assert utils.hasmember(obj, 'b') == False
  16. def test_getmember():
  17. obj = {'a': 1}
  18. assert utils.getmember(obj, 'a') == 1
  19. obj = dc()
  20. assert utils.getmember(obj, 'a') == 2
  21. def test_setmember():
  22. obj = {'a': 1}
  23. utils.setmember(obj, 'a', 10)
  24. assert utils.getmember(obj, 'a') == 10
  34. @pytest.mark.asyncio
  35. async def test_delayed_call_with_func():
  36. async def myfunc():
  37. pass
  38. await utils.delayed_call(myfunc, delay=0.1)
  39. @pytest.mark.asyncio
  40. async def test_delayed_call_with_coro():
  41. async def mycoro():
  42. pass
  43. await utils.delayed_call(mycoro(), delay=0.1)
  44. @pytest.mark.asyncio
  45. async def test_delayed_call_with_coro_func():
  46. async def mycoro():
  47. pass
  48. await utils.delayed_call(mycoro, delay=0.1)
  49. def test_determine_event_status_completed():
  50. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  51. 'duration': timedelta(seconds=5)}
  52. assert utils.determine_event_status(active_period) == 'completed'
  53. def test_determine_event_status_active():
  54. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  55. 'duration': timedelta(seconds=15)}
  56. assert utils.determine_event_status(active_period) == 'active'
  57. def test_determine_event_status_near():
  58. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=3),
  59. 'duration': timedelta(seconds=5),
  60. 'ramp_up_period': timedelta(seconds=5)}
  61. assert utils.determine_event_status(active_period) == 'near'
  62. def test_determine_event_status_far():
  63. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  64. 'duration': timedelta(seconds=5)}
  65. assert utils.determine_event_status(active_period) == 'far'
  66. def test_determine_event_status_far_with_ramp_up():
  67. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  68. 'duration': timedelta(seconds=5),
  69. 'ramp_up_period': timedelta(seconds=5)}
  70. assert utils.determine_event_status(active_period) == 'far'
  71. def test_get_active_period_from_intervals():
  72. now = datetime.now(timezone.utc)
  73. intervals=[{'dtstart': now,
  74. 'duration': timedelta(seconds=5)},
  75. {'dtstart': now + timedelta(seconds=5),
  76. 'duration': timedelta(seconds=5)}]
  77. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  78. 'duration': timedelta(seconds=10)}
  79. intervals=[objects.Interval(dtstart=now,
  80. duration=timedelta(seconds=5),
  81. signal_payload=1),
  82. objects.Interval(dtstart=now + timedelta(seconds=5),
  83. duration=timedelta(seconds=5),
  84. signal_payload=2)]
  85. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  86. 'duration': timedelta(seconds=10)}
  87. assert utils.get_active_period_from_intervals(intervals, False) == objects.ActivePeriod(dtstart=now,
  88. duration=timedelta(seconds=10))
  89. def test_cron_config():
  90. assert utils.cron_config(timedelta(seconds=5)) == {'second': '*/5', 'minute': '*', 'hour': '*'}
  91. assert utils.cron_config(timedelta(minutes=1)) == {'second': '0', 'minute': '*/1', 'hour': '*'}
  92. assert utils.cron_config(timedelta(minutes=5)) == {'second': '0', 'minute': '*/5', 'hour': '*'}
  93. assert utils.cron_config(timedelta(hours=1)) == {'second': '0', 'minute': '0', 'hour': '*/1'}
  94. assert utils.cron_config(timedelta(hours=2)) == {'second': '0', 'minute': '0', 'hour': '*/2'}
  95. assert utils.cron_config(timedelta(hours=25)) == {'second': '0', 'minute': '0', 'hour': '0'}
  96. assert utils.cron_config(timedelta(seconds=10), randomize_seconds=True) == {'second': '*/10',
  97. 'minute': '*',
  98. 'hour': '*',
  99. 'jitter': 1}
  100. def test_get_event_from_deque():
  101. d = deque()
  102. now = datetime.now(timezone.utc)
  103. event1 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  104. event_status='far',
  105. modification_number='1',
  106. market_context='http://marketcontext01'),
  107. event_signals=[objects.EventSignal(signal_name='simple',
  108. signal_type='level',
  109. signal_id=utils.generate_id(),
  110. intervals=[objects.Interval(dtstart=now,
  111. duration=timedelta(minutes=10),
  112. signal_payload=1)])],
  113. targets=[{'ven_id': 'ven123'}])
  114. msg_one = {'message': 'one'}
  115. msg_two = {'message': 'two'}
  116. msg_three = {'message': 'three'}
  117. event2 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  118. event_status='far',
  119. modification_number='1',
  120. market_context='http://marketcontext01'),
  121. event_signals=[objects.EventSignal(signal_name='simple',
  122. signal_type='level',
  123. signal_id=utils.generate_id(),
  124. intervals=[objects.Interval(dtstart=now,
  125. duration=timedelta(minutes=10),
  126. signal_payload=1)])],
  127. targets=[{'ven_id': 'ven123'}])
  128. d.append(event1)
  129. d.append(msg_one)
  130. d.append(msg_two)
  131. d.append(msg_three)
  132. d.append(event2)
  133. assert utils.get_next_event_from_deque(d) is event1
  134. assert utils.get_next_event_from_deque(d) is event2
  135. assert utils.get_next_event_from_deque(d) is None
  136. assert utils.get_next_event_from_deque(d) is None
  137. assert len(d) == 3
  138. assert d.popleft() is msg_one
  139. assert d.popleft() is msg_two
  140. assert d.popleft() is msg_three
  141. assert len(d) == 0
  142. assert utils.get_next_event_from_deque(d) is None
  143. def test_validate_report_measurement_dict_missing_items(caplog):
  144. measurement = {'name': 'rainbows'}
  145. with pytest.raises(ValueError) as err:
  146. utils.validate_report_measurement_dict(measurement)
  147. assert str(err.value) == (f"The measurement dict must contain the following keys: "
  148. "'name', 'description', 'unit'. Please correct this.")
  149. def test_validate_report_measurement_dict_invalid_name(caplog):
  150. measurement = {'name': 'rainbows',
  151. 'unit': 'B',
  152. 'description': 'Rainbows'}
  153. utils.validate_report_measurement_dict(measurement)
  154. assert measurement['name'] == 'customUnit'
  155. assert (f"You provided a measurement with an unknown name rainbows. "
  156. "This was corrected to 'customUnit'. Please correct this in your "
  157. "report definition.") in caplog.messages
  158. def test_validate_report_measurement_dict_invalid_unit():
  159. with pytest.raises(ValueError) as err:
  160. measurement = {'name': 'current',
  161. 'unit': 'B',
  162. 'description': 'Current'}
  163. utils.validate_report_measurement_dict(measurement)
  164. assert str(err.value) == (f"The unit 'B' is not acceptable for measurement 'current'. Allowed "
  165. f"units are: 'A'.")
  166. def test_validate_report_measurement_dict_invalid_description(caplog):
  167. with pytest.raises(ValueError) as err:
  168. measurement = {'name': 'current',
  169. 'unit': 'A',
  170. 'description': 'something'}
  171. utils.validate_report_measurement_dict(measurement)
  172. str(err.value) == (f"The measurement's description 'something' "
  173. f"did not match the expected description for this type "
  174. f" ('Current'). Please correct this, or use "
  175. "'customUnit' as the name.")
  176. def test_validate_report_measurement_dict_invalid_description_case(caplog):
  177. measurement = {'name': 'current',
  178. 'unit': 'A',
  179. 'description': 'CURRENT'}
  180. utils.validate_report_measurement_dict(measurement)
  181. assert measurement['description'] == 'Current'
  182. assert (f"The description for the measurement with name 'current' "
  183. f"was not in the correct case; you provided 'CURRENT' but "
  184. f"it should be 'Current'. "
  185. "This was automatically corrected.") in caplog.messages
  186. def test_validate_report_measurement_dict_missing_power_attributes(caplog):
  187. with pytest.raises(ValueError) as err:
  188. measurement = {'name': 'powerReal',
  189. 'description': 'RealPower',
  190. 'unit': 'W'}
  191. utils.validate_report_measurement_dict(measurement)
  192. assert str(err.value) == ("A 'power' related measurement must contain a "
  193. "'power_attributes' section that contains the following "
  194. "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
  195. def test_validate_report_measurement_dict_invalid_power_attributes(caplog):
  196. with pytest.raises(ValueError) as err:
  197. measurement = {'name': 'powerReal',
  198. 'description': 'RealPower',
  199. 'unit': 'W',
  200. 'power_attributes': {'a': 123}}
  201. utils.validate_report_measurement_dict(measurement)
  202. assert str(err.value) == ("The power_attributes of the measurement must contain the "
  203. "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
  204. def test_ungroup_target_by_type_with_single_str():
  205. targets_by_type = {'ven_id': 'ven123'}
  206. targets = utils.ungroup_targets_by_type(targets_by_type)
  207. assert targets == [{'ven_id': 'ven123'}]
  208. def test_find_by_with_dict():
  209. search_dict = {'one': {'a': 123, 'b': 456},
  210. 'two': {'a': 321, 'b': 654}}
  211. result = utils.find_by(search_dict, 'a', 123)
  212. assert result == {'a': 123, 'b': 456}
  213. def test_find_by_with_missing_member():
  214. search_list = [{'a': 123, 'b': 456},
  215. {'a': 321, 'b': 654, 'c': 1000}]
  216. result = utils.find_by(search_list, 'c', 1000)
  217. assert result == {'a': 321, 'b': 654, 'c': 1000}
  218. def test_ensure_str():
  219. assert utils.ensure_str("Hello") == "Hello"
  220. assert utils.ensure_str(b"Hello") == "Hello"
  221. assert utils.ensure_str(None) is None
  222. with pytest.raises(TypeError) as err:
  223. utils.ensure_str(1)
  224. assert str(err.value) == "Must be bytes or str"
  225. def test_ensure_bytes():
  226. assert utils.ensure_bytes("Hello") == b"Hello"
  227. assert utils.ensure_bytes(b"Hello") == b"Hello"
  228. assert utils.ensure_bytes(None) is None
  229. with pytest.raises(TypeError) as err:
  230. utils.ensure_bytes(1)
  231. assert str(err.value) == "Must be bytes or str"
  232. def test_booleanformat():
  233. assert utils.booleanformat("true") == "true"
  234. assert utils.booleanformat("false") == "false"
  235. assert utils.booleanformat(True) == "true"
  236. assert utils.booleanformat(False) == "false"
  237. with pytest.raises(ValueError) as err:
  238. assert utils.booleanformat(123)
  239. assert str(err.value) == "A boolean value must be provided, not 123."
  240. def test_parse_duration():
  241. assert utils.parse_duration("PT1M") == timedelta(minutes=1)
  242. assert utils.parse_duration("PT1M5S") == timedelta(minutes=1, seconds=5)
  243. assert utils.parse_duration("PT1H5M10S") == timedelta(hours=1, minutes=5, seconds=10)
  244. assert utils.parse_duration("P1DT1H5M10S") == timedelta(days=1, hours=1, minutes=5, seconds=10)
  245. assert utils.parse_duration("P1M") == timedelta(days=30)
  246. assert utils.parse_duration("-P1M") == timedelta(days=-30)
  247. assert utils.parse_duration("2W") == timedelta(days=14)
  248. with pytest.raises(ValueError) as err:
  249. utils.parse_duration("Hello")
  250. assert str(err.value) == f"The duration 'Hello' did not match the requested format"
  251. def test_parse_datetime():
  252. assert utils.parse_datetime("2020-12-15T11:29:34Z") == datetime(2020, 12, 15, 11, 29, 34, tzinfo=timezone.utc)
  253. assert utils.parse_datetime("2020-12-15T11:29:34.123456Z") == datetime(2020, 12, 15, 11, 29, 34, 123456, tzinfo=timezone.utc)
  254. assert utils.parse_datetime("2020-12-15T11:29:34.123Z") == datetime(2020, 12, 15, 11, 29, 34, 123000, tzinfo=timezone.utc)
  255. assert utils.parse_datetime("2020-12-15T11:29:34.123456789Z") == datetime(2020, 12, 15, 11, 29, 34, 123456, tzinfo=timezone.utc)
  256. @pytest.mark.asyncio
  257. async def test_await_if_required():
  258. def normal_func():
  259. return 123
  260. async def coro_func():
  261. return 456
  262. result = await utils.await_if_required(normal_func())
  263. assert result == 123
  264. result = await utils.await_if_required(coro_func())
  265. assert result == 456
  266. result = await utils.await_if_required(None)
  267. assert result == None
  268. @pytest.mark.asyncio
  269. async def test_gather_if_required():
  270. def normal_func():
  271. return 123
  272. async def coro_func():
  273. return 456
  274. raw_results = [normal_func(), normal_func(), normal_func()]
  275. results = await utils.gather_if_required(raw_results)
  276. assert results == [123, 123, 123]
  277. raw_results = [coro_func(), coro_func(), coro_func()]
  278. results = await utils.gather_if_required(raw_results)
  279. assert results == [456, 456, 456]
  280. raw_results = [coro_func(), normal_func(), None]
  281. results = await utils.gather_if_required(raw_results)
  282. assert results == [456, 123, None]
  283. raw_results = []
  284. results = await utils.gather_if_required(raw_results)
  285. assert results == []
  286. def test_order_events():
  287. now = datetime.now(timezone.utc)
  288. event_1_active_high_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  289. modification_number=0,
  290. created_date_time=now,
  291. event_status='far',
  292. priority=1,
  293. market_context='http://context01'),
  294. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  295. duration=timedelta(minutes=10)),
  296. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  297. duration=timedelta(minutes=10),
  298. signal_payload=1)],
  299. signal_name='simple',
  300. signal_type='level',
  301. signal_id='signal001')],
  302. targets=[{'ven_id': 'ven001'}])
  303. event_2_active_low_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  304. modification_number=0,
  305. created_date_time=now,
  306. event_status='far',
  307. priority=2,
  308. market_context='http://context01'),
  309. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  310. duration=timedelta(minutes=10)),
  311. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  312. duration=timedelta(minutes=10),
  313. signal_payload=1)],
  314. signal_name='simple',
  315. signal_type='level',
  316. signal_id='signal001')],
  317. targets=[{'ven_id': 'ven001'}])
  318. event_3_active_no_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  319. modification_number=0,
  320. created_date_time=now,
  321. event_status='far',
  322. market_context='http://context01'),
  323. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  324. duration=timedelta(minutes=10)),
  325. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  326. duration=timedelta(minutes=10),
  327. signal_payload=1)],
  328. signal_name='simple',
  329. signal_type='level',
  330. signal_id='signal001')],
  331. targets=[{'ven_id': 'ven001'}])
  332. event_4_far_early = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  333. modification_number=0,
  334. created_date_time=now,
  335. event_status='far',
  336. market_context='http://context01'),
  337. active_period=objects.ActivePeriod(dtstart=now + timedelta(minutes=5),
  338. duration=timedelta(minutes=10)),
  339. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  340. duration=timedelta(minutes=10),
  341. signal_payload=1)],
  342. signal_name='simple',
  343. signal_type='level',
  344. signal_id='signal001')],
  345. targets=[{'ven_id': 'ven001'}])
  346. event_5_far_later = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  347. modification_number=0,
  348. created_date_time=now,
  349. event_status='far',
  350. market_context='http://context01'),
  351. active_period=objects.ActivePeriod(dtstart=now + timedelta(minutes=10),
  352. duration=timedelta(minutes=10)),
  353. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  354. duration=timedelta(minutes=10),
  355. signal_payload=1)],
  356. signal_name='simple',
  357. signal_type='level',
  358. signal_id='signal001')],
  359. targets=[{'ven_id': 'ven001'}])
  360. events = [event_5_far_later, event_4_far_early, event_3_active_no_prio, event_2_active_low_prio, event_1_active_high_prio]
  361. ordered_events = utils.order_events(events)
  362. assert ordered_events == [event_1_active_high_prio, event_2_active_low_prio, event_3_active_no_prio, event_4_far_early, event_5_far_later]
  363. ordered_events = utils.order_events(event_1_active_high_prio)
  364. assert ordered_events == [event_1_active_high_prio]
  365. event_1_as_dict = asdict(event_1_active_high_prio)
  366. ordered_events = utils.order_events(event_1_as_dict)
  367. assert ordered_events == [event_1_as_dict]