|
@@ -9,6 +9,7 @@ import asyncio
|
|
|
from datetime import timedelta
|
|
|
from base64 import b64encode
|
|
|
import re
|
|
|
+from lxml import etree
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
async def test_http_level_error(start_server):
|
|
@@ -141,8 +142,67 @@ async def test_invalid_signature_error(start_server_with_signatures, caplog):
|
|
|
else:
|
|
|
assert False
|
|
|
|
|
|
+def test_replay_protect_message_too_old(caplog):
|
|
|
+ client = OpenADRClient(ven_name='myven',
|
|
|
+ vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
|
+ cert=VEN_CERT,
|
|
|
+ key=VEN_KEY,
|
|
|
+ vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
+ _temp = messaging.REPLAY_PROTECT_MAX_TIME_DELTA
|
|
|
+ messaging.REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=0)
|
|
|
+ message = client._create_message('oadrPoll', ven_id='ven123')
|
|
|
+ tree = etree.fromstring(message.encode('utf-8'))
|
|
|
+ with pytest.raises(ValueError) as err:
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ assert str(err.value) == 'The message was signed too long ago.'
|
|
|
+ messaging.REPLAY_PROTECT_MAX_TIME_DELTA = _temp
|
|
|
+
|
|
|
+def test_replay_protect_repeated_message(caplog):
|
|
|
+ client = OpenADRClient(ven_name='myven',
|
|
|
+ vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
|
+ cert=VEN_CERT,
|
|
|
+ key=VEN_KEY,
|
|
|
+ vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
+ message = client._create_message('oadrPoll', ven_id='ven123')
|
|
|
+ tree = etree.fromstring(message.encode('utf-8'))
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ with pytest.raises(ValueError) as err:
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ assert str(err.value) == 'This combination of timestamp and nonce was already used.'
|
|
|
+
|
|
|
|
|
|
+def test_replay_protect_missing_nonce(caplog):
|
|
|
+ client = OpenADRClient(ven_name='myven',
|
|
|
+ vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
|
+ cert=VEN_CERT,
|
|
|
+ key=VEN_KEY,
|
|
|
+ vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
+ message = client._create_message('oadrPoll', ven_id='ven123')
|
|
|
+ message = re.sub('<dsp:nonce>.*?</dsp:nonce>', '', message)
|
|
|
+ tree = etree.fromstring(message.encode('utf-8'))
|
|
|
+ with pytest.raises(ValueError) as err:
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ assert str(err.value) == "Missing 'nonce' element in ReplayProtect in incoming message."
|
|
|
+
|
|
|
+
|
|
|
+def test_replay_protect_malformed_nonce(caplog):
|
|
|
+ client = OpenADRClient(ven_name='myven',
|
|
|
+ vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
|
+ cert=VEN_CERT,
|
|
|
+ key=VEN_KEY,
|
|
|
+ vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
+ message = client._create_message('oadrPoll', ven_id='ven123')
|
|
|
+ message = re.sub('<dsp:timestamp>.*?</dsp:timestamp>', '', message)
|
|
|
+ tree = etree.fromstring(message.encode('utf-8'))
|
|
|
+ with pytest.raises(ValueError) as err:
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ assert str(err.value) == "Missing or malformed ReplayProtect element in the message signature."
|
|
|
|
|
|
+ message = re.sub('<dsp:ReplayProtect>.*?</dsp:ReplayProtect>', '', message)
|
|
|
+ tree = etree.fromstring(message.encode('utf-8'))
|
|
|
+ with pytest.raises(ValueError) as err:
|
|
|
+ messaging._verify_replay_protect(tree)
|
|
|
+ assert str(err.value) == "Missing or malformed ReplayProtect element in the message signature."
|
|
|
|
|
|
##########################################################################################
|
|
|
|