123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- from openleadr import OpenADRClient, OpenADRServer, enable_default_logging, utils, messaging
- import pytest
- from functools import partial
- import asyncio
- from datetime import datetime, timedelta, timezone
- import logging
- enable_default_logging()
- async def on_create_party_registration(ven_name):
- return 'venid', 'regid'
- async def on_event_accepted(ven_id, event_id, opt_type, future=None):
- if future and future.done() is False:
- future.set_result(opt_type)
- async def good_on_event(event):
- return 'optIn'
- async def faulty_on_event(event):
- return None
- async def broken_on_event(event):
- raise KeyError("BOOM")
- @pytest.mark.asyncio
- async def test_client_no_event_handler(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- print("Running server")
- await server.run_async()
- # await asyncio.sleep(0.1)
- print("Running client")
- await client.run()
- event_confirm_future = asyncio.get_event_loop().create_future()
- print("Adding event")
- server.add_event(ven_id='venid',
- event_id='test_client_no_event_handler',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=partial(on_event_accepted, future=event_confirm_future))
- print("Waiting for a response to the event")
- result = await event_confirm_future
- assert result == 'optOut'
- assert ("You should implement your own on_event handler. This handler receives "
- "an Event dict and should return either 'optIn' or 'optOut' based on your "
- "choice. Will opt out of the event for now.") in [rec.message for rec in caplog.records]
- await client.stop()
- await server.stop()
- await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
- @pytest.mark.asyncio
- async def test_client_faulty_event_handler(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', faulty_on_event)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- print("Running server")
- await server.run_async()
- # await asyncio.sleep(0.1)
- print("Running client")
- await client.run()
- event_confirm_future = asyncio.get_event_loop().create_future()
- print("Adding event")
- server.add_event(ven_id='venid',
- event_id='test_client_faulty_event_handler',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=partial(on_event_accepted, future=event_confirm_future))
- print("Waiting for a response to the event")
- result = await event_confirm_future
- assert result == 'optOut'
- assert ("Your on_event or on_update_event handler must return 'optIn' or 'optOut'; "
- f"you supplied {None}. Please fix your on_event handler.") in [rec.message for rec in caplog.records]
- await client.stop()
- await server.stop()
- await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
- @pytest.mark.asyncio
- async def test_client_exception_event_handler(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', broken_on_event)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- print("Running server")
- await server.run_async()
- # await asyncio.sleep(0.1)
- print("Running client")
- await client.run()
- event_confirm_future = asyncio.get_event_loop().create_future()
- print("Adding event")
- server.add_event(ven_id='venid',
- event_id='test_client_exception_event_handler',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=partial(on_event_accepted, future=event_confirm_future))
- print("Waiting for a response to the event")
- result = await event_confirm_future
- assert result == 'optOut'
- err = KeyError("BOOM")
- assert ("Your on_event handler encountered an error. Will Opt Out of the event. "
- f"The error was {err.__class__.__name__}: {str(err)}") in [rec.message for rec in caplog.records]
- await client.stop()
- await server.stop()
- await asyncio.gather(*[t for t in asyncio.all_tasks()][1:])
- @pytest.mark.asyncio
- async def test_client_good_event_handler(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', good_on_event)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- print("Running server")
- await server.run_async()
- # await asyncio.sleep(0.1)
- print("Running client")
- await client.run()
- event_confirm_future = asyncio.get_event_loop().create_future()
- print("Adding event")
- server.add_event(ven_id='venid',
- event_id='test_client_good_event_handler',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=partial(on_event_accepted, future=event_confirm_future))
- print("Waiting for a response to the event")
- result = await event_confirm_future
- assert result == 'optIn'
- assert len(caplog.records) == 0
- await client.stop()
- await server.stop()
- # await asyncio.sleep(1)
- @pytest.mark.asyncio
- async def test_server_warning_conflicting_poll_methods(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_poll', print)
- server.add_event(ven_id='venid',
- event_id='test_server_warning_conflicting_poll_methods',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=on_event_accepted)
- assert ("You cannot use the add_event method after you assign your own on_poll "
- "handler. If you use your own on_poll handler, you are responsible for "
- "delivering events from that handler. If you want to use OpenLEADRs "
- "message queuing system, you should not assign an on_poll handler. "
- "Your Event will NOT be added.") in [record.msg for record in caplog.records]
- @pytest.mark.asyncio
- async def test_server_warning_naive_datetimes_in_event(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_event(ven_id='venid',
- event_id='test_server_warning_naive_datetimes_in_event',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=on_event_accepted)
- assert ("You supplied a naive datetime object to your interval's dtstart. "
- "This will be interpreted as a timestamp in your local timezone "
- "and then converted to UTC before sending. Please supply timezone-"
- "aware timestamps like datetime.datetime.new(timezone.utc) or "
- "datetime.datetime(..., tzinfo=datetime.timezone.utc)") in [record.msg for record in caplog.records]
- def test_event_with_wrong_response_required(caplog):
- now = datetime.now(timezone.utc)
- event = {'active_period': {'dtstart': now, 'duration': timedelta(seconds=10)},
- 'event_descriptor': {'event_id': 'event123',
- 'modification_number': 1,
- 'priority': 0,
- 'event_status': 'far',
- 'created_date_time': now},
- 'event_signals': [{'signal_name': 'simple',
- 'signal_type': 'level',
- 'intervals': [{'dtstart': now,
- 'duration': timedelta(seconds=10),
- 'signal_payload': 1}]}],
- 'targets': [{'ven_id': 'ven123'}],
- 'response_required': 'blabla'}
- msg = messaging.create_message('oadrDistributeEvent', events=[event])
- assert ("The response_required property in an Event should be "
- "'never' or 'always', not blabla. Changing to 'always'.") in caplog.messages
- message_type, message_payload= messaging.parse_message(msg)
- assert message_payload['events'][0]['response_required'] == 'always'
- def test_event_missing_created_date_time(caplog):
- now = datetime.now(timezone.utc)
- event = {'active_period': {'dtstart': now, 'duration': timedelta(seconds=10)},
- 'event_descriptor': {'event_id': 'event123',
- 'modification_number': 1,
- 'priority': 0,
- 'event_status': 'far'},
- 'event_signals': [{'signal_name': 'simple',
- 'signal_type': 'level',
- 'intervals': [{'dtstart': now,
- 'duration': timedelta(seconds=10),
- 'signal_payload': 1}]}],
- 'targets': [{'ven_id': 'ven123'}],
- 'response_required': 'always'}
- msg = messaging.create_message('oadrDistributeEvent', events=[event])
- assert ("Your event descriptor did not contain a created_date_time. "
- "This will be automatically added.") in caplog.messages
- def test_event_incongruent_targets(caplog):
- now = datetime.now(timezone.utc)
- event = {'active_period': {'dtstart': now, 'duration': timedelta(seconds=10)},
- 'event_descriptor': {'event_id': 'event123',
- 'modification_number': 1,
- 'priority': 0,
- 'event_status': 'far',
- 'created_date_time': now},
- 'event_signals': [{'signal_name': 'simple',
- 'signal_type': 'level',
- 'intervals': [{'dtstart': now,
- 'duration': timedelta(seconds=10),
- 'signal_payload': 1}]}],
- 'targets': [{'ven_id': 'ven123'}],
- 'targets_by_type': {'ven_id': ['ven456']},
- 'response_required': 'always'}
- with pytest.raises(ValueError) as err:
- msg = messaging.create_message('oadrDistributeEvent', events=[event])
- assert str(err.value) == ("You assigned both 'targets' and 'targets_by_type' in your event, "
- "but the two were not consistent with each other. "
- f"You supplied 'targets' = {event['targets']} and "
- f"'targets_by_type' = {event['targets_by_type']}")
- def test_event_only_targets_by_type(caplog):
- now = datetime.now(timezone.utc)
- event = {'active_period': {'dtstart': now, 'duration': timedelta(seconds=10)},
- 'event_descriptor': {'event_id': 'event123',
- 'modification_number': 1,
- 'priority': 0,
- 'event_status': 'far',
- 'created_date_time': now},
- 'event_signals': [{'signal_name': 'simple',
- 'signal_type': 'level',
- 'intervals': [{'dtstart': now,
- 'duration': timedelta(seconds=10),
- 'signal_payload': 1}]}],
- 'targets_by_type': {'ven_id': ['ven456']},
- 'response_required': 'always'}
- msg = messaging.create_message('oadrDistributeEvent', events=[event])
- message_type, message_payload = messaging.parse_message(msg)
- assert message_payload['events'][0]['targets'] == [{'ven_id': 'ven456'}]
- @pytest.mark.asyncio
- async def test_client_warning_no_update_event_handler(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- event_accepted_future = asyncio.get_event_loop().create_future()
- server.add_event(ven_id='venid',
- event_id='test_client_warning_no_update_event_handler',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=event_accepted_future)
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', good_on_event)
- print("Starting server")
- await server.run()
- await client.run()
- print("Waiting for first event to be accepted...")
- await event_accepted_future
- # Manually update the event
- server.events['venid'][0].event_descriptor.modification_number = 1
- server.events_updated['venid'] = True
- await asyncio.sleep(1)
- assert ("You should implement your own on_update_event handler. This handler receives "
- "an Event dict and should return either 'optIn' or 'optOut' based on your "
- "choice. Will re-use the previous opt status for this event_id for now") in [record.msg for record in caplog.records]
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_server_add_event_with_wrong_callback_signature(caplog):
- def dummy_callback(some_param):
- pass
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=timedelta(seconds=1))
- with pytest.raises(ValueError) as err:
- server.add_event(ven_id='venid',
- event_id='test_server_add_event_with_wrong_callback_signature',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- callback=dummy_callback)
- @pytest.mark.asyncio
- async def test_server_add_event_with_no_callback(caplog):
- def dummy_callback(some_param):
- pass
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_event(ven_id='venid',
- event_id='test_server_add_event_with_no_callback',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'})
- assert ("You did not provide a 'callback', which means you won't know if the "
- "VEN will opt in or opt out of your event. You should consider adding "
- "a callback for this.") in caplog.messages
- @pytest.mark.asyncio
- async def test_server_add_event_with_no_callback_response_never_required(caplog):
- caplog.set_level(logging.WARNING)
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_event(ven_id='venid',
- event_id='test_server_add_event_with_no_callback_response_never_required',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.now(timezone.utc),
- 'duration': timedelta(seconds=1),
- 'signal_payload': 1.1}],
- target={'ven_id': 'venid'},
- response_required='never')
- await server.run()
- await server.stop()
- assert ("You did not provide a 'callback', which means you won't know if the "
- "VEN will opt in or opt out of your event. You should consider adding "
- "a callback for this.") not in caplog.messages
|