signature.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # SPDX-License-Identifier: Apache-2.0
  2. # Copyright 2020 Contributors to OpenLEADR
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS,
  9. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. # See the License for the specific language governing permissions and
  11. # limitations under the License.
  12. """
  13. Utility functions for XML Message Signatures
  14. """
import hashlib
import hmac
import logging
import re
from base64 import b64encode
from datetime import datetime, timedelta, timezone
from io import StringIO
from uuid import uuid4
from xml.etree.ElementTree import canonicalize

import xmltodict
from cryptography import x509
from jinja2 import Environment, PackageLoader, select_autoescape

from .utils import datetimeformat, timedeltaformat, booleanformat
  27. TEMPLATES = Environment(loader=PackageLoader('pyopenadr', 'templates'))
  28. TEMPLATES.filters['datetimeformat'] = datetimeformat
  29. TEMPLATES.filters['timedeltaformat'] = timedeltaformat
  30. TEMPLATES.filters['booleanformat'] = booleanformat
  31. REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=5)
  32. with open("/home/stan/Ontwikkeling/ElaadNL/pyopenadr/cert.pem", "rb") as file:
  33. certificate = x509.load_pem_x509_certificate(file.read())
  34. MODULUS = b64encode(certificate.public_key().public_numbers().n.to_bytes(512, 'big')).decode('ascii')
  35. EXPONENT = b64encode(certificate.public_key().public_numbers().e.to_bytes(4, 'big')).decode('ascii')
  36. def create_signature(xml_message):
  37. """
  38. This creates the signature for the given object. It will return the complete
  39. Signature section that can be pasted into the XML message.
  40. """
  41. # Convert it to its canonical C14n form
  42. signed_object_canonical = canonicalize(xml_message)
  43. # Calculate the of this section
  44. print("Calculating the SHA256 hash of this object")
  45. print(signed_object_canonical)
  46. digest_value_signed_object = calculate_digest(signed_object_canonical)
  47. print()
  48. print("The signature value is")
  49. print(digest_value_signed_object)
  50. # Generate the prop and calculate the digest
  51. prop = render('signatures/prop.xml', timestamp=datetime.now(timezone.utc), nonce=uuid4().hex)
  52. digest_value_prop = calculate_digest(prop)
  53. # Construct the SignedInfo object
  54. references = [{'id': 'oadrSignedObject',
  55. 'digest_value': digest_value_signed_object},
  56. {'id': 'prop',
  57. 'digest_value': digest_value_prop}]
  58. signed_info = render('signatures/SignedInfo.xml', references=references)
  59. # Construct the KeyInfo object
  60. key_info = render('signatures/KeyInfo.xml', modulus=MODULUS, exponent=EXPONENT)
  61. # Calculate the signature for the SignedInfo object
  62. signature_value = calculate_digest(signed_info)
  63. # Render the complete Signature section
  64. signature = render('signatures/Signature.xml',
  65. signed_info=signed_info,
  66. signature_value=signature_value,
  67. prop=prop,
  68. key_info=key_info,
  69. canonicalize_output=False)
  70. return signature
  71. def validate_message(xml_message):
  72. """
  73. This validates the message.
  74. 1. Verify the digest of the SignedInfo element against the SignatureValue
  75. 2. Verify the digest of the oadrSignedObject against the value in the DigestValue field.
  76. 3. Verify the presence of a ReplayProtect field and that the time is no more different than 5 seconds.
  77. """
  78. # Extract the SignedInfo part
  79. signed_info = extract(xml_message, 'SignedInfo')
  80. signed_info_canonical = canonicalize(signed_info)
  81. signed_info_dict = xmltodict.parse(signed_info_canonical)
  82. # Verify the digest of the SignedInfo element
  83. signed_info_digest = calculate_digest(signed_info)
  84. # Verify the digest of the oadrSignedObject
  85. signed_object = extract(xml_message, 'oadrSignedObject')
  86. signed_object_canonical = canonicalize(signed_object)
  87. signed_object_id = re.search(r'id="(.*?)"', signed_object_canonical, flags=re.I).group(1)
  88. # breakpoint()
  89. signed_info_reference = re.search(fr'<(.*)?Reference.*? URI="#{signed_object_id}".*?>(.*?)</\1Reference>',
  90. signed_info,
  91. flags=re.S).group(2)
  92. signed_info_digest_method = re.search(r'<(.*)?DigestMethod.* Algorithm="(.*?)"', signed_info_reference).group(2)
  93. signed_info_digest_value = re.search(r'<(.*)?DigestValue.*?>(.*?)</\1DigestValue>', signed_info_reference).group(2)
  94. if signed_info_digest_method != "http://www.w3.org/2001/04/xmlenc#sha256":
  95. raise ValueError(f"Wrong digest method used: {signed_info_digest_method}")
  96. signed_object_digest = calculate_digest(signed_object_canonical)
  97. if signed_object_digest != signed_info_digest_value:
  98. raise ValueError(f"Digest values do not match for oadrSignedObject identified by #{signed_object_id}\n"
  99. f"Provided Digest: {signed_info_digest_value}\n"
  100. f"Calculated Digest: {signed_object_digest}")
  101. def calculate_digest(xml_part):
  102. """
  103. This calculates the digest for the given XML part
  104. and returns its base-64 encoded value
  105. """
  106. hash = hashlib.sha256()
  107. hash.update(xml_part.encode('utf-8'))
  108. return b64encode(hash.digest()).decode('ascii')
  109. def calculate_signature(xml_part):
  110. """
  111. This calculates the signature for the entire SignedInfo block.
  112. """
  113. def get_tag_id(xml_message, tag):
  114. tag = re.search(fr'<(.*)?{tag}.*?id="(.*?)".*?>',
  115. xml_message,
  116. flags=re.S|re.I).group(0)
  117. def extract(xml, tag):
  118. # Extract the first occurence of tag and its contents from the xml message
  119. section = re.search(fr'<([^:]*:?){tag}[^>]*>.*</\1{tag}>', xml, flags=re.S)
  120. if section:
  121. return section.group(0)
  122. else:
  123. return None
  124. def render(template, canonicalize_output=True, **data):
  125. t = TEMPLATES.get_template(template)
  126. xml = t.render(**data)
  127. if canonicalize_output:
  128. return canonicalize(xml)
  129. else:
  130. return xml