test_utils.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. from openleadr import utils, objects
  2. from dataclasses import dataclass, asdict
  3. import pytest
  4. from datetime import datetime, timezone, timedelta
  5. from collections import deque
  6. @dataclass
  7. class dc:
  8. a: int = 2
  9. def test_hasmember():
  10. obj = {'a': 1}
  11. assert utils.hasmember(obj, 'a') == True
  12. assert utils.hasmember(obj, 'b') == False
  13. obj = dc()
  14. assert utils.hasmember(obj, 'a') == True
  15. assert utils.hasmember(obj, 'b') == False
  16. def test_getmember():
  17. obj = {'a': 1}
  18. assert utils.getmember(obj, 'a') == 1
  19. obj = dc()
  20. assert utils.getmember(obj, 'a') == 2
  21. def test_setmember():
  22. obj = {'a': 1}
  23. utils.setmember(obj, 'a', 10)
  24. assert utils.getmember(obj, 'a') == 10
  25. obj = dc()
  26. utils.setmember(obj, 'a', 10)
  27. assert utils.getmember(obj, 'a') == 10
  28. @pytest.mark.asyncio
  29. async def test_delayed_call_with_func():
  30. async def myfunc():
  31. pass
  32. await utils.delayed_call(myfunc, delay=0.1)
  33. @pytest.mark.asyncio
  34. async def test_delayed_call_with_coro():
  35. async def mycoro():
  36. pass
  37. await utils.delayed_call(mycoro(), delay=0.1)
  38. @pytest.mark.asyncio
  39. async def test_delayed_call_with_coro_func():
  40. async def mycoro():
  41. pass
  42. await utils.delayed_call(mycoro, delay=0.1)
  43. def test_determine_event_status_completed():
  44. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  45. 'duration': timedelta(seconds=5)}
  46. assert utils.determine_event_status(active_period) == 'completed'
  47. def test_determine_event_status_active():
  48. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  49. 'duration': timedelta(seconds=15)}
  50. assert utils.determine_event_status(active_period) == 'active'
  51. def test_determine_event_status_near():
  52. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=3),
  53. 'duration': timedelta(seconds=5),
  54. 'ramp_up_duration': timedelta(seconds=5)}
  55. assert utils.determine_event_status(active_period) == 'near'
  56. def test_determine_event_status_far():
  57. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  58. 'duration': timedelta(seconds=5)}
  59. assert utils.determine_event_status(active_period) == 'far'
  60. def test_determine_event_status_far_with_ramp_up():
  61. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  62. 'duration': timedelta(seconds=5),
  63. 'ramp_up_duration': timedelta(seconds=5)}
  64. assert utils.determine_event_status(active_period) == 'far'
  65. def test_get_active_period_from_intervals():
  66. now = datetime.now(timezone.utc)
  67. intervals=[{'dtstart': now,
  68. 'duration': timedelta(seconds=5)},
  69. {'dtstart': now + timedelta(seconds=5),
  70. 'duration': timedelta(seconds=5)}]
  71. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  72. 'duration': timedelta(seconds=10)}
  73. intervals=[objects.Interval(dtstart=now,
  74. duration=timedelta(seconds=5),
  75. signal_payload=1),
  76. objects.Interval(dtstart=now + timedelta(seconds=5),
  77. duration=timedelta(seconds=5),
  78. signal_payload=2)]
  79. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  80. 'duration': timedelta(seconds=10)}
  81. assert utils.get_active_period_from_intervals(intervals, False) == objects.ActivePeriod(dtstart=now,
  82. duration=timedelta(seconds=10))
  83. def test_cron_config():
  84. assert utils.cron_config(timedelta(seconds=5)) == {'second': '*/5', 'minute': '*', 'hour': '*'}
  85. assert utils.cron_config(timedelta(minutes=1)) == {'second': '0', 'minute': '*/1', 'hour': '*'}
  86. assert utils.cron_config(timedelta(minutes=5)) == {'second': '0', 'minute': '*/5', 'hour': '*'}
  87. assert utils.cron_config(timedelta(hours=1)) == {'second': '0', 'minute': '0', 'hour': '*/1'}
  88. assert utils.cron_config(timedelta(hours=2)) == {'second': '0', 'minute': '0', 'hour': '*/2'}
  89. assert utils.cron_config(timedelta(hours=25)) == {'second': '0', 'minute': '0', 'hour': '0'}
  90. assert utils.cron_config(timedelta(seconds=10), randomize_seconds=True) == {'second': '*/10',
  91. 'minute': '*',
  92. 'hour': '*',
  93. 'jitter': 1}
  94. def test_get_event_from_deque():
  95. d = deque()
  96. now = datetime.now(timezone.utc)
  97. event1 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  98. event_status='far',
  99. modification_number='1',
  100. market_context='http://marketcontext01'),
  101. event_signals=[objects.EventSignal(signal_name='simple',
  102. signal_type='level',
  103. signal_id=utils.generate_id(),
  104. intervals=[objects.Interval(dtstart=now,
  105. duration=timedelta(minutes=10),
  106. signal_payload=1)])],
  107. targets=[{'ven_id': 'ven123'}])
  108. msg_one = {'message': 'one'}
  109. msg_two = {'message': 'two'}
  110. msg_three = {'message': 'three'}
  111. event2 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  112. event_status='far',
  113. modification_number='1',
  114. market_context='http://marketcontext01'),
  115. event_signals=[objects.EventSignal(signal_name='simple',
  116. signal_type='level',
  117. signal_id=utils.generate_id(),
  118. intervals=[objects.Interval(dtstart=now,
  119. duration=timedelta(minutes=10),
  120. signal_payload=1)])],
  121. targets=[{'ven_id': 'ven123'}])
  122. d.append(event1)
  123. d.append(msg_one)
  124. d.append(msg_two)
  125. d.append(msg_three)
  126. d.append(event2)
  127. assert utils.get_next_event_from_deque(d) is event1
  128. assert utils.get_next_event_from_deque(d) is event2
  129. assert utils.get_next_event_from_deque(d) is None
  130. assert utils.get_next_event_from_deque(d) is None
  131. assert len(d) == 3
  132. assert d.popleft() is msg_one
  133. assert d.popleft() is msg_two
  134. assert d.popleft() is msg_three
  135. assert len(d) == 0
  136. assert utils.get_next_event_from_deque(d) is None
  137. def test_validate_report_measurement_dict_missing_items(caplog):
  138. measurement = {'name': 'rainbows'}
  139. with pytest.raises(ValueError) as err:
  140. utils.validate_report_measurement_dict(measurement)
  141. assert str(err.value) == (f"The measurement dict must contain the following keys: "
  142. "'name', 'description', 'unit'. Please correct this.")
  143. def test_validate_report_measurement_dict_invalid_name(caplog):
  144. measurement = {'name': 'rainbows',
  145. 'unit': 'B',
  146. 'description': 'Rainbows'}
  147. utils.validate_report_measurement_dict(measurement)
  148. assert measurement['name'] == 'customUnit'
  149. assert (f"You provided a measurement with an unknown name rainbows. "
  150. "This was corrected to 'customUnit'. Please correct this in your "
  151. "report definition.") in caplog.messages
  152. def test_validate_report_measurement_dict_invalid_unit():
  153. with pytest.raises(ValueError) as err:
  154. measurement = {'name': 'current',
  155. 'unit': 'B',
  156. 'description': 'Current'}
  157. utils.validate_report_measurement_dict(measurement)
  158. assert str(err.value) == (f"The unit 'B' is not acceptable for measurement 'current'. Allowed "
  159. f"units are: 'A'.")
  160. def test_validate_report_measurement_dict_invalid_description(caplog):
  161. with pytest.raises(ValueError) as err:
  162. measurement = {'name': 'current',
  163. 'unit': 'A',
  164. 'description': 'something'}
  165. utils.validate_report_measurement_dict(measurement)
  166. str(err.value) == (f"The measurement's description 'something' "
  167. f"did not match the expected description for this type "
  168. f" ('Current'). Please correct this, or use "
  169. "'customUnit' as the name.")
  170. def test_validate_report_measurement_dict_invalid_description_case(caplog):
  171. measurement = {'name': 'current',
  172. 'unit': 'A',
  173. 'description': 'CURRENT'}
  174. utils.validate_report_measurement_dict(measurement)
  175. assert measurement['description'] == 'Current'
  176. assert (f"The description for the measurement with name 'current' "
  177. f"was not in the correct case; you provided 'CURRENT' but "
  178. f"it should be 'Current'. "
  179. "This was automatically corrected.") in caplog.messages
  180. def test_validate_report_measurement_dict_missing_power_attributes(caplog):
  181. with pytest.raises(ValueError) as err:
  182. measurement = {'name': 'powerReal',
  183. 'description': 'RealPower',
  184. 'unit': 'W'}
  185. utils.validate_report_measurement_dict(measurement)
  186. assert str(err.value) == ("A 'power' related measurement must contain a "
  187. "'power_attributes' section that contains the following "
  188. "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
  189. def test_validate_report_measurement_dict_invalid_power_attributes(caplog):
  190. with pytest.raises(ValueError) as err:
  191. measurement = {'name': 'powerReal',
  192. 'description': 'RealPower',
  193. 'unit': 'W',
  194. 'power_attributes': {'a': 123}}
  195. utils.validate_report_measurement_dict(measurement)
  196. assert str(err.value) == ("The power_attributes of the measurement must contain the "
  197. "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
  198. def test_ungroup_target_by_type_with_single_str():
  199. targets_by_type = {'ven_id': 'ven123'}
  200. targets = utils.ungroup_targets_by_type(targets_by_type)
  201. assert targets == [{'ven_id': 'ven123'}]
  202. def test_find_by_with_dict():
  203. search_dict = {'one': {'a': 123, 'b': 456},
  204. 'two': {'a': 321, 'b': 654}}
  205. result = utils.find_by(search_dict, 'a', 123)
  206. assert result == {'a': 123, 'b': 456}
  207. def test_find_by_with_missing_member():
  208. search_list = [{'a': 123, 'b': 456},
  209. {'a': 321, 'b': 654, 'c': 1000}]
  210. result = utils.find_by(search_list, 'c', 1000)
  211. assert result == {'a': 321, 'b': 654, 'c': 1000}
  212. def test_ensure_str():
  213. assert utils.ensure_str("Hello") == "Hello"
  214. assert utils.ensure_str(b"Hello") == "Hello"
  215. assert utils.ensure_str(None) is None
  216. with pytest.raises(TypeError) as err:
  217. utils.ensure_str(1)
  218. assert str(err.value) == "Must be bytes or str"
  219. def test_ensure_bytes():
  220. assert utils.ensure_bytes("Hello") == b"Hello"
  221. assert utils.ensure_bytes(b"Hello") == b"Hello"
  222. assert utils.ensure_bytes(None) is None
  223. with pytest.raises(TypeError) as err:
  224. utils.ensure_bytes(1)
  225. assert str(err.value) == "Must be bytes or str"
  226. def test_booleanformat():
  227. assert utils.booleanformat("true") == "true"
  228. assert utils.booleanformat("false") == "false"
  229. assert utils.booleanformat(True) == "true"
  230. assert utils.booleanformat(False) == "false"
  231. with pytest.raises(ValueError) as err:
  232. assert utils.booleanformat(123)
  233. assert str(err.value) == "A boolean value must be provided, not 123."
  234. def test_parse_duration():
  235. assert utils.parse_duration("PT1M") == timedelta(minutes=1)
  236. assert utils.parse_duration("PT1M5S") == timedelta(minutes=1, seconds=5)
  237. assert utils.parse_duration("PT1H5M10S") == timedelta(hours=1, minutes=5, seconds=10)
  238. assert utils.parse_duration("P1DT1H5M10S") == timedelta(days=1, hours=1, minutes=5, seconds=10)
  239. assert utils.parse_duration("P1M") == timedelta(days=30)
  240. assert utils.parse_duration("-P1M") == timedelta(days=-30)
  241. assert utils.parse_duration("2W") == timedelta(days=14)
  242. with pytest.raises(ValueError) as err:
  243. utils.parse_duration("Hello")
  244. assert str(err.value) == f"The duration 'Hello' did not match the requested format"
  245. def test_parse_datetime():
  246. assert utils.parse_datetime("2020-12-15T11:29:34Z") == datetime(2020, 12, 15, 11, 29, 34, tzinfo=timezone.utc)
  247. assert utils.parse_datetime("2020-12-15T11:29:34.123456Z") == datetime(2020, 12, 15, 11, 29, 34, 123456, tzinfo=timezone.utc)
  248. assert utils.parse_datetime("2020-12-15T11:29:34.123Z") == datetime(2020, 12, 15, 11, 29, 34, 123000, tzinfo=timezone.utc)
  249. assert utils.parse_datetime("2020-12-15T11:29:34.123456789Z") == datetime(2020, 12, 15, 11, 29, 34, 123456, tzinfo=timezone.utc)
  250. @pytest.mark.asyncio
  251. async def test_await_if_required():
  252. def normal_func():
  253. return 123
  254. async def coro_func():
  255. return 456
  256. result = await utils.await_if_required(normal_func())
  257. assert result == 123
  258. result = await utils.await_if_required(coro_func())
  259. assert result == 456
  260. result = await utils.await_if_required(None)
  261. assert result == None
  262. @pytest.mark.asyncio
  263. async def test_gather_if_required():
  264. def normal_func():
  265. return 123
  266. async def coro_func():
  267. return 456
  268. raw_results = [normal_func(), normal_func(), normal_func()]
  269. results = await utils.gather_if_required(raw_results)
  270. assert results == [123, 123, 123]
  271. raw_results = [coro_func(), coro_func(), coro_func()]
  272. results = await utils.gather_if_required(raw_results)
  273. assert results == [456, 456, 456]
  274. raw_results = [coro_func(), normal_func(), None]
  275. results = await utils.gather_if_required(raw_results)
  276. assert results == [456, 123, None]
  277. raw_results = []
  278. results = await utils.gather_if_required(raw_results)
  279. assert results == []
  280. def test_order_events():
  281. now = datetime.now(timezone.utc)
  282. event_1_active_high_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  283. modification_number=0,
  284. created_date_time=now,
  285. event_status='far',
  286. priority=1,
  287. market_context='http://context01'),
  288. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  289. duration=timedelta(minutes=10)),
  290. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  291. duration=timedelta(minutes=10),
  292. signal_payload=1)],
  293. signal_name='simple',
  294. signal_type='level',
  295. signal_id='signal001')],
  296. targets=[{'ven_id': 'ven001'}])
  297. event_2_active_low_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  298. modification_number=0,
  299. created_date_time=now,
  300. event_status='far',
  301. priority=2,
  302. market_context='http://context01'),
  303. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  304. duration=timedelta(minutes=10)),
  305. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  306. duration=timedelta(minutes=10),
  307. signal_payload=1)],
  308. signal_name='simple',
  309. signal_type='level',
  310. signal_id='signal001')],
  311. targets=[{'ven_id': 'ven001'}])
  312. event_3_active_no_prio = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  313. modification_number=0,
  314. created_date_time=now,
  315. event_status='far',
  316. market_context='http://context01'),
  317. active_period=objects.ActivePeriod(dtstart=now - timedelta(minutes=5),
  318. duration=timedelta(minutes=10)),
  319. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  320. duration=timedelta(minutes=10),
  321. signal_payload=1)],
  322. signal_name='simple',
  323. signal_type='level',
  324. signal_id='signal001')],
  325. targets=[{'ven_id': 'ven001'}])
  326. event_4_far_early = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  327. modification_number=0,
  328. created_date_time=now,
  329. event_status='far',
  330. market_context='http://context01'),
  331. active_period=objects.ActivePeriod(dtstart=now + timedelta(minutes=5),
  332. duration=timedelta(minutes=10)),
  333. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  334. duration=timedelta(minutes=10),
  335. signal_payload=1)],
  336. signal_name='simple',
  337. signal_type='level',
  338. signal_id='signal001')],
  339. targets=[{'ven_id': 'ven001'}])
  340. event_5_far_later = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
  341. modification_number=0,
  342. created_date_time=now,
  343. event_status='far',
  344. market_context='http://context01'),
  345. active_period=objects.ActivePeriod(dtstart=now + timedelta(minutes=10),
  346. duration=timedelta(minutes=10)),
  347. event_signals=[objects.EventSignal(intervals=[objects.Interval(dtstart=now,
  348. duration=timedelta(minutes=10),
  349. signal_payload=1)],
  350. signal_name='simple',
  351. signal_type='level',
  352. signal_id='signal001')],
  353. targets=[{'ven_id': 'ven001'}])
  354. events = [event_5_far_later, event_4_far_early, event_3_active_no_prio, event_2_active_low_prio, event_1_active_high_prio]
  355. ordered_events = utils.order_events(events)
  356. assert ordered_events == [event_1_active_high_prio, event_2_active_low_prio, event_3_active_no_prio, event_4_far_early, event_5_far_later]
  357. ordered_events = utils.order_events(event_1_active_high_prio)
  358. assert ordered_events == [event_1_active_high_prio]
  359. event_1_as_dict = asdict(event_1_active_high_prio)
  360. ordered_events = utils.order_events(event_1_as_dict)
  361. assert ordered_events == [event_1_as_dict]