test_conformance_021.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import pytest
  2. from pyopenadr import OpenADRClient, OpenADRServer, enums
  3. from pyopenadr.utils import generate_id
  4. from pyopenadr.messaging import create_message, parse_message
  5. from pyopenadr.objects import Event, EventDescriptor, ActivePeriod, EventSignal, Interval
  6. from datetime import datetime, timezone, timedelta
  7. from pprint import pprint
  8. import warnings
  9. from test.fixtures.simple_server import start_server, add_event
  10. @pytest.mark.asyncio
  11. async def test_conformance_021(start_server):
  12. """
  13. If venID, vtnID, or eventID value is included in the payload, the receiving
  14. entity MUST validate that the ID value is as expected and generate an error
  15. if an unexpected value is received.
  16. Exception: A VEN MUST NOT generate an error upon receipt of a canceled
  17. event whose eventID is not previously known.
  18. """
  19. client = OpenADRClient(ven_name="TestVEN",
  20. vtn_url="http://localhost:8001/OpenADR2/Simple/2.0b")
  21. await client.create_party_registration()
  22. event = {'event_descriptor':
  23. {'event_id': generate_id(),
  24. 'modification_number': 0,
  25. 'modification_date': datetime.now(),
  26. 'priority': 0,
  27. 'market_context': 'MarketContext001',
  28. 'created_date_time': datetime.now(),
  29. 'event_status': enums.EVENT_STATUS.FAR,
  30. 'test_event': False,
  31. 'vtn_comment': 'No Comment'},
  32. 'active_period':
  33. {'dtstart': datetime.now() + timedelta(minutes=30),
  34. 'duration': timedelta(minutes=30)},
  35. 'event_signals':
  36. [{'intervals': [{'duration': timedelta(minutes=10),
  37. 'signal_payload': 1},
  38. {'duration': timedelta(minutes=10),
  39. 'signal_payload': 2},
  40. {'duration': timedelta(minutes=10),
  41. 'signal_payload': 3}],
  42. 'signal_name': enums.SIGNAL_NAME.SIMPLE,
  43. 'signal_type': enums.SIGNAL_TYPE.DELTA,
  44. 'signal_id': generate_id(),
  45. 'current_value': 123
  46. }]
  47. }
  48. add_event(ven_id=client.ven_id,
  49. event_id = event['event_descriptor']['event_id'],
  50. event=event)
  51. message_type, message_payload = await client.poll()
  52. assert message_type == 'oadrDistributeEvent'