123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- from openleadr import OpenADRClient, OpenADRServer, enable_default_logging, objects, utils
- import pytest
- import asyncio
- import datetime
- from functools import partial
- enable_default_logging()
- def on_create_party_registration(registration_info):
- print("Registered party")
- return 'ven123', 'reg123'
- async def on_event(event):
- return 'optIn'
- async def on_event_opt_in(event, future=None):
- if future and future.done() is False:
- future.set_result(event)
- return 'optIn'
- async def on_update_event(event, futures):
- for future in futures:
- if future.done() is False:
- future.set_result(event)
- break
- return 'optIn'
- async def on_event_opt_out(event, futures):
- for future in futures:
- if future.done() is False:
- future.set_result(event)
- break
- return 'optOut'
- async def event_callback(ven_id, event_id, opt_type, future):
- if future.done() is False:
- future.set_result(opt_type)
- @pytest.mark.asyncio
- async def test_internal_message_queue():
- loop = asyncio.get_event_loop()
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', on_event)
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- event_callback_future = loop.create_future()
- server.add_event(ven_id='ven123',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc),
- 'duration': datetime.timedelta(seconds=3),
- 'signal_payload': 1}],
- callback=partial(event_callback, future=event_callback_future))
- await server.run_async()
- #await asyncio.sleep(1)
- await client.run()
- #await asyncio.sleep(1)
- status = await event_callback_future
- assert status == 'optIn'
- message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
- assert message_type == 'oadrResponse'
- message_type, message_payload = await asyncio.wait_for(client.poll(), 0.5)
- assert message_type == 'oadrResponse'
- #await asyncio.sleep(1) # Wait for the event to be completed
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_request_event():
- loop = asyncio.get_event_loop()
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- server = OpenADRServer(vtn_id='myvtn', requested_poll_freq=datetime.timedelta(seconds=1))
- server.add_handler('on_create_party_registration', on_create_party_registration)
- event_id = server.add_event(ven_id='ven123',
- signal_name='simple',
- signal_type='level',
- intervals=[{'dtstart': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=4),
- 'duration': datetime.timedelta(seconds=2),
- 'signal_payload': 1}],
- ramp_up_period=datetime.timedelta(seconds=2),
- callback=partial(event_callback))
- assert server.events['ven123'][0].event_descriptor.event_status == 'far'
- await server.run_async()
- await client.create_party_registration()
- message_type, message_payload = await client.request_event()
- assert message_type == 'oadrDistributeEvent'
- message_type, message_payload = await client.request_event()
- assert message_type == 'oadrDistributeEvent'
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_raw_event():
- now = datetime.datetime.now(datetime.timezone.utc)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_handler('on_create_party_registration', on_create_party_registration)
- event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
- modification_number=0,
- event_status='far',
- market_context='http://marketcontext01'),
- event_signals=[objects.EventSignal(signal_id='signal001',
- signal_type='level',
- signal_name='simple',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(minutes=10),
- signal_payload=1)]),
- objects.EventSignal(signal_id='signal002',
- signal_type='price',
- signal_name='ELECTRICITY_PRICE',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(minutes=10),
- signal_payload=1)])],
- targets=[objects.Target(ven_id='ven123')])
- loop = asyncio.get_event_loop()
- event_callback_future = loop.create_future()
- server.add_raw_event(ven_id='ven123', event=event, callback=partial(event_callback, future=event_callback_future))
- on_event_future = loop.create_future()
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
- await server.run_async()
- await client.run()
- event = await on_event_future
- assert len(event['event_signals']) == 2
- result = await event_callback_future
- assert result == 'optIn'
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_create_event_with_future_as_callback():
- now = datetime.datetime.now(datetime.timezone.utc)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_handler('on_create_party_registration', on_create_party_registration)
- event = objects.Event(event_descriptor=objects.EventDescriptor(event_id='event001',
- modification_number=0,
- event_status='far',
- market_context='http://marketcontext01'),
- event_signals=[objects.EventSignal(signal_id='signal001',
- signal_type='level',
- signal_name='simple',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(minutes=10),
- signal_payload=1)]),
- objects.EventSignal(signal_id='signal002',
- signal_type='price',
- signal_name='ELECTRICITY_PRICE',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(minutes=10),
- signal_payload=1)])],
- targets=[objects.Target(ven_id='ven123')])
- loop = asyncio.get_event_loop()
- event_callback_future = loop.create_future()
- server.add_raw_event(ven_id='ven123', event=event, callback=event_callback_future)
- on_event_future = loop.create_future()
- client = OpenADRClient(ven_name='myven',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', partial(on_event_opt_in, future=on_event_future))
- await server.run_async()
- await client.run()
- event = await on_event_future
- assert len(event['event_signals']) == 2
- result = await event_callback_future
- assert result == 'optIn'
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_multiple_events_in_queue():
- now = datetime.datetime.now(datetime.timezone.utc)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_handler('on_create_party_registration', on_create_party_registration)
- loop = asyncio.get_event_loop()
- event_1_callback_future = loop.create_future()
- event_2_callback_future = loop.create_future()
- server.add_event(ven_id='ven123',
- signal_name='simple',
- signal_type='level',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(seconds=1),
- signal_payload=1)],
- callback=event_1_callback_future)
- await server.run()
- on_event_future = loop.create_future()
- client = OpenADRClient(ven_name='ven123',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- await client.create_party_registration()
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrDistributeEvent'
- events = response_payload['events']
- assert len(events) == 1
- event_id = events[0]['event_descriptor']['event_id']
- request_id = response_payload['request_id']
- await client.created_event(request_id=request_id,
- event_id=event_id,
- opt_type='optIn',
- modification_number=0)
- server.add_event(ven_id='ven123',
- signal_name='simple',
- signal_type='level',
- intervals=[objects.Interval(dtstart=now + datetime.timedelta(seconds=1),
- duration=datetime.timedelta(seconds=1),
- signal_payload=1)],
- callback=event_2_callback_future)
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrDistributeEvent'
- events = response_payload['events']
- # Assert that we still have two events in the response
- assert len(events) == 2
- # Wait one second and retrieve the events again
- await asyncio.sleep(1)
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrDistributeEvent'
- events = response_payload['events']
- assert len(events) == 2
- assert events[1]['event_descriptor']['event_status'] == 'completed'
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrDistributeEvent'
- events = response_payload['events']
- assert len(events) == 1
- await asyncio.sleep(1)
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrDistributeEvent'
- response_type, response_payload = await client.request_event()
- assert response_type == 'oadrResponse'
- await server.stop()
- @pytest.mark.asyncio
- async def test_client_event_cleanup():
- now = datetime.datetime.now(datetime.timezone.utc)
- server = OpenADRServer(vtn_id='myvtn')
- server.add_handler('on_create_party_registration', on_create_party_registration)
- loop = asyncio.get_event_loop()
- event_1_callback_future = loop.create_future()
- event_2_callback_future = loop.create_future()
- server.add_event(ven_id='ven123',
- signal_name='simple',
- signal_type='level',
- intervals=[objects.Interval(dtstart=now,
- duration=datetime.timedelta(seconds=1),
- signal_payload=1)],
- callback=event_1_callback_future)
- await server.run()
- client = OpenADRClient(ven_name='ven123',
- vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_handler('on_event', on_event_opt_in)
- await client.run()
- await asyncio.sleep(0.5)
- assert len(client.received_events) == 1
- await asyncio.sleep(0.5)
- await client._event_cleanup()
- assert len(client.received_events) == 0
- await server.stop()
- await client.stop()
|