# SPDX-License-Identifier: Apache-2.0 # Copyright 2020 Contributors to OpenLEADR # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import pytest from openleadr import OpenADRClient, OpenADRServer, enums from openleadr.utils import generate_id from openleadr.messaging import create_message, parse_message from openleadr.objects import Event, EventDescriptor, ActivePeriod, EventSignal, Interval from datetime import datetime, timezone, timedelta from pprint import pprint import warnings from test.fixtures.simple_server import start_server, add_event @pytest.mark.asyncio async def test_conformance_021(start_server): """ If venID, vtnID, or eventID value is included in the payload, the receiving entity MUST validate that the ID value is as expected and generate an error if an unexpected value is received. Exception: A VEN MUST NOT generate an error upon receipt of a canceled event whose eventID is not previously known. """ client = OpenADRClient(ven_name="TestVEN", vtn_url="http://localhost:8001/OpenADR2/Simple/2.0b") await client.create_party_registration() event = {'event_descriptor': {'event_id': generate_id(), 'modification_number': 0, 'modification_date': datetime.now(), 'priority': 0, 'market_context': 'MarketContext001', 'created_date_time': datetime.now(), 'event_status': enums.EVENT_STATUS.FAR, 'test_event': False, 'vtn_comment': 'No Comment'}, 'active_period': {'dtstart': datetime.now() + timedelta(minutes=30), 'duration': timedelta(minutes=30)}, 'event_signals': [{'intervals': [{'duration': timedelta(minutes=10), 'signal_payload': 1}, {'duration': timedelta(minutes=10), 'signal_payload': 2}, {'duration': timedelta(minutes=10), 'signal_payload': 3}], 'signal_name': enums.SIGNAL_NAME.SIMPLE, 'signal_type': enums.SIGNAL_TYPE.DELTA, 'signal_id': generate_id(), 'current_value': 123 }] } add_event(ven_id=client.ven_id, event_id = event['event_descriptor']['event_id'], event=event) message_type, message_payload = await client.poll() assert message_type == 'oadrDistributeEvent'