123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- from openleadr import utils, objects
- from dataclasses import dataclass, asdict
- import pytest
- from datetime import datetime, timezone, timedelta
- from collections import deque
@dataclass
class dc:
    """Minimal dataclass used by the member-access tests; one int field ``a`` defaulting to 2."""

    a: int = 2
def test_hasmember():
    """Check ``utils.hasmember`` against a dict and a dataclass instance.

    Results are compared by identity with ``True``/``False`` instead of
    ``==``, so a merely truthy or falsy return value does not pass.
    """
    for container in ({'a': 1}, dc()):
        assert utils.hasmember(container, 'a') is True
        assert utils.hasmember(container, 'b') is False
def test_getmember():
    """``utils.getmember`` reads member 'a' from a dict and from a dataclass instance."""
    as_mapping = {'a': 1}
    as_dataclass = dc()
    assert utils.getmember(as_mapping, 'a') == 1
    assert utils.getmember(as_dataclass, 'a') == 2
def test_setmember():
    """A value stored with ``utils.setmember`` is read back by ``utils.getmember``.

    Exercised for both a dict and a dataclass instance.
    """
    for target in ({'a': 1}, dc()):
        utils.setmember(target, 'a', 10)
        assert utils.getmember(target, 'a') == 10
def test_setmember_nested():
    """A write made through the object returned by ``getmember`` is visible via the parent."""
    outer = dc()
    outer.a = dc()

    inner_for_read = utils.getmember(outer, 'a')
    assert utils.getmember(inner_for_read, 'a') == 2

    inner_for_write = utils.getmember(outer, 'a')
    utils.setmember(inner_for_write, 'a', 3)
    assert outer.a.a == 3
@pytest.mark.asyncio
async def test_delayed_call_with_func():
    """``utils.delayed_call`` can be awaited when handed an ``async def`` function object."""
    async def noop():
        return None

    await utils.delayed_call(noop, delay=0.1)
@pytest.mark.asyncio
async def test_delayed_call_with_coro():
    """``utils.delayed_call`` can be awaited when handed an already-created coroutine object."""
    async def noop():
        return None

    pending = noop()
    await utils.delayed_call(pending, delay=0.1)
@pytest.mark.asyncio
async def test_delayed_call_with_coro_func():
    """``utils.delayed_call`` can be awaited when handed a coroutine function (not yet called)."""
    async def noop():
        return None

    await utils.delayed_call(noop, delay=0.1)
def test_determine_event_status_completed():
    """A period that began 10 s ago and lasted 5 s is expected to be 'completed'."""
    started = datetime.now(timezone.utc) - timedelta(seconds=10)
    period = dict(dtstart=started, duration=timedelta(seconds=5))
    assert utils.determine_event_status(period) == 'completed'
def test_determine_event_status_active():
    """A period that began 10 s ago and lasts 15 s is expected to be 'active'."""
    started = datetime.now(timezone.utc) - timedelta(seconds=10)
    period = dict(dtstart=started, duration=timedelta(seconds=15))
    assert utils.determine_event_status(period) == 'active'
def test_determine_event_status_near():
    """A period starting in 3 s with a 5 s ramp-up period is expected to be 'near'."""
    starts = datetime.now(timezone.utc) + timedelta(seconds=3)
    period = dict(dtstart=starts,
                  duration=timedelta(seconds=5),
                  ramp_up_period=timedelta(seconds=5))
    assert utils.determine_event_status(period) == 'near'
def test_determine_event_status_far():
    """A period starting in 10 s with no ramp-up period is expected to be 'far'."""
    starts = datetime.now(timezone.utc) + timedelta(seconds=10)
    period = dict(dtstart=starts, duration=timedelta(seconds=5))
    assert utils.determine_event_status(period) == 'far'
