test_conformance_006.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import pytest
  2. from pyopenadr import OpenADRClient, OpenADRServer, enums
  3. from pyopenadr.utils import generate_id, create_message, parse_message
  4. from datetime import datetime, timezone, timedelta
  5. from pprint import pprint
  6. @pytest.mark.asyncio
  7. async def test_conformance_006():
  8. """
  9. The presence of any string except “false” in the oadrDistributeEvent
  10. testEvent element MUST be treated as a trigger for a test event.
  11. """
  12. event_id = generate_id()
  13. event = {'event_descriptor':
  14. {'event_id': event_id,
  15. 'modification_number': 0,
  16. 'modification_date': datetime.now(),
  17. 'priority': 0,
  18. 'market_context': 'MarketContext001',
  19. 'created_date_time': datetime.now(),
  20. 'event_status': enums.EVENT_STATUS.FAR,
  21. 'test_event': "HelloThere",
  22. 'vtn_comment': 'No Comment'},
  23. 'active_period':
  24. {'dtstart': datetime.now(),
  25. 'duration': timedelta(minutes=5)},
  26. 'event_signals':
  27. [{'intervals': [{'duration': timedelta(minutes=10),
  28. 'signal_payload': 100},
  29. {'duration': timedelta(minutes=10),
  30. 'signal_payload': 200},
  31. {'duration': timedelta(minutes=10),
  32. 'signal_payload': 300}],
  33. 'signal_name': enums.SIGNAL_NAME.SIMPLE,
  34. 'signal_type': enums.SIGNAL_TYPE.DELTA,
  35. 'signal_id': generate_id()
  36. }]
  37. }
  38. # Create a message with this event
  39. msg = create_message('oadrDistributeEvent',
  40. response={'response_code': 200,
  41. 'response_description': 'OK',
  42. 'request_id': generate_id()},
  43. request_id=generate_id(),
  44. vtn_id=generate_id(),
  45. events=[event])
  46. parsed_type, parsed_message = parse_message(msg)
  47. assert parsed_type == 'oadrDistributeEvent'
  48. assert parsed_message['events'][0]['event_descriptor']['test_event'] == True