123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635 |
- from openleadr import OpenADRClient, OpenADRServer, enable_default_logging
- import asyncio
- import pytest
- import aiohttp
- from datetime import datetime, timedelta
- from functools import partial
- import logging
- from random import random
- import time
- from openleadr.messaging import create_message
- loop = asyncio.get_event_loop()
- loop.set_debug(True)
- enable_default_logging()
- async def collect_data(future=None):
- print("Collect Data")
- value = 100 * random()
- if future:
- future.set_result(value)
- return value
- async def lookup_ven(ven_name=None, ven_id=None):
- """
- Look up a ven by its name or ID
- """
- return {'ven_id': '1234'}
- async def receive_data(data, future=None):
- if future:
- future.set_result(data)
- pass
- async def on_update_report(report, futures=None):
- if futures:
- for future in futures:
- if future.done() is False:
- future.set_result(report)
- break
- pass
- async def on_register_report(ven_id, resource_id, measurement, unit, scale,
- min_sampling_interval, max_sampling_interval, bundling=1, futures=None, receive_futures=None):
- """
- Deal with this report.
- """
- print(f"Called on register report {ven_id}, {resource_id}, {measurement}, {unit}, {scale}, {min_sampling_interval}, {max_sampling_interval}")
- assert resource_id in ('Device001', 'Device002')
- if futures:
- futures.pop(0).set_result(True)
- if receive_futures:
- callback = partial(receive_data, future=receive_futures.pop(0))
- else:
- callback = receive_data
- if bundling > 1:
- print(f"Returning from on register report {callback}, {min_sampling_interval}, {bundling * min_sampling_interval}")
- return callback, min_sampling_interval, bundling * min_sampling_interval
- print(f"Returning from on register report {callback}, {min_sampling_interval}")
- return callback, min_sampling_interval
- async def on_register_report_full(report, futures=None):
- """
- Deal with this report.
- """
- if futures:
- futures.pop().set_result(True)
- granularity = min(*[rd['sampling_rate']['min_period'] for rd in report['report_descriptions']])
- report_requests = [(rd['r_id'], on_update_report, granularity) for rd in report['report_descriptions'] if report['report_name'] == 'METADATA_TELEMETRY_USAGE']
- return report_requests
- async def on_create_party_registration(ven_name, future=None):
- if future:
- future.set_result(True)
- ven_id = '1234'
- registration_id = 'abcd'
- return ven_id, registration_id
- @pytest.mark.asyncio
- async def test_report_registration():
- """
- Test the registration of two reports with two r_ids each.
- """
- # Create a server
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='testvtn')
- server.add_handler('on_register_report', on_register_report)
- server.add_handler('on_create_party_registration', on_create_party_registration)
- # Create a client
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
- # Add 4 reports
- client.add_report(callback=collect_data,
- report_specifier_id='CurrentReport',
- resource_id='Device001',
- measurement='current',
- unit='A')
- client.add_report(callback=collect_data,
- report_specifier_id='CurrentReport',
- resource_id='Device002',
- measurement='current',
- unit='A')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device001',
- measurement='voltage',
- unit='V')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device002',
- measurement='voltage',
- unit='V')
- asyncio.create_task(server.run_async())
- await asyncio.sleep(1)
- # Register the client
- await client.create_party_registration()
- # Register the reports
- await client.register_reports(client.reports)
- assert len(client.report_requests) == 2
- assert len(server.services['report_service'].report_callbacks) == 4
- await client.stop()
- await server.stop()
- async def collect_status():
- return 1
- @pytest.mark.asyncio
- async def test_report_registration_with_status_report():
- """
- Test the registration of two reports with two r_ids each.
- """
- # Create a server
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='testvtn')
- server.add_handler('on_register_report', on_register_report)
- server.add_handler('on_create_party_registration', on_create_party_registration)
- # Create a client
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b',)
- # Add 4 reports
- client.add_report(callback=collect_data,
- report_specifier_id='CurrentReport',
- resource_id='Device001',
- measurement='current',
- unit='A')
- client.add_report(callback=collect_data,
- report_specifier_id='CurrentReport',
- resource_id='Device002',
- measurement='current',
- unit='A')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device001',
- measurement='voltage',
- unit='V')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device002',
- measurement='voltage',
- unit='V')
- client.add_report(callback=collect_status,
- report_name='TELEMETRY_STATUS',
- report_specifier_id='StatusReport',
- resource_id='Device001')
- asyncio.create_task(server.run_async())
- await asyncio.sleep(1)
- # Register the client
- await client.create_party_registration()
- # Register the reports
- await client.register_reports(client.reports)
- assert len(client.report_requests) == 3
- assert len(server.services['report_service'].report_callbacks) == 5
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_report_registration_full():
- """
- Test the registration of two reports with two r_ids each.
- """
- # Create a server
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- server = OpenADRServer(vtn_id='testvtn')
- server.add_handler('on_register_report', on_register_report_full)
- server.add_handler('on_create_party_registration', on_create_party_registration)
- # Create a client
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- # Add 4 reports
- client.add_report(callback=collect_data,
- report_specifier_id='PowerReport',
- resource_id='Device001',
- measurement='power_real',
- unit='W')
- client.add_report(callback=collect_data,
- report_specifier_id='PowerReport',
- resource_id='Device002',
- measurement='power_real',
- unit='W')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device001',
- measurement='voltage',
- unit='V')
- client.add_report(callback=collect_data,
- report_specifier_id='VoltageReport',
- resource_id='Device002',
- measurement='voltage',
- unit='V')
- await server.run_async()
- await asyncio.sleep(0.1)
- # Register the client
- await client.create_party_registration()
- # Register the reports
- await client.register_reports(client.reports)
- assert len(client.report_requests) == 2
- assert len(server.services['report_service'].report_callbacks) == 4
- await client.stop()
- await server.stop()
- @pytest.mark.asyncio
- async def test_update_reports():
- """
- Tests the timely delivery of requested reports
- """
- # Create a server
- logger = logging.getLogger('openleadr')
- logger.setLevel(logging.DEBUG)
- loop = asyncio.get_event_loop()
- server = OpenADRServer(vtn_id='testvtn')
- register_report_future_1 = loop.create_future()
- register_report_future_2 = loop.create_future()
- register_report_futures = [register_report_future_1, register_report_future_2]
- receive_report_future_1 = loop.create_future()
- receive_report_future_2 = loop.create_future()
- receive_report_future_3 = loop.create_future()
- receive_report_future_4 = loop.create_future()
- receive_report_futures = [receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4]
- server.add_handler('on_register_report', partial(on_register_report, futures=register_report_futures, receive_futures=receive_report_futures))
- party_future = loop.create_future()
- server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_future))
- # Create a client
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- # Add 4 reports
- future_1 = loop.create_future()
- client.add_report(callback=partial(collect_data, future=future_1),
- report_specifier_id='PowerReport',
- resource_id='Device001',
- measurement='power_real',
- sampling_rate=timedelta(seconds=2),
- unit='W')
- future_2 = loop.create_future()
- client.add_report(callback=partial(collect_data, future=future_2),
- report_specifier_id='PowerReport',
- resource_id='Device002',
- measurement='power_real',
- sampling_rate=timedelta(seconds=2),
- unit='W')
- future_3 = loop.create_future()
- client.add_report(callback=partial(collect_data, future=future_3),
- report_specifier_id='VoltageReport',
- resource_id='Device001',
- measurement='voltage',
- sampling_rate=timedelta(seconds=2),
- unit='V')
- future_4 = loop.create_future()
- client.add_report(callback=partial(collect_data, future=future_4),
- report_specifier_id='VoltageReport',
- resource_id='Device002',
- measurement='voltage',
- sampling_rate=timedelta(seconds=2),
- unit='V')
- assert len(client.reports) == 2
- asyncio.create_task(server.run_async())
- await asyncio.sleep(1)
- # Run the client asynchronously
- print("Running the client")
- asyncio.create_task(client.run())
- print("Awaiting party future")
- await party_future
- print("Awaiting report futures")
- await asyncio.gather(register_report_future_1, register_report_future_2)
- await asyncio.sleep(0.1)
- assert len(server.services['report_service'].report_callbacks) == 4
- print("Awaiting data collection futures")
- await future_1
- await future_2
- await future_3
- await future_4
- print("Awaiting update report futures")
- await asyncio.gather(receive_report_future_1, receive_report_future_2, receive_report_future_3, receive_report_future_4)
- print("Done gathering")
- assert receive_report_future_1.result()[0][1] == future_1.result()
- assert receive_report_future_2.result()[0][1] == future_2.result()
- assert receive_report_future_3.result()[0][1] == future_3.result()
- assert receive_report_future_4.result()[0][1] == future_4.result()
- await client.stop()
- await server.stop()
- async def get_historic_data(date_from, date_to):
- pass
- async def collect_data_multi(futures=None):
- print("Data Collected")
- if futures:
- for i, future in enumerate(futures):
- if future.done() is False:
- print(f"Marking future {i} as done")
- future.set_result(True)
- break
- return 3.14
- @pytest.mark.asyncio
- async def test_incremental_reports():
- loop = asyncio.get_event_loop()
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- collect_futures = [loop.create_future() for i in range(2)]
- client.add_report(callback=partial(collect_data_multi, futures=collect_futures),
- report_specifier_id='myhistory',
- measurement='voltage',
- resource_id='Device001',
- sampling_rate=timedelta(seconds=2))
- server = OpenADRServer(vtn_id='myvtn')
- register_report_future = loop.create_future()
- update_report_future = loop.create_future()
- server.add_handler('on_register_report', partial(on_register_report,
- bundling=2,
- futures=[register_report_future],
- receive_futures=[update_report_future]))
- party_future = loop.create_future()
- server.add_handler('on_create_party_registration',
- partial(on_create_party_registration, future=party_future))
- loop.create_task(server.run_async())
- await asyncio.sleep(1)
- await client.run()
- print("Awaiting party future")
- await party_future
- print("Awaiting register report future")
- await register_report_future
- print("Awaiting first data collection future... ", end="")
- await collect_futures[0]
- print("check")
- await asyncio.sleep(1)
- print("Checking that the report was not yet sent... ", end="")
- assert update_report_future.done() is False
- print("check")
- print("Awaiting data collection second future... ", end="")
- await collect_futures[1]
- print("check")
- print("Awaiting report update future")
- result = await update_report_future
- assert len(result) == 2
- await server.stop()
- await client.stop()
- await asyncio.sleep(0)
- async def collect_data_history(date_from, date_to, sampling_interval, futures):
- data = [(date_from, 1.0), (date_to, 2.0)]
- if futures:
- for future in futures:
- if future.done() is False:
- future.set_result(data)
- break
- return data
- @pytest.mark.asyncio
- async def test_update_report_data_collection_mode_full():
- loop = asyncio.get_event_loop()
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- data_collection_future = loop.create_future()
- client.add_report(callback=partial(collect_data_history, futures=[data_collection_future]),
- resource_id='Device001',
- measurement='power_real',
- data_collection_mode='full',
- sampling_rate=timedelta(seconds=1),
- unit='W')
- report_register_future = loop.create_future()
- report_received_future = loop.create_future()
- party_registration_future = loop.create_future()
- server = OpenADRServer(vtn_id='myvtn')
- server.add_handler('on_create_party_registration', partial(on_create_party_registration, future=party_registration_future))
- server.add_handler('on_register_report', partial(on_register_report,
- bundling=2,
- futures=[report_register_future],
- receive_futures=[report_received_future]))
- await server.run_async()
- await asyncio.sleep(0.1)
- print(f"The time is now {datetime.now()}")
- t = time.time()
- wait_for = int(t/2) * 2 + 2 - t
- await asyncio.sleep(wait_for)
- print(f"The time is now {datetime.now()}, running client")
- await client.run()
- await party_registration_future
- await report_register_future
- await asyncio.sleep(1)
- print(f"The time is now {datetime.now()}, checking if report was triggered")
- assert data_collection_future.done() is False
- print("Waiting for the data collection to occur")
- await data_collection_future
- print("Waiting for the report to be received")
- await report_received_future
- print("Done")
- await server.stop()
- await client.stop()
- def test_add_report_invalid_unit(caplog):
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_report(callback=print,
- report_specifier_id='myreport',
- measurement='voltage',
- resource_id='Device001',
- sampling_rate=timedelta(seconds=10),
- unit='A')
- assert caplog.record_tuples == [("openleadr", logging.WARNING, f"The supplied unit A for measurement voltage will be ignored, V will be used instead. Allowed units for this measurement are: V")]
- def test_add_report_invalid_scale():
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- with pytest.raises(ValueError):
- client.add_report(callback=print,
- report_specifier_id='myreport',
- measurement='power_real',
- resource_id='Device001',
- sampling_rate=timedelta(seconds=10),
- unit='W',
- scale='xxx')
- def test_add_report_invalid_description(caplog):
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_report(callback=print,
- report_specifier_id='myreport',
- measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
- resource_id='Device001',
- sampling_rate=timedelta(seconds=10))
- msg = create_message('oadrRegisterReport', reports=client.reports)
- def test_add_report_invalid_description(caplog):
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- with pytest.raises(ValueError):
- client.add_report(callback=print,
- report_specifier_id='myreport',
- measurement={'name': 'voltage', 'description': 'SomethingWrong', 'unit': 'V'},
- resource_id='Device001',
- sampling_rate=timedelta(seconds=10))
- def test_add_report_non_standard_measurement():
- client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
- client.add_report(callback=print,
- report_specifier_id='myreport',
- measurement='rainbows',
- resource_id='Device001',
- sampling_rate=timedelta(seconds=10),
- unit='A')
- assert len(client.reports) == 1
- assert client.reports[0].report_descriptions[0].measurement.name == 'customUnit'
- assert client.reports[0].report_descriptions[0].measurement.description == 'rainbows'
- async def test_report_registration_broken_handlers(caplog):
- msg = """<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
- <p1:oadrPayload xmlns:p1="http://openadr.org/oadr-2.0b/2012/07">
- <p1:oadrSignedObject>
- <p1:oadrRegisterReport xmlns:p3="http://docs.oasis-open.org/ns/energyinterop/201110" p3:schemaVersion="2.0b" xmlns:p2="http://docs.oasis-open.org/ns/energyinterop/201110/payloads">
- <p2:requestID>B8A6E0D2D4</p2:requestID>
- <p1:oadrReport xmlns:p3="urn:ietf:params:xml:ns:icalendar-2.0" xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110">
- <p3:duration>
- <p3:duration>PT120M</p3:duration>
- </p3:duration>
- <p1:oadrReportDescription xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110" xmlns:p5="http://docs.oasis-open.org/ns/emix/2011/06/power" xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06">
- <p4:rID>rid_energy_4184bb93</p4:rID>
- <p4:reportDataSource>
- <p4:resourceID>DEVICE1</p4:resourceID>
- </p4:reportDataSource>
- <p4:reportType>reading</p4:reportType>
- <p5:energyReal xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06/siscale">
- <p5:itemDescription/>
- <p5:itemUnits>Wh</p5:itemUnits>
- <p6:siScaleCode>none</p6:siScaleCode>
- </p5:energyReal>
- <p4:readingType>Direct Read</p4:readingType>
- <p6:marketContext/>
- <p1:oadrSamplingRate>
- <p1:oadrMinPeriod>PT1M</p1:oadrMinPeriod>
- <p1:oadrMaxPeriod>PT1M</p1:oadrMaxPeriod>
- <p1:oadrOnChange>false</p1:oadrOnChange>
- </p1:oadrSamplingRate>
- </p1:oadrReportDescription>
- <p1:oadrReportDescription xmlns:p4="http://docs.oasis-open.org/ns/energyinterop/201110" xmlns:p5="http://docs.oasis-open.org/ns/emix/2011/06/power" xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06">
- <p4:rID>rid_power_4184bb93</p4:rID>
- <p4:reportDataSource>
- <p4:resourceID>DEVICE1</p4:resourceID>
- </p4:reportDataSource>
- <p4:reportType>reading</p4:reportType>
- <p5:powerReal xmlns:p6="http://docs.oasis-open.org/ns/emix/2011/06/siscale">
- <p5:itemDescription/>
- <p5:itemUnits>W</p5:itemUnits>
- <p6:siScaleCode>none</p6:siScaleCode>
- <p5:powerAttributes>
- <p5:hertz>60</p5:hertz>
- <p5:voltage>120</p5:voltage>
- <p5:ac>true</p5:ac>
- </p5:powerAttributes>
- </p5:powerReal>
- <p4:readingType>Direct Read</p4:readingType>
- <p6:marketContext/>
- <p1:oadrSamplingRate>
- <p1:oadrMinPeriod>PT1M</p1:oadrMinPeriod>
- <p1:oadrMaxPeriod>PT1M</p1:oadrMaxPeriod>
- <p1:oadrOnChange>false</p1:oadrOnChange>
- </p1:oadrSamplingRate>
- </p1:oadrReportDescription>
- <p4:reportRequestID>0</p4:reportRequestID>
- <p4:reportSpecifierID>DEMO_TELEMETRY_USAGE</p4:reportSpecifierID>
- <p4:reportName>METADATA_TELEMETRY_USAGE</p4:reportName>
- <p4:createdDateTime>2020-12-15T14:10:32Z</p4:createdDateTime>
- </p1:oadrReport>
- <p3:venID>ven_id</p3:venID>
- </p1:oadrRegisterReport>
- </p1:oadrSignedObject>
- </p1:oadrPayload>"""
- server = OpenADRServer(vtn_id='myvtn')
- await server.run()
- # Test with no configured callbacks
- from aiohttp import ClientSession
- async with ClientSession() as session:
- async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
- headers={'content-type': 'Application/XML'},
- data=msg.encode('utf-8')) as resp:
- assert resp.status == 200
- # Test with a working callback
- def report_callback(data):
- print(data)
- def working_on_register_report(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
- return report_callback, min_sampling_interval
- server.add_handler('on_register_report', working_on_register_report)
- async with ClientSession() as session:
- async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
- headers={'content-type': 'Application/XML'},
- data=msg.encode('utf-8')) as resp:
- assert resp.status == 200
- # Test with a broken callback
- def broken_on_register_report(ven_id, resource_id, measurement, unit, scale, min_sampling_interval, max_sampling_interval):
- return "Hello There"
- server.add_handler('on_register_report', broken_on_register_report)
- async with ClientSession() as session:
- async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
- headers={'content-type': 'Application/XML'},
- data=msg.encode('utf-8')) as resp:
- assert resp.status == 200
- # assert "Your on_register_report handler must return a tuple; it returned 'Hello There' (str)." in caplog.messages
- # Test with a broken full callback
- def broken_on_register_report_full(report):
- return "Hello There Again"
- server.add_handler('on_register_report', broken_on_register_report_full)
- async with ClientSession() as session:
- async with session.post("http://localhost:8080/OpenADR2/Simple/2.0b/EiReport",
- headers={'content-type': 'Application/XML'},
- data=msg.encode('utf-8')) as resp:
- assert resp.status == 200
- assert f"Your on_register_report handler must return a list of tuples. It returned 'Hello There Again' (str)." in caplog.messages
- await server.stop()
- if __name__ == "__main__":
- asyncio.run(test_update_reports())
|