def test_determine_event_status_far_with_ramp_up():
    """A period starting in 10 s with a 5 s ramp-up period is still expected to be 'far'."""
    starts = datetime.now(timezone.utc) + timedelta(seconds=10)
    period = dict(dtstart=starts,
                  duration=timedelta(seconds=5),
                  ramp_up_period=timedelta(seconds=5))
    assert utils.determine_event_status(period) == 'far'
def test_get_active_period_from_intervals():
    """Two back-to-back 5 s intervals are expected to give a 10 s period starting at the first.

    Covers intervals supplied as dicts and as ``objects.Interval`` instances,
    plus the call with ``False`` as second positional argument, for which an
    ``objects.ActivePeriod`` is expected instead of a dict.
    """
    now = datetime.now(timezone.utc)
    five_seconds = timedelta(seconds=5)
    ten_seconds = timedelta(seconds=10)

    dict_intervals = [
        {'dtstart': now, 'duration': five_seconds},
        {'dtstart': now + five_seconds, 'duration': five_seconds},
    ]
    period = utils.get_active_period_from_intervals(dict_intervals)
    assert period == {'dtstart': now, 'duration': ten_seconds}

    object_intervals = [
        objects.Interval(dtstart=now, duration=five_seconds, signal_payload=1),
        objects.Interval(dtstart=now + five_seconds, duration=five_seconds, signal_payload=2),
    ]
    period = utils.get_active_period_from_intervals(object_intervals)
    assert period == {'dtstart': now, 'duration': ten_seconds}

    period = utils.get_active_period_from_intervals(object_intervals, False)
    assert period == objects.ActivePeriod(dtstart=now, duration=ten_seconds)
def test_cron_config():
    """Check the cron field dicts ``utils.cron_config`` returns for several intervals.

    Each table row pairs an interval with the exact dict expected for it; the
    final check covers ``randomize_seconds=True``, where a 'jitter' key is expected.
    """
    cases = [
        (timedelta(seconds=5), {'second': '*/5', 'minute': '*', 'hour': '*'}),
        (timedelta(minutes=1), {'second': '0', 'minute': '*/1', 'hour': '*'}),
        (timedelta(minutes=5), {'second': '0', 'minute': '*/5', 'hour': '*'}),
        (timedelta(hours=1), {'second': '0', 'minute': '0', 'hour': '*/1'}),
        (timedelta(hours=2), {'second': '0', 'minute': '0', 'hour': '*/2'}),
        (timedelta(hours=25), {'second': '0', 'minute': '0', 'hour': '0'}),
    ]
    for interval, expected in cases:
        assert utils.cron_config(interval) == expected

    jittered = utils.cron_config(timedelta(seconds=10), randomize_seconds=True)
    assert jittered == {'second': '*/10', 'minute': '*', 'hour': '*', 'jitter': 1}
def test_get_event_from_deque():
    """``utils.get_next_event_from_deque`` is expected to hand out only the Event items.

    The deque holds two events with three plain dict messages between them.
    The events must come out in insertion order, later calls must give
    ``None``, and the three messages must remain queued in their original order.
    """
    now = datetime.now(timezone.utc)

    def build_event():
        # Each call produces a fresh Event with its own generated signal id.
        descriptor = objects.EventDescriptor(event_id='event123',
                                             event_status='far',
                                             modification_number='1',
                                             market_context='http://marketcontext01')
        signal = objects.EventSignal(signal_name='simple',
                                     signal_type='level',
                                     signal_id=utils.generate_id(),
                                     intervals=[objects.Interval(dtstart=now,
                                                                 duration=timedelta(minutes=10),
                                                                 signal_payload=1)])
        return objects.Event(event_descriptor=descriptor,
                             event_signals=[signal],
                             targets=[{'ven_id': 'ven123'}])

    event1 = build_event()
    messages = [{'message': 'one'}, {'message': 'two'}, {'message': 'three'}]
    event2 = build_event()

    queue = deque([event1, *messages, event2])

    for expected in (event1, event2, None, None):
        assert utils.get_next_event_from_deque(queue) is expected

    assert len(queue) == 3
    for leftover in messages:
        assert queue.popleft() is leftover
    assert len(queue) == 0
    assert utils.get_next_event_from_deque(queue) is None
