From 64249f7c0541dd9aed690e7a8f69833a8d7cadcb Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Mon, 7 May 2018 12:15:02 +0200 Subject: [PATCH 1/9] Parse SignedParts for bindings and assign --- src/zeep/ns.py | 2 ++ src/zeep/wsdl/definitions.py | 5 +++++ src/zeep/wsdl/wsdl.py | 27 ++++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/zeep/ns.py b/src/zeep/ns.py index c339331b5..f1c69b26a 100644 --- a/src/zeep/ns.py +++ b/src/zeep/ns.py @@ -11,6 +11,8 @@ MIME = "http://schemas.xmlsoap.org/wsdl/mime/" WSA = "http://www.w3.org/2005/08/addressing" +WSP = "http://schemas.xmlsoap.org/ws/2004/09/policy" +SP = "http://docs.oasis-open.org/ws-sx/ws-securitypolicy/200702" DS = "http://www.w3.org/2000/09/xmldsig#" diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index deb33ea99..75fe9cea8 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -135,6 +135,11 @@ def __init__(self, wsdl, name, port_name): self.port_type = None self.wsdl = wsdl self._operations = {} + self.signatures = { + 'header': [], # Elements of header, that should be signed + 'body': False, # If body should be signed + 'everything': False, # If every header should be signed + } def resolve(self, definitions: Definition) -> None: self.port_type = definitions.get("port_types", self.port_name.text) diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index f842fa439..44b0cd0a9 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -14,6 +14,7 @@ from lxml import etree +from zeep import ns from zeep.exceptions import IncompleteMessage from zeep.loader import ( absolute_location, @@ -30,7 +31,12 @@ if typing.TYPE_CHECKING: from zeep.transports import Transport -NSMAP = {"wsdl": "http://schemas.xmlsoap.org/wsdl/"} +NSMAP = { + 'wsdl': ns.WSDL, + 'wsp': ns.WSP, + 'sp': ns.SP, + 'wsu': ns.WSU, +} logger = logging.getLogger(__name__) @@ -440,6 +446,25 @@ def parse_binding( logger.debug("Ignoring binding: %s", exc) continue + # Begin heuristics for signed parts... + binding_policy = binding.name.localname + '_policy' + signed_parts = doc.xpath('wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format(binding_policy), + namespaces=NSMAP) + for sign in signed_parts: + if len(sign.getchildren()) == 0: + # No children, we should sign everything + binding.signatures['body'] = True + binding.signatures['everything'] = True + break + + for child in sign.iterchildren(): + if len(child.items()) > 0: + # Header ... + part = {attr: value for attr, value in child.items()} + binding.signatures['header'].append(part) + elif child.tag.split('}')[-1].lower() == 'body': + # Body ... + binding.signatures['body'] = True logger.debug("Adding binding: %s", binding.name.text) result[binding.name.text] = binding break From d159a5779cdbf62fb885c610079f36de6ba98c4d Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Mon, 7 May 2018 13:39:33 +0200 Subject: [PATCH 2/9] Sign elements required by WSDL --- src/zeep/wsdl/bindings/soap.py | 8 ++++-- src/zeep/wsdl/definitions.py | 6 ++--- src/zeep/wsdl/wsdl.py | 20 ++++++++------ src/zeep/wsse/signature.py | 49 ++++++++++++++++++++++++++-------- src/zeep/wsse/username.py | 2 +- tests/test_wsse_signature.py | 19 ++++++++++--- 6 files changed, 75 insertions(+), 29 deletions(-) diff --git a/src/zeep/wsdl/bindings/soap.py b/src/zeep/wsdl/bindings/soap.py index 3a5e5433c..480742cc1 100644 --- a/src/zeep/wsdl/bindings/soap.py +++ b/src/zeep/wsdl/bindings/soap.py @@ -95,9 +95,13 @@ def _create(self, operation, args, kwargs, client=None, options=None): if client.wsse: if isinstance(client.wsse, list): for wsse in client.wsse: - envelope, http_headers = wsse.apply(envelope, http_headers) + envelope, http_headers = wsse.apply( + envelope, http_headers, operation_obj.binding.signatures + ) else: - envelope, http_headers = client.wsse.apply(envelope, http_headers) + envelope, http_headers = client.wsse.apply( + envelope, http_headers, operation_obj.binding.signatures + ) # Add extra http headers from the setings object if client.settings.extra_http_headers: diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index 75fe9cea8..5eb52ed4d 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -136,9 +136,9 @@ def __init__(self, wsdl, name, port_name): self.wsdl = wsdl self._operations = {} self.signatures = { - 'header': [], # Elements of header, that should be signed - 'body': False, # If body should be signed - 'everything': False, # If every header should be signed + "header": [], # Elements of header, that should be signed + "body": False, # If body should be signed + "everything": False, # If every header should be signed } def resolve(self, definitions: Definition) -> None: diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index 44b0cd0a9..0e69981c8 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -447,24 +447,28 @@ def parse_binding( continue # Begin heuristics for signed parts... - binding_policy = binding.name.localname + '_policy' - signed_parts = doc.xpath('wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format(binding_policy), - namespaces=NSMAP) + binding_policy = binding.name.localname + "_policy" + signed_parts = doc.xpath( + 'wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format( + binding_policy + ), + namespaces=NSMAP, + ) for sign in signed_parts: if len(sign.getchildren()) == 0: # No children, we should sign everything - binding.signatures['body'] = True - binding.signatures['everything'] = True + binding.signatures["body"] = True + binding.signatures["everything"] = True break for child in sign.iterchildren(): if len(child.items()) > 0: # Header ... part = {attr: value for attr, value in child.items()} - binding.signatures['header'].append(part) - elif child.tag.split('}')[-1].lower() == 'body': + binding.signatures["header"].append(part) + elif child.tag.split("}")[-1].lower() == "body": # Body ... - binding.signatures['body'] = True + binding.signatures["body"] = True logger.debug("Adding binding: %s", binding.name.text) result[binding.name.text] = binding break diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index c4aec758e..71580b90a 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -14,6 +14,7 @@ from zeep import ns from zeep.exceptions import SignatureVerificationFailed from zeep.utils import detect_soap_env +from zeep.wsdl.utils import get_or_create_header from zeep.wsse.utils import ensure_id, get_security_header try: @@ -61,10 +62,10 @@ def __init__( self.digest_method = digest_method self.signature_method = signature_method - def apply(self, envelope, headers): + def apply(self, envelope, headers, signatures=None): key = _make_sign_key(self.key_data, self.cert_data, self.password) _sign_envelope_with_key( - envelope, key, self.signature_method, self.digest_method + envelope, key, self.signature_method, self.digest_method, signatures ) return envelope, headers @@ -99,10 +100,10 @@ class BinarySignature(Signature): Place the key information into BinarySecurityElement.""" - def apply(self, envelope, headers): + def apply(self, envelope, headers, signatures=None): key = _make_sign_key(self.key_data, self.cert_data, self.password) _sign_envelope_with_key_binary( - envelope, key, self.signature_method, self.digest_method + envelope, key, self.signature_method, self.digest_method, signatures ) return envelope, headers @@ -123,6 +124,7 @@ def sign_envelope( password=None, signature_method=None, digest_method=None, + signatures=None, ): """Sign given SOAP envelope with WSSE sig using given key and cert. @@ -213,10 +215,12 @@ def sign_envelope( """ # Load the signing key and certificate. key = _make_sign_key(_read_file(keyfile), _read_file(certfile), password) - return _sign_envelope_with_key(envelope, key, signature_method, digest_method) + return _sign_envelope_with_key( + envelope, key, signature_method, digest_method, signatures + ) -def _signature_prepare(envelope, key, signature_method, digest_method): +def _signature_prepare(envelope, key, signature_method, digest_method, signatures=None): """Prepare envelope and sign.""" soap_env = detect_soap_env(envelope) @@ -241,10 +245,29 @@ def _signature_prepare(envelope, key, signature_method, digest_method): # Perform the actual signing. ctx = xmlsec.SignatureContext() ctx.key = key - _sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method) + # Sign default elements if present timestamp = security.find(QName(ns.WSU, "Timestamp")) if timestamp != None: _sign_node(ctx, signature, timestamp, digest_method) + + # Sign extra elements defined in WSDL + if signatures is not None: + if signatures["body"] or signatures["everything"]: + _sign_node( + ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method + ) + header = get_or_create_header(envelope) + if signatures["everything"]: + for node in header.iterchildren(): + _sign_node(ctx, signature, node, digest_method) + else: + for node in signatures["header"]: + _sign_node( + ctx, + signature, + header.find(QName(node["Namespace"], node["Name"])), + digest_method, + ) ctx.sign(signature) # Place the X509 data inside a WSSE SecurityTokenReference within @@ -255,16 +278,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method): return security, sec_token_ref, x509_data -def _sign_envelope_with_key(envelope, key, signature_method, digest_method): +def _sign_envelope_with_key( + envelope, key, signature_method, digest_method, signatures=None +): _, sec_token_ref, x509_data = _signature_prepare( - envelope, key, signature_method, digest_method + envelope, key, signature_method, digest_method, signatures=signatures ) sec_token_ref.append(x509_data) -def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_method): +def _sign_envelope_with_key_binary( + envelope, key, signature_method, digest_method, signatures=None +): security, sec_token_ref, x509_data = _signature_prepare( - envelope, key, signature_method, digest_method + envelope, key, signature_method, digest_method, signatures=signatures ) ref = etree.SubElement( sec_token_ref, diff --git a/src/zeep/wsse/username.py b/src/zeep/wsse/username.py index da448b445..104ae0eb4 100644 --- a/src/zeep/wsse/username.py +++ b/src/zeep/wsse/username.py @@ -65,7 +65,7 @@ def __init__( self.zulu_timestamp = zulu_timestamp self.hash_password = hash_password - def apply(self, envelope, headers): + def apply(self, envelope, headers, operation_obj=None): security = utils.get_security_header(envelope) # The token placeholder might already exists since it is specified in diff --git a/tests/test_wsse_signature.py b/tests/test_wsse_signature.py index 487b83542..8440a2aaf 100644 --- a/tests/test_wsse_signature.py +++ b/tests/test_wsse_signature.py @@ -118,12 +118,15 @@ def test_sign( """ ) + # Force body signature + signatures = {"everything": False, "body": True, "header": []} signature.sign_envelope( envelope, KEY_FILE, KEY_FILE, signature_method=getattr(xmlsec.Transform, signature_method), digest_method=getattr(xmlsec.Transform, digest_method), + signatures=signatures, ) signature.verify_envelope(envelope, KEY_FILE) @@ -156,7 +159,9 @@ def test_sign_pw(): """ ) - signature.sign_envelope(envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim") + # Force body signature + signatures = {"everything": False, "body": True, "header": []} + signature.sign_envelope(envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim", signatures=signatures) signature.verify_envelope(envelope, KEY_FILE_PW) @@ -179,7 +184,9 @@ def test_verify_error(): """ ) - signature.sign_envelope(envelope, KEY_FILE, KEY_FILE) + # Force body signature + signatures = {"everything": False, "body": True, "header": []} + signature.sign_envelope(envelope, KEY_FILE, KEY_FILE, signatures=signatures) nsmap = {"tns": "http://tests.python-zeep.org/"} for elm in envelope.xpath("//tns:Argument", namespaces=nsmap): @@ -208,8 +215,10 @@ def test_signature(): """ ) + # Force body signature + signatures = {"everything": False, "body": True, "header": []} plugin = wsse.Signature(KEY_FILE_PW, KEY_FILE_PW, "geheim") - envelope, headers = plugin.apply(envelope, {}) + envelope, headers = plugin.apply(envelope, {}, signatures=signatures) plugin.verify(envelope) @@ -238,6 +247,8 @@ def test_signature_binary( """ ) + # Force body signature + signatures = {"everything": False, "body": True, "header": []} plugin = wsse.BinarySignature( KEY_FILE_PW, KEY_FILE_PW, @@ -245,7 +256,7 @@ def test_signature_binary( signature_method=getattr(xmlsec.Transform, signature_method), digest_method=getattr(xmlsec.Transform, digest_method), ) - envelope, headers = plugin.apply(envelope, {}) + envelope, headers = plugin.apply(envelope, {}, signatures=signatures) plugin.verify(envelope) # Test the reference bintok = envelope.xpath( From c758ce2bb7d4f757381e5ef43a1ec1c5e1731b42 Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Mon, 27 May 2019 17:25:53 +0200 Subject: [PATCH 3/9] Added tests --- tests/test_wsdl.py | 119 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/test_wsdl.py b/tests/test_wsdl.py index 5013d79f3..627d72e90 100644 --- a/tests/test_wsdl.py +++ b/tests/test_wsdl.py @@ -1313,3 +1313,122 @@ def test_import_no_location(): document = wsdl.Document( wsdl_content, transport, "https://tests.python-zeep.org/content.wsdl" ) + + +BASE_WSDL = """ + + + + {policy} + + + + + + + + + + + + + + + + + + + + + + + + + + + Test service + + + + + + """ + + +def test_parse_bindings_signed_unknown(): + policy = """ + + + + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == {"body": False, "everything": False, "header": []} + +def test_parse_bindings_signed_body(): + policy = """ + + + + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == {"body": True, "everything": False, "header": []} + + +def test_parse_bindings_signed_everything(): + policy = """ + + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == {"body": True, "everything": True, "header": []} + + +def test_parse_bindings_signed_headers(): + policy = """ + + + + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == { + "body": False, + "everything": False, + "header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}], + } + + +def test_parse_bindings_signed_nothing(): + content = StringIO(BASE_WSDL.format(policy="").strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == {"body": False, "everything": False, "header": []} From 4c276d631d93ee6fa6bc53859874c6b4d029409e Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Tue, 28 May 2019 11:16:54 +0200 Subject: [PATCH 4/9] Omitt WSSE header elements from signature --- src/zeep/wsse/signature.py | 6 +- tests/test_wsse_signature.py | 122 ++++++++++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 71580b90a..7ba8b5b2d 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -25,6 +25,8 @@ # SOAP envelope SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/" +# Namespaces omitted from signing +OMITTED_HEADERS = [ns.WSSE] def _read_file(f_name): @@ -259,7 +261,9 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature header = get_or_create_header(envelope) if signatures["everything"]: for node in header.iterchildren(): - _sign_node(ctx, signature, node, digest_method) + # Everything doesn't mean everything ... + if node.nsmap.get(node.prefix) not in OMITTED_HEADERS: + _sign_node(ctx, signature, node, digest_method) else: for node in signatures["header"]: _sign_node( diff --git a/tests/test_wsse_signature.py b/tests/test_wsse_signature.py index 8440a2aaf..4d9e83bf5 100644 --- a/tests/test_wsse_signature.py +++ b/tests/test_wsse_signature.py @@ -140,6 +140,124 @@ def test_sign( assert sig.get("Algorithm") == expected_signature_href +@skip_if_no_xmlsec +@pytest.mark.parametrize("digest_method,expected_digest_href", DIGEST_METHODS_TESTDATA) +@pytest.mark.parametrize( + "signature_method,expected_signature_href", SIGNATURE_METHODS_TESTDATA +) +def test_sign_element( + digest_method, signature_method, expected_digest_href, expected_signature_href +): + envelope = load_xml( + """ + + + + + 2015-06-25T21:53:25.246276+00:00 + 2015-06-25T21:58:25.246276+00:00 + + + OK + + + + OK + + + + """ + ) + + # Force header element + signatures = { + "everything": False, + "body": False, + "header": [{"Namespace": "http://tests.python-zeep.org/", "Name": "Some"}], + } + signature.sign_envelope( + envelope, + KEY_FILE, + KEY_FILE, + signature_method=getattr(xmlsec_installed.Transform, signature_method), + digest_method=getattr(xmlsec_installed.Transform, digest_method), + signatures=signatures, + ) + signature.verify_envelope(envelope, KEY_FILE) + + digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS}) + assert len(digests) + for digest in digests: + assert digest.get("Algorithm") == expected_digest_href + signatures = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS}) + assert len(signatures) + for sig in signatures: + assert sig.get("Algorithm") == expected_signature_href + + +@skip_if_no_xmlsec +@pytest.mark.parametrize("digest_method,expected_digest_href", DIGEST_METHODS_TESTDATA) +@pytest.mark.parametrize( + "signature_method,expected_signature_href", SIGNATURE_METHODS_TESTDATA +) +def test_sign_everything( + digest_method, signature_method, expected_digest_href, expected_signature_href +): + envelope = load_xml( + """ + + + + + 2015-06-25T21:53:25.246276+00:00 + 2015-06-25T21:58:25.246276+00:00 + + + OK + + + + OK + + + + """ + ) + + # Force header element and body signature + signatures = {"everything": True, "body": True, "header": []} + signature.sign_envelope( + envelope, + KEY_FILE, + KEY_FILE, + signature_method=getattr(xmlsec_installed.Transform, signature_method), + digest_method=getattr(xmlsec_installed.Transform, digest_method), + signatures=signatures, + ) + signature.verify_envelope(envelope, KEY_FILE) + + digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS}) + assert len(digests) + for digest in digests: + assert digest.get("Algorithm") == expected_digest_href + signatures = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS}) + assert len(signatures) + for sig in signatures: + assert sig.get("Algorithm") == expected_signature_href + + @skip_if_no_xmlsec def test_sign_pw(): envelope = load_xml( @@ -161,7 +279,9 @@ def test_sign_pw(): # Force body signature signatures = {"everything": False, "body": True, "header": []} - signature.sign_envelope(envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim", signatures=signatures) + signature.sign_envelope( + envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim", signatures=signatures + ) signature.verify_envelope(envelope, KEY_FILE_PW) From 950140cf710efa8f8becbb92ee5edbca9ae60cf6 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 17:51:29 +0200 Subject: [PATCH 5/9] Fix tests after refactor on main --- tests/test_wsse_signature.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_wsse_signature.py b/tests/test_wsse_signature.py index 4d9e83bf5..4bfa52188 100644 --- a/tests/test_wsse_signature.py +++ b/tests/test_wsse_signature.py @@ -185,8 +185,8 @@ def test_sign_element( envelope, KEY_FILE, KEY_FILE, - signature_method=getattr(xmlsec_installed.Transform, signature_method), - digest_method=getattr(xmlsec_installed.Transform, digest_method), + signature_method=getattr(xmlsec.Transform, signature_method), + digest_method=getattr(xmlsec.Transform, digest_method), signatures=signatures, ) signature.verify_envelope(envelope, KEY_FILE) @@ -242,8 +242,8 @@ def test_sign_everything( envelope, KEY_FILE, KEY_FILE, - signature_method=getattr(xmlsec_installed.Transform, signature_method), - digest_method=getattr(xmlsec_installed.Transform, digest_method), + signature_method=getattr(xmlsec.Transform, signature_method), + digest_method=getattr(xmlsec.Transform, digest_method), signatures=signatures, ) signature.verify_envelope(envelope, KEY_FILE) From 74526d1c9dedeb2c22bf708b0e7cbe881753d4a3 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 18:48:10 +0200 Subject: [PATCH 6/9] Implement Strict Layout Rules for WSS 1.0 --- src/zeep/wsse/signature.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 7ba8b5b2d..31a17c1e1 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -242,7 +242,7 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature # Insert the Signature node in the wsse:Security header. security = get_security_header(envelope) - security.insert(0, signature) + security.append(signature) # Perform the actual signing. ctx = xmlsec.SignatureContext() @@ -316,7 +316,7 @@ def _sign_envelope_with_key_binary( ) ref.attrib["URI"] = "#" + ensure_id(bintok) bintok.text = x509_data.find(QName(ns.DS, "X509Certificate")).text - security.insert(1, bintok) + security.insert(0, bintok) x509_data.getparent().remove(x509_data) From 9670129c6a9a766028c09f67436ffc8f85725a28 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 22:22:37 +0200 Subject: [PATCH 7/9] Restore backwards compatible behaviour if signatures aren't specified --- src/zeep/wsse/signature.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 31a17c1e1..8e47bd09d 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -247,13 +247,13 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature # Perform the actual signing. ctx = xmlsec.SignatureContext() ctx.key = key - # Sign default elements if present - timestamp = security.find(QName(ns.WSU, "Timestamp")) - if timestamp != None: - _sign_node(ctx, signature, timestamp, digest_method) - - # Sign extra elements defined in WSDL - if signatures is not None: + # Preserve the previous behaviour for backwards compatibility + if signatures is None: + _sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method) + timestamp = security.find(QName(ns.WSU, "Timestamp")) + if timestamp != None: + _sign_node(ctx, signature, timestamp, digest_method) + else: if signatures["body"] or signatures["everything"]: _sign_node( ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method From 758dd298dfea25fb002a64ae01acef307767a1bc Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 22:24:31 +0200 Subject: [PATCH 8/9] Improve spec fidelity by creating union of all SignedParts --- src/zeep/wsdl/definitions.py | 2 +- src/zeep/wsdl/wsdl.py | 11 +++++++++-- src/zeep/wsse/signature.py | 6 +++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index 5eb52ed4d..59e399f8a 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -136,7 +136,7 @@ def __init__(self, wsdl, name, port_name): self.wsdl = wsdl self._operations = {} self.signatures = { - "header": [], # Elements of header, that should be signed + "header": [], # Parts of header, that should be signed "body": False, # If body should be signed "everything": False, # If every header should be signed } diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index 0e69981c8..097529342 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -454,6 +454,9 @@ def parse_binding( ), namespaces=NSMAP, ) + # Initialize a set to keep track of all unique headers + all_headers = set() + for sign in signed_parts: if len(sign.getchildren()) == 0: # No children, we should sign everything @@ -464,11 +467,15 @@ def parse_binding( for child in sign.iterchildren(): if len(child.items()) > 0: # Header ... - part = {attr: value for attr, value in child.items()} - binding.signatures["header"].append(part) + part = frozenset({attr: value for attr, value in child.items()}.items()) + all_headers.add(part) elif child.tag.split("}")[-1].lower() == "body": # Body ... binding.signatures["body"] = True + + # If we didn't set "everything" to True, update the headers + if not binding.signatures.get("everything", False): + binding.signatures["header"] = [dict(header) for header in all_headers] logger.debug("Adding binding: %s", binding.name.text) result[binding.name.text] = binding break diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 8e47bd09d..3459fa209 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -254,18 +254,18 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature if timestamp != None: _sign_node(ctx, signature, timestamp, digest_method) else: - if signatures["body"] or signatures["everything"]: + if signatures.get("body") or signatures.get("everything"): _sign_node( ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method ) header = get_or_create_header(envelope) - if signatures["everything"]: + if signatures.get("everything"): for node in header.iterchildren(): # Everything doesn't mean everything ... if node.nsmap.get(node.prefix) not in OMITTED_HEADERS: _sign_node(ctx, signature, node, digest_method) else: - for node in signatures["header"]: + for node in signatures.get("header", []): _sign_node( ctx, signature, From 223938ed3623db1500fdbddd23103f4a071f2a6d Mon Sep 17 00:00:00 2001 From: David Date: Sat, 21 Sep 2024 22:25:02 +0200 Subject: [PATCH 9/9] Add support for sp:SignedElements --- src/zeep/wsdl/definitions.py | 1 + src/zeep/wsdl/wsdl.py | 33 +++++++++++++++++++++++++++++++++ src/zeep/wsse/signature.py | 25 +++++++++++++++++++++++++ tests/test_wsdl.py | 28 ++++++++++++++++++++++++---- 4 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py index 59e399f8a..1dbee0e6c 100644 --- a/src/zeep/wsdl/definitions.py +++ b/src/zeep/wsdl/definitions.py @@ -137,6 +137,7 @@ def __init__(self, wsdl, name, port_name): self._operations = {} self.signatures = { "header": [], # Parts of header, that should be signed + "elements": [], # Arbitrary XPath elements that should be signed "body": False, # If body should be signed "everything": False, # If every header should be signed } diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py index 097529342..4ea551990 100644 --- a/src/zeep/wsdl/wsdl.py +++ b/src/zeep/wsdl/wsdl.py @@ -476,6 +476,39 @@ def parse_binding( # If we didn't set "everything" to True, update the headers if not binding.signatures.get("everything", False): binding.signatures["header"] = [dict(header) for header in all_headers] + + # Begin parsing SignedElements assertions + signed_elements = doc.xpath( + 'wsp:Policy[@wsu:Id="{}"]//sp:SignedElements'.format(binding_policy), + namespaces=NSMAP, + ) + + for signed_element in signed_elements: + xpath_version = signed_element.get('XPathVersion', 'http://www.w3.org/TR/1999/REC-xpath-19991116') # Default to XPath 1.0 if not specified + + xpath_expressions = signed_element.xpath('sp:XPath', namespaces=NSMAP) + + for xpath in xpath_expressions: + xpath_string = xpath.text + if xpath_string: + # Store the XPath expression and its version + binding.signatures.setdefault('elements', []).append({ + 'xpath': xpath_string, + 'xpath_version': xpath_version + }) + + # If you want to merge multiple SignedElements assertions as per the specification + if 'elements' in binding.signatures: + # Remove duplicates while preserving order + unique_elements = [] + seen = set() + for element in binding.signatures['elements']: + element_tuple = (element['xpath'], element['xpath_version']) + if element_tuple not in seen: + seen.add(element_tuple) + unique_elements.append(element) + binding.signatures['elements'] = unique_elements + logger.debug("Adding binding: %s", binding.name.text) result[binding.name.text] = binding break diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py index 3459fa209..25a7fda78 100644 --- a/src/zeep/wsse/signature.py +++ b/src/zeep/wsse/signature.py @@ -272,6 +272,17 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature header.find(QName(node["Namespace"], node["Name"])), digest_method, ) + # Sign elements specified by XPath expressions + for element in signatures.get("elements", []): + _sign_node_by_xpath( + ctx, + signature, + envelope, + element["xpath"], + element["xpath_version"], + digest_method + ) + ctx.sign(signature) # Place the X509 data inside a WSSE SecurityTokenReference within @@ -281,6 +292,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature sec_token_ref = etree.SubElement(key_info, QName(ns.WSSE, "SecurityTokenReference")) return security, sec_token_ref, x509_data +def _sign_node_by_xpath(ctx, signature, envelope, xpath, xpath_version, digest_method): + # Create an XPath evaluator with the appropriate version + if xpath_version == '1.0': + evaluator = etree.XPath(xpath, namespaces=envelope.nsmap) + else: + evaluator = etree.XPath(xpath, namespaces=envelope.nsmap, extension={('http://www.w3.org/TR/1999/REC-xpath-19991116', 'version'): xpath_version}) + + # Evaluate the XPath expression + nodes = evaluator(envelope) + + # Sign each node found by the XPath expression + for node in nodes: + _sign_node(ctx, signature, node, digest_method) + def _sign_envelope_with_key( envelope, key, signature_method, digest_method, signatures=None diff --git a/tests/test_wsdl.py b/tests/test_wsdl.py index 627d72e90..22baf606d 100644 --- a/tests/test_wsdl.py +++ b/tests/test_wsdl.py @@ -1377,7 +1377,7 @@ def test_parse_bindings_signed_unknown(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": False, "everything": False, "header": []} + ].signatures == {"body": False, "everything": False, "header": [], "elements": []} def test_parse_bindings_signed_body(): policy = """ @@ -1391,7 +1391,7 @@ def test_parse_bindings_signed_body(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": True, "everything": False, "header": []} + ].signatures == {"body": True, "everything": False, "header": [], "elements": []} def test_parse_bindings_signed_everything(): @@ -1404,7 +1404,7 @@ def test_parse_bindings_signed_everything(): document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": True, "everything": True, "header": []} + ].signatures == {"body": True, "everything": True, "header": [], "elements": []} def test_parse_bindings_signed_headers(): @@ -1423,12 +1423,32 @@ def test_parse_bindings_signed_headers(): "body": False, "everything": False, "header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}], + "elements": [] } +def test_parse_bindings_signed_elements(): + policy = """ + + + //wsse:Security/wsu:Timestamp + + + """ + content = StringIO(BASE_WSDL.format(policy=policy).strip()) + document = wsdl.Document(content, None) + assert document.bindings[ + "{http://tests.python-zeep.org/xsd-main}TestBinding" + ].signatures == { + "body": False, + "everything": False, + "header": [], + "elements": [{"xpath": "//wsse:Security/wsu:Timestamp", "xpath_version": "http://www.w3.org/TR/1999/REC-xpath-19991116"}] + } + def test_parse_bindings_signed_nothing(): content = StringIO(BASE_WSDL.format(policy="").strip()) document = wsdl.Document(content, None) assert document.bindings[ "{http://tests.python-zeep.org/xsd-main}TestBinding" - ].signatures == {"body": False, "everything": False, "header": []} + ].signatures == {"body": False, "everything": False, "header": [], "elements": []}