|
@@ -1,5 +1,5 @@
|
|
from openleadr import OpenADRClient, OpenADRServer
|
|
from openleadr import OpenADRClient, OpenADRServer
|
|
-from openleadr.utils import generate_id
|
|
|
|
|
|
+from openleadr.utils import generate_id, certificate_fingerprint
|
|
from openleadr import messaging, errors
|
|
from openleadr import messaging, errors
|
|
import pytest
|
|
import pytest
|
|
from aiohttp import web
|
|
from aiohttp import web
|
|
@@ -67,7 +67,7 @@ async def test_invalid_signature_error(start_server_with_signatures, caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
|
|
fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
|
|
message = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', message)
|
|
message = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', message)
|
|
@@ -127,7 +127,7 @@ async def test_invalid_signature_error(start_server_with_signatures, caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
|
|
fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
|
|
message = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', message)
|
|
message = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', message)
|
|
@@ -147,7 +147,7 @@ def test_replay_protect_message_too_old(caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
_temp = messaging.REPLAY_PROTECT_MAX_TIME_DELTA
|
|
_temp = messaging.REPLAY_PROTECT_MAX_TIME_DELTA
|
|
messaging.REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=0)
|
|
messaging.REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=0)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
@@ -162,7 +162,7 @@ def test_replay_protect_repeated_message(caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
messaging._verify_replay_protect(tree)
|
|
messaging._verify_replay_protect(tree)
|
|
@@ -176,7 +176,7 @@ def test_replay_protect_missing_nonce(caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = re.sub('<dsp:nonce>.*?</dsp:nonce>', '', message)
|
|
message = re.sub('<dsp:nonce>.*?</dsp:nonce>', '', message)
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
@@ -190,7 +190,7 @@ def test_replay_protect_malformed_nonce(caplog):
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
|
|
cert=VEN_CERT,
|
|
cert=VEN_CERT,
|
|
key=VEN_KEY,
|
|
key=VEN_KEY,
|
|
- vtn_fingerprint='EE:44:C5:78:7E:4B:B8:DC:84:1F')
|
|
|
|
|
|
+ vtn_fingerprint=VTN_FINGERPRINT)
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = client._create_message('oadrPoll', ven_id='ven123')
|
|
message = re.sub('<dsp:timestamp>.*?</dsp:timestamp>', '', message)
|
|
message = re.sub('<dsp:timestamp>.*?</dsp:timestamp>', '', message)
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
tree = etree.fromstring(message.encode('utf-8'))
|
|
@@ -229,6 +229,12 @@ VTN_CERT = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificate
|
|
VTN_KEY = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificates", "dummy_vtn.key")
|
|
VTN_KEY = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificates", "dummy_vtn.key")
|
|
CA_FILE = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificates", "dummy_ca.crt")
|
|
CA_FILE = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificates", "dummy_ca.crt")
|
|
|
|
|
|
|
|
+with open(VEN_CERT) as file:
|
|
|
|
+ VEN_FINGERPRINT = certificate_fingerprint(file.read())
|
|
|
|
+
|
|
|
|
+with open(VTN_CERT) as file:
|
|
|
|
+ VTN_FINGERPRINT = certificate_fingerprint(file.read())
|
|
|
|
+
|
|
async def _on_create_party_registration(payload):
|
|
async def _on_create_party_registration(payload):
|
|
registration_id = generate_id()
|
|
registration_id = generate_id()
|
|
payload = {'response': {'response_code': 200,
|
|
payload = {'response': {'response_code': 200,
|
|
@@ -248,7 +254,7 @@ async def _client_on_report(report):
|
|
pass
|
|
pass
|
|
|
|
|
|
def fingerprint_lookup(ven_id):
|
|
def fingerprint_lookup(ven_id):
|
|
- return "6B:C8:4E:47:15:AA:30:EB:CE:0E"
|
|
|
|
|
|
+ return VEN_FINGERPRINT
|
|
|
|
|
|
@pytest.fixture
|
|
@pytest.fixture
|
|
async def start_server():
|
|
async def start_server():
|