def test_validate_report_measurement_dict_missing_items(caplog):
    """A measurement dict holding only 'name' must raise ValueError listing the required keys."""
    incomplete = {'name': 'rainbows'}
    expected_message = ("The measurement dict must contain the following keys: "
                        "'name', 'description', 'unit'. Please correct this.")
    with pytest.raises(ValueError) as err:
        utils.validate_report_measurement_dict(incomplete)
    assert str(err.value) == expected_message
def test_validate_report_measurement_dict_invalid_name(caplog):
    """An unknown measurement name must be rewritten to 'customUnit' and a message logged."""
    measurement = dict(name='rainbows', unit='B', description='Rainbows')
    expected_log = ("You provided a measurement with an unknown name rainbows. "
                    "This was corrected to 'customUnit'. Please correct this in your "
                    "report definition.")

    utils.validate_report_measurement_dict(measurement)

    assert measurement['name'] == 'customUnit'
    assert expected_log in caplog.messages
def test_validate_report_measurement_dict_invalid_unit():
    """Unit 'B' for a 'current' measurement must raise ValueError naming the allowed unit."""
    measurement = dict(name='current', unit='B', description='Current')
    expected_message = ("The unit 'B' is not acceptable for measurement 'current'. Allowed "
                        "units are: 'A'.")
    with pytest.raises(ValueError) as err:
        utils.validate_report_measurement_dict(measurement)
    assert str(err.value) == expected_message
def test_validate_report_measurement_dict_invalid_description(caplog):
    """A description that does not match the measurement name must raise ValueError.

    The error message is now actually asserted; previously the comparison
    was a bare expression whose result was discarded, so a wrong message
    could never fail the test.
    """
    measurement = {'name': 'current',
                   'unit': 'A',
                   'description': 'something'}
    # NOTE(review): the double space before "('Current')" comes from joining
    # the original string pieces verbatim -- confirm it matches the library text.
    expected_message = ("The measurement's description 'something' "
                        "did not match the expected description for this type "
                        " ('Current'). Please correct this, or use "
                        "'customUnit' as the name.")
    with pytest.raises(ValueError) as err:
        utils.validate_report_measurement_dict(measurement)
    assert str(err.value) == expected_message
def test_validate_report_measurement_dict_invalid_description_case(caplog):
    """A description differing only in case must be corrected in place and a message logged."""
    measurement = dict(name='current', unit='A', description='CURRENT')
    expected_log = ("The description for the measurement with name 'current' "
                    "was not in the correct case; you provided 'CURRENT' but "
                    "it should be 'Current'. "
                    "This was automatically corrected.")

    utils.validate_report_measurement_dict(measurement)

    assert measurement['description'] == 'Current'
    assert expected_log in caplog.messages
