test_utils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. from openleadr import utils, objects
  2. from dataclasses import dataclass
  3. import pytest
  4. from datetime import datetime, timezone, timedelta
  5. from collections import deque
  6. @dataclass
  7. class dc:
  8. a: int = 2
  9. def test_hasmember():
  10. obj = {'a': 1}
  11. assert utils.hasmember(obj, 'a') == True
  12. assert utils.hasmember(obj, 'b') == False
  13. obj = dc()
  14. assert utils.hasmember(obj, 'a') == True
  15. assert utils.hasmember(obj, 'b') == False
  16. def test_getmember():
  17. obj = {'a': 1}
  18. assert utils.getmember(obj, 'a') == 1
  19. obj = dc()
  20. assert utils.getmember(obj, 'a') == 2
  21. def test_setmember():
  22. obj = {'a': 1}
  23. utils.setmember(obj, 'a', 10)
  24. assert utils.getmember(obj, 'a') == 10
  25. obj = dc()
  26. utils.setmember(obj, 'a', 10)
  27. assert utils.getmember(obj, 'a') == 10
  28. @pytest.mark.asyncio
  29. async def test_delayed_call_with_func():
  30. async def myfunc():
  31. pass
  32. await utils.delayed_call(myfunc, delay=0.1)
  33. @pytest.mark.asyncio
  34. async def test_delayed_call_with_coro():
  35. async def mycoro():
  36. pass
  37. await utils.delayed_call(mycoro(), delay=0.1)
  38. @pytest.mark.asyncio
  39. async def test_delayed_call_with_coro_func():
  40. async def mycoro():
  41. pass
  42. await utils.delayed_call(mycoro, delay=0.1)
  43. def test_determine_event_status_completed():
  44. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  45. 'duration': timedelta(seconds=5)}
  46. assert utils.determine_event_status(active_period) == 'completed'
  47. def test_determine_event_status_active():
  48. active_period = {'dtstart': datetime.now(timezone.utc) - timedelta(seconds=10),
  49. 'duration': timedelta(seconds=15)}
  50. assert utils.determine_event_status(active_period) == 'active'
  51. def test_determine_event_status_near():
  52. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=3),
  53. 'duration': timedelta(seconds=5),
  54. 'ramp_up_duration': timedelta(seconds=5)}
  55. assert utils.determine_event_status(active_period) == 'near'
  56. def test_determine_event_status_far():
  57. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  58. 'duration': timedelta(seconds=5)}
  59. assert utils.determine_event_status(active_period) == 'far'
  60. def test_determine_event_status_far_with_ramp_up():
  61. active_period = {'dtstart': datetime.now(timezone.utc) + timedelta(seconds=10),
  62. 'duration': timedelta(seconds=5),
  63. 'ramp_up_duration': timedelta(seconds=5)}
  64. assert utils.determine_event_status(active_period) == 'far'
  65. def test_get_active_period_from_intervals():
  66. now = datetime.now(timezone.utc)
  67. intervals=[{'dtstart': now,
  68. 'duration': timedelta(seconds=5)},
  69. {'dtstart': now + timedelta(seconds=5),
  70. 'duration': timedelta(seconds=5)}]
  71. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  72. 'duration': timedelta(seconds=10)}
  73. intervals=[objects.Interval(dtstart=now,
  74. duration=timedelta(seconds=5),
  75. signal_payload=1),
  76. objects.Interval(dtstart=now + timedelta(seconds=5),
  77. duration=timedelta(seconds=5),
  78. signal_payload=2)]
  79. assert utils.get_active_period_from_intervals(intervals) == {'dtstart': now,
  80. 'duration': timedelta(seconds=10)}
  81. assert utils.get_active_period_from_intervals(intervals, False) == objects.ActivePeriod(dtstart=now,
  82. duration=timedelta(seconds=10))
  83. def test_cron_config():
  84. assert utils.cron_config(timedelta(seconds=5)) == {'second': '*/5', 'minute': '*', 'hour': '*'}
  85. assert utils.cron_config(timedelta(minutes=1)) == {'second': '0', 'minute': '*/1', 'hour': '*'}
  86. assert utils.cron_config(timedelta(minutes=5)) == {'second': '0', 'minute': '*/5', 'hour': '*'}
  87. assert utils.cron_config(timedelta(hours=1)) == {'second': '0', 'minute': '0', 'hour': '*/1'}
  88. assert utils.cron_config(timedelta(hours=2)) == {'second': '0', 'minute': '0', 'hour': '*/2'}
  89. assert utils.cron_config(timedelta(hours=25)) == {'second': '0', 'minute': '0', 'hour': '0'}
  90. assert utils.cron_config(timedelta(seconds=10), randomize_seconds=True) == {'second': '*/10',
  91. 'minute': '*',
  92. 'hour': '*',
  93. 'jitter': 1}
  94. def test_get_event_from_deque():
  95. d = deque()
  96. now = datetime.now(timezone.utc)
  97. event1 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  98. event_status='far',
  99. modification_number='1',
  100. market_context='http://marketcontext01'),
  101. event_signals=[objects.EventSignal(signal_name='simple',
  102. signal_type='level',
  103. signal_id=utils.generate_id(),
  104. intervals=[objects.Interval(dtstart=now,
  105. duration=timedelta(minutes=10),
  106. signal_payload=1)])],
  107. targets=[{'ven_id': 'ven123'}])
  108. msg_one = {'message': 'one'}
  109. msg_two = {'message': 'two'}
  110. msg_three = {'message': 'three'}
  111. event2 = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event123',
  112. event_status='far',
  113. modification_number='1',
  114. market_context='http://marketcontext01'),
  115. event_signals=[objects.EventSignal(signal_name='simple',
  116. signal_type='level',
  117. signal_id=utils.generate_id(),
  118. intervals=[objects.Interval(dtstart=now,
  119. duration=timedelta(minutes=10),
  120. signal_payload=1)])],
  121. targets=[{'ven_id': 'ven123'}])
  122. d.append(event1)
  123. d.append(msg_one)
  124. d.append(msg_two)
  125. d.append(msg_three)
  126. d.append(event2)
  127. assert utils.get_next_event_from_deque(d) is event1
  128. assert utils.get_next_event_from_deque(d) is event2
  129. assert utils.get_next_event_from_deque(d) is None
  130. assert utils.get_next_event_from_deque(d) is None
  131. assert len(d) == 3
  132. assert d.popleft() is msg_one
  133. assert d.popleft() is msg_two
  134. assert d.popleft() is msg_three
  135. assert len(d) == 0
  136. assert utils.get_next_event_from_deque(d) is None
  137. def test_validate_report_measurement_dict_missing_items(caplog):
  138. measurement = {'name': 'rainbows'}
  139. with pytest.raises(ValueError) as err:
  140. utils.validate_report_measurement_dict(measurement)
  141. assert str(err.value) == (f"The measurement dict must contain the following keys: "
  142. "'name', 'description', 'unit'. Please correct this.")
  143. def test_validate_report_measurement_dict_invalid_name(caplog):
  144. measurement = {'name': 'rainbows',
  145. 'unit': 'B',
  146. 'description': 'Rainbows'}
  147. utils.validate_report_measurement_dict(measurement)
  148. assert measurement['name'] == 'customUnit'
  149. assert (f"You provided a measurement with an unknown name rainbows. "
  150. "This was corrected to 'customUnit'. Please correct this in your "
  151. "report definition.") in caplog.messages
  152. def test_validate_report_measurement_dict_invalid_unit():
  153. with pytest.raises(ValueError) as err:
  154. measurement = {'name': 'current',
  155. 'unit': 'B',
  156. 'description': 'Current'}
  157. utils.validate_report_measurement_dict(measurement)
  158. assert str(err.value) == (f"The unit 'B' is not acceptable for measurement 'current'. Allowed "
  159. f"units are: 'A'.")
  160. def test_validate_report_measurement_dict_invalid_description(caplog):
  161. with pytest.raises(ValueError) as err:
  162. measurement = {'name': 'current',
  163. 'unit': 'A',
  164. 'description': 'something'}
  165. utils.validate_report_measurement_dict(measurement)
  166. str(err.value) == (f"The measurement's description 'something' "
  167. f"did not match the expected description for this type "
  168. f" ('Current'). Please correct this, or use "
  169. "'customUnit' as the name.")
  170. def test_validate_report_measurement_dict_invalid_description_case(caplog):
  171. measurement = {'name': 'current',
  172. 'unit': 'A',
  173. 'description': 'CURRENT'}
  174. utils.validate_report_measurement_dict(measurement)
  175. assert measurement['description'] == 'Current'
  176. assert (f"The description for the measurement with name 'current' "
  177. f"was not in the correct case; you provided 'CURRENT' but "
  178. f"it should be 'Current'. "
  179. "This was automatically corrected.") in caplog.messages
  180. def test_validate_report_measurement_dict_missing_power_attributes(caplog):
  181. with pytest.raises(ValueError) as err:
  182. measurement = {'name': 'powerReal',
  183. 'description': 'RealPower',
  184. 'unit': 'W'}
  185. utils.validate_report_measurement_dict(measurement)
  186. assert str(err.value) == ("A 'power' related measurement must contain a "
  187. "'power_attributes' section that contains the following "
  188. "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
  189. def test_validate_report_measurement_dict_invalid_power_attributes(caplog):
  190. with pytest.raises(ValueError) as err:
  191. measurement = {'name': 'powerReal',
  192. 'description': 'RealPower',
  193. 'unit': 'W',
  194. 'power_attributes': {'a': 123}}
  195. utils.validate_report_measurement_dict(measurement)
  196. assert str(err.value) == ("The power_attributes of the measurement must contain the "
  197. "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
  198. def test_ungroup_target_by_type_with_single_str():
  199. targets_by_type = {'ven_id': 'ven123'}
  200. targets = utils.ungroup_targets_by_type(targets_by_type)
  201. assert targets == [{'ven_id': 'ven123'}]
  202. def test_find_by_with_dict():
  203. search_dict = {'one': {'a': 123, 'b': 456},
  204. 'two': {'a': 321, 'b': 654}}
  205. result = utils.find_by(search_dict, 'a', 123)
  206. assert result == {'a': 123, 'b': 456}
  207. def test_find_by_with_missing_member():
  208. search_list = [{'a': 123, 'b': 456},
  209. {'a': 321, 'b': 654, 'c': 1000}]
  210. result = utils.find_by(search_list, 'c', 1000)
  211. assert result == {'a': 321, 'b': 654, 'c': 1000}
  212. def test_ensure_str():
  213. assert utils.ensure_str("Hello") == "Hello"
  214. assert utils.ensure_str(b"Hello") == "Hello"
  215. assert utils.ensure_str(None) is None
  216. with pytest.raises(TypeError) as err:
  217. utils.ensure_str(1)
  218. assert str(err.value) == "Must be bytes or str"
  219. def test_ensure_bytes():
  220. assert utils.ensure_bytes("Hello") == b"Hello"
  221. assert utils.ensure_bytes(b"Hello") == b"Hello"
  222. assert utils.ensure_bytes(None) is None
  223. with pytest.raises(TypeError) as err:
  224. utils.ensure_bytes(1)
  225. assert str(err.value) == "Must be bytes or str"
  226. def test_booleanformat():
  227. assert utils.booleanformat("true") == "true"
  228. assert utils.booleanformat("false") == "false"
  229. assert utils.booleanformat(True) == "true"
  230. assert utils.booleanformat(False) == "false"
  231. with pytest.raises(ValueError) as err:
  232. assert utils.booleanformat(123)
  233. assert str(err.value) == "A boolean value must be provided, not 123."
  234. def test_parse_duration():
  235. assert utils.parse_duration("PT1M") == timedelta(minutes=1)
  236. assert utils.parse_duration("PT1M5S") == timedelta(minutes=1, seconds=5)
  237. assert utils.parse_duration("PT1H5M10S") == timedelta(hours=1, minutes=5, seconds=10)
  238. assert utils.parse_duration("P1DT1H5M10S") == timedelta(days=1, hours=1, minutes=5, seconds=10)
  239. assert utils.parse_duration("P1M") == timedelta(days=30)
  240. assert utils.parse_duration("-P1M") == timedelta(days=-30)
  241. assert utils.parse_duration("2W") == timedelta(days=14)
  242. with pytest.raises(ValueError) as err:
  243. utils.parse_duration("Hello")
  244. assert str(err.value) == f"The duration 'Hello' did not match the requested format"