def test_validate_report_measurement_dict_missing_power_attributes(caplog):
    """A 'powerReal' measurement without 'power_attributes' must raise ValueError."""
    measurement = dict(name='powerReal', description='RealPower', unit='W')
    expected_message = ("A 'power' related measurement must contain a "
                        "'power_attributes' section that contains the following "
                        "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
    with pytest.raises(ValueError) as err:
        utils.validate_report_measurement_dict(measurement)
    assert str(err.value) == expected_message
def test_validate_report_measurement_dict_invalid_power_attributes(caplog):
    """A 'power_attributes' dict lacking the expected keys must raise ValueError."""
    measurement = dict(name='powerReal',
                       description='RealPower',
                       unit='W',
                       power_attributes={'a': 123})
    expected_message = ("The power_attributes of the measurement must contain the "
                        "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
    with pytest.raises(ValueError) as err:
        utils.validate_report_measurement_dict(measurement)
    assert str(err.value) == expected_message
def test_ungroup_target_by_type_with_single_str():
    """A single string value per target type is expected to ungroup into a one-item list."""
    grouped = dict(ven_id='ven123')
    ungrouped = utils.ungroup_targets_by_type(grouped)
    assert ungrouped == [{'ven_id': 'ven123'}]
def test_find_by_with_dict():
    """``utils.find_by`` on a dict of dicts is expected to return the value whose 'a' is 123."""
    first = {'a': 123, 'b': 456}
    second = {'a': 321, 'b': 654}
    haystack = {'one': first, 'two': second}
    assert utils.find_by(haystack, 'a', 123) == {'a': 123, 'b': 456}
def test_find_by_with_missing_member():
    """``utils.find_by`` must cope with items lacking the searched key.

    Only the second item has key 'c'; it is the expected result.
    """
    without_c = {'a': 123, 'b': 456}
    with_c = {'a': 321, 'b': 654, 'c': 1000}
    found = utils.find_by([without_c, with_c], 'c', 1000)
    assert found == {'a': 321, 'b': 654, 'c': 1000}
def test_ensure_str():
    """``utils.ensure_str`` yields str for str/bytes, passes None through, rejects an int."""
    for given in ("Hello", b"Hello"):
        assert utils.ensure_str(given) == "Hello"
    assert utils.ensure_str(None) is None

    with pytest.raises(TypeError) as err:
        utils.ensure_str(1)
    assert str(err.value) == "Must be bytes or str"
def test_ensure_bytes():
    """``utils.ensure_bytes`` yields bytes for str/bytes, passes None through, rejects an int."""
    for given in ("Hello", b"Hello"):
        assert utils.ensure_bytes(given) == b"Hello"
    assert utils.ensure_bytes(None) is None

    with pytest.raises(TypeError) as err:
        utils.ensure_bytes(1)
    assert str(err.value) == "Must be bytes or str"
def test_booleanformat():
    """``utils.booleanformat`` maps booleans and 'true'/'false' strings to lowercase strings.

    A non-boolean input (123) must raise ValueError with a message naming the
    offending value. The call expected to raise is no longer wrapped in
    ``assert``: the return value is never reached when the error is raised,
    so the assert only obscured the intent of the ``pytest.raises`` block.
    """
    cases = [("true", "true"), ("false", "false"), (True, "true"), (False, "false")]
    for given, expected in cases:
        assert utils.booleanformat(given) == expected

    with pytest.raises(ValueError) as err:
        utils.booleanformat(123)
    assert str(err.value) == "A boolean value must be provided, not 123."
def test_parse_duration():
    """Check ``utils.parse_duration`` on a table of duration strings.

    Per the expected values below, one month ('P1M') is taken as 30 days and a
    leading '-' negates the result. An unparseable string must raise ValueError.
    """
    cases = [
        ("PT1M", timedelta(minutes=1)),
        ("PT1M5S", timedelta(minutes=1, seconds=5)),
        ("PT1H5M10S", timedelta(hours=1, minutes=5, seconds=10)),
        ("P1DT1H5M10S", timedelta(days=1, hours=1, minutes=5, seconds=10)),
        ("P1M", timedelta(days=30)),
        ("-P1M", timedelta(days=-30)),
        ("2W", timedelta(days=14)),
    ]
    for text, expected in cases:
        assert utils.parse_duration(text) == expected

    with pytest.raises(ValueError) as err:
        utils.parse_duration("Hello")
    assert str(err.value) == "The duration 'Hello' did not match the requested format"
def test_parse_datetime():
    """Check ``utils.parse_datetime`` on UTC timestamps with 0, 3, 6 and 9 fractional digits.

    Per the expected values, the nine-digit fraction is cut to microseconds.
    """
    cases = [
        ("2020-12-15T11:29:34Z", 0),
        ("2020-12-15T11:29:34.123456Z", 123456),
        ("2020-12-15T11:29:34.123Z", 123000),
        ("2020-12-15T11:29:34.123456789Z", 123456),
    ]
    for text, microsecond in cases:
        expected = datetime(2020, 12, 15, 11, 29, 34, microsecond, tzinfo=timezone.utc)
        assert utils.parse_datetime(text) == expected
@pytest.mark.asyncio
async def test_await_if_required():
    """``utils.await_if_required`` passes plain values through and awaits coroutines.

    Checked with a plain return value, a coroutine object, and ``None``. The
    ``None`` case is compared by identity (``is None``) rather than ``==``.
    """
    def normal_func():
        return 123

    async def coro_func():
        return 456

    result = await utils.await_if_required(normal_func())
    assert result == 123

    result = await utils.await_if_required(coro_func())
    assert result == 456

    result = await utils.await_if_required(None)
    assert result is None
@pytest.mark.asyncio
async def test_gather_if_required():
    """``utils.gather_if_required`` resolves lists of plain values and coroutines, in order.

    Covers all-plain, all-coroutine, mixed (including ``None``) and empty input.
    """
    def plain():
        return 123

    async def coro():
        return 456

    all_plain = [plain() for _ in range(3)]
    assert await utils.gather_if_required(all_plain) == [123, 123, 123]

    all_coros = [coro() for _ in range(3)]
    assert await utils.gather_if_required(all_coros) == [456, 456, 456]

    mixed = [coro(), plain(), None]
    assert await utils.gather_if_required(mixed) == [456, 123, None]

    nothing = []
    assert await utils.gather_if_required(nothing) == []
def test_order_events():
    """Check the order ``utils.order_events`` gives a shuffled list of five events.

    Expected order, per the assertion below: the three events whose active
    period has already started (priority 1, priority 2, then no explicit
    priority), followed by the two future events by ascending start time.
    A single Event, or its ``asdict`` form, is expected back as a one-item list.
    """
    now = datetime.now(timezone.utc)
    ten_minutes = timedelta(minutes=10)

    def build_event(period_start, **descriptor_extras):
        # descriptor_extras carries 'priority' only for events that set it,
        # so the remaining events keep the EventDescriptor default.
        descriptor = objects.EventDescriptor(event_id='event001',
                                             modification_number=0,
                                             created_date_time=now,
                                             event_status='far',
                                             market_context='http://context01',
                                             **descriptor_extras)
        period = objects.ActivePeriod(dtstart=period_start, duration=ten_minutes)
        signal = objects.EventSignal(intervals=[objects.Interval(dtstart=now,
                                                                 duration=ten_minutes,
                                                                 signal_payload=1)],
                                     signal_name='simple',
                                     signal_type='level',
                                     signal_id='signal001')
        return objects.Event(event_descriptor=descriptor,
                             active_period=period,
                             event_signals=[signal],
                             targets=[{'ven_id': 'ven001'}])

    already_started = now - timedelta(minutes=5)
    event_1_active_high_prio = build_event(already_started, priority=1)
    event_2_active_low_prio = build_event(already_started, priority=2)
    event_3_active_no_prio = build_event(already_started)
    event_4_far_early = build_event(now + timedelta(minutes=5))
    event_5_far_later = build_event(now + timedelta(minutes=10))

    shuffled = [event_5_far_later,
                event_4_far_early,
                event_3_active_no_prio,
                event_2_active_low_prio,
                event_1_active_high_prio]
    assert utils.order_events(shuffled) == [event_1_active_high_prio,
                                            event_2_active_low_prio,
                                            event_3_active_no_prio,
                                            event_4_far_early,
                                            event_5_far_later]

    # A lone event (not wrapped in a list) is expected back inside a list.
    assert utils.order_events(event_1_active_high_prio) == [event_1_active_high_prio]

    event_1_as_dict = asdict(event_1_active_high_prio)
    assert utils.order_events(event_1_as_dict) == [event_1_as_dict]
|