diff --git a/src/zeep/ns.py b/src/zeep/ns.py
index b175d574f..f1c69b26a 100644
--- a/src/zeep/ns.py
+++ b/src/zeep/ns.py
@@ -1,4 +1,3 @@
-
SOAP_11 = "http://schemas.xmlsoap.org/wsdl/soap/"
SOAP_12 = "http://schemas.xmlsoap.org/wsdl/soap12/"
SOAP_ENV_11 = "http://schemas.xmlsoap.org/soap/envelope/"
@@ -12,6 +11,8 @@
MIME = "http://schemas.xmlsoap.org/wsdl/mime/"
WSA = "http://www.w3.org/2005/08/addressing"
+WSP = "http://schemas.xmlsoap.org/ws/2004/09/policy"
+SP = "http://docs.oasis-open.org/ws-sx/ws-securitypolicy/200702"
DS = "http://www.w3.org/2000/09/xmldsig#"
diff --git a/src/zeep/wsdl/bindings/soap.py b/src/zeep/wsdl/bindings/soap.py
index 233ca3faf..589c87f7e 100644
--- a/src/zeep/wsdl/bindings/soap.py
+++ b/src/zeep/wsdl/bindings/soap.py
@@ -90,9 +90,13 @@ def _create(self, operation, args, kwargs, client=None, options=None):
if client.wsse:
if isinstance(client.wsse, list):
for wsse in client.wsse:
- envelope, http_headers = wsse.apply(envelope, http_headers)
+ envelope, http_headers = wsse.apply(
+ envelope, http_headers, operation_obj.binding.signatures
+ )
else:
- envelope, http_headers = client.wsse.apply(envelope, http_headers)
+ envelope, http_headers = client.wsse.apply(
+ envelope, http_headers, operation_obj.binding.signatures
+ )
# Add extra http headers from the setings object
if client.settings.extra_http_headers:
diff --git a/src/zeep/wsdl/definitions.py b/src/zeep/wsdl/definitions.py
index cb7d2f2dd..65a86f89b 100644
--- a/src/zeep/wsdl/definitions.py
+++ b/src/zeep/wsdl/definitions.py
@@ -126,6 +126,11 @@ def __init__(self, wsdl, name, port_name):
self.port_type = None
self.wsdl = wsdl
self._operations = {}
+ self.signatures = {
+ "header": [], # Elements of header, that should be signed
+ "body": False, # If body should be signed
+ "everything": False, # If every header should be signed
+ }
def resolve(self, definitions):
self.port_type = definitions.get("port_types", self.port_name.text)
diff --git a/src/zeep/wsdl/wsdl.py b/src/zeep/wsdl/wsdl.py
index 3eb6ea36e..abd4e95d5 100644
--- a/src/zeep/wsdl/wsdl.py
+++ b/src/zeep/wsdl/wsdl.py
@@ -14,6 +14,7 @@
import six
from lxml import etree
+from zeep import ns
from zeep.exceptions import IncompleteMessage
from zeep.loader import absolute_location, is_relative_path, load_external
from zeep.settings import Settings
@@ -21,7 +22,7 @@
from zeep.wsdl import parse
from zeep.xsd import Schema
-NSMAP = {"wsdl": "http://schemas.xmlsoap.org/wsdl/"}
+NSMAP = {"wsdl": ns.WSDL, "wsp": ns.WSP, "sp": ns.SP, "wsu": ns.WSU}
logger = logging.getLogger(__name__)
@@ -423,6 +424,29 @@ def parse_binding(self, doc):
logger.debug("Ignoring binding: %s", exc)
continue
+ # Begin heuristics for signed parts...
+ binding_policy = binding.name.localname + "_policy"
+ signed_parts = doc.xpath(
+ 'wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format(
+ binding_policy
+ ),
+ namespaces=NSMAP,
+ )
+ for sign in signed_parts:
+ if len(sign.getchildren()) == 0:
+ # No children, we should sign everything
+ binding.signatures["body"] = True
+ binding.signatures["everything"] = True
+ break
+
+ for child in sign.iterchildren():
+ if len(child.items()) > 0:
+ # Header ...
+ part = {attr: value for attr, value in child.items()}
+ binding.signatures["header"].append(part)
+ elif child.tag.split("}")[-1].lower() == "body":
+ # Body ...
+ binding.signatures["body"] = True
logger.debug("Adding binding: %s", binding.name.text)
result[binding.name.text] = binding
break
diff --git a/src/zeep/wsse/signature.py b/src/zeep/wsse/signature.py
index 9c6d11879..8831e61d8 100644
--- a/src/zeep/wsse/signature.py
+++ b/src/zeep/wsse/signature.py
@@ -14,6 +14,7 @@
from zeep import ns
from zeep.exceptions import SignatureVerificationFailed
from zeep.utils import detect_soap_env
+from zeep.wsdl.utils import get_or_create_header
from zeep.wsse.utils import ensure_id, get_security_header
try:
@@ -24,6 +25,8 @@
# SOAP envelope
SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
+# Namespaces omitted from signing
+OMITTED_HEADERS = [ns.WSSE]
def _read_file(f_name):
@@ -61,10 +64,10 @@ def __init__(
self.digest_method = digest_method
self.signature_method = signature_method
- def apply(self, envelope, headers):
+ def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key(
- envelope, key, self.signature_method, self.digest_method
+ envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers
@@ -99,10 +102,10 @@ class BinarySignature(Signature):
Place the key information into BinarySecurityElement."""
- def apply(self, envelope, headers):
+ def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key_binary(
- envelope, key, self.signature_method, self.digest_method
+ envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers
@@ -123,6 +126,7 @@ def sign_envelope(
password=None,
signature_method=None,
digest_method=None,
+ signatures=None,
):
"""Sign given SOAP envelope with WSSE sig using given key and cert.
@@ -213,10 +217,12 @@ def sign_envelope(
"""
# Load the signing key and certificate.
key = _make_sign_key(_read_file(keyfile), _read_file(certfile), password)
- return _sign_envelope_with_key(envelope, key, signature_method, digest_method)
+ return _sign_envelope_with_key(
+ envelope, key, signature_method, digest_method, signatures
+ )
-def _signature_prepare(envelope, key, signature_method, digest_method):
+def _signature_prepare(envelope, key, signature_method, digest_method, signatures=None):
"""Prepare envelope and sign."""
soap_env = detect_soap_env(envelope)
@@ -241,10 +247,31 @@ def _signature_prepare(envelope, key, signature_method, digest_method):
# Perform the actual signing.
ctx = xmlsec.SignatureContext()
ctx.key = key
- _sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method)
+ # Sign default elements if present
timestamp = security.find(QName(ns.WSU, "Timestamp"))
if timestamp != None:
- _sign_node(ctx, signature, timestamp)
+ _sign_node(ctx, signature, timestamp, digest_method)
+
+ # Sign extra elements defined in WSDL
+ if signatures is not None:
+ if signatures["body"] or signatures["everything"]:
+ _sign_node(
+ ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method
+ )
+ header = get_or_create_header(envelope)
+ if signatures["everything"]:
+ for node in header.iterchildren():
+ # Everything doesn't mean everything ...
+ if node.nsmap.get(node.prefix) not in OMITTED_HEADERS:
+ _sign_node(ctx, signature, node, digest_method)
+ else:
+ for node in signatures["header"]:
+ _sign_node(
+ ctx,
+ signature,
+ header.find(QName(node["Namespace"], node["Name"])),
+ digest_method,
+ )
ctx.sign(signature)
# Place the X509 data inside a WSSE SecurityTokenReference within
@@ -255,16 +282,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method):
return security, sec_token_ref, x509_data
-def _sign_envelope_with_key(envelope, key, signature_method, digest_method):
+def _sign_envelope_with_key(
+ envelope, key, signature_method, digest_method, signatures=None
+):
_, sec_token_ref, x509_data = _signature_prepare(
- envelope, key, signature_method, digest_method
+ envelope, key, signature_method, digest_method, signatures=signatures
)
sec_token_ref.append(x509_data)
-def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_method):
+def _sign_envelope_with_key_binary(
+ envelope, key, signature_method, digest_method, signatures=None
+):
security, sec_token_ref, x509_data = _signature_prepare(
- envelope, key, signature_method, digest_method
+ envelope, key, signature_method, digest_method, signatures=signatures
)
ref = etree.SubElement(
sec_token_ref,
diff --git a/src/zeep/wsse/username.py b/src/zeep/wsse/username.py
index ca9d19c3e..f54e8e6eb 100644
--- a/src/zeep/wsse/username.py
+++ b/src/zeep/wsse/username.py
@@ -56,7 +56,7 @@ def __init__(
self.use_digest = use_digest
self.timestamp_token = timestamp_token
- def apply(self, envelope, headers):
+ def apply(self, envelope, headers, operation_obj=None):
security = utils.get_security_header(envelope)
# The token placeholder might already exists since it is specified in
diff --git a/tests/test_wsdl.py b/tests/test_wsdl.py
index d545b2148..33ab44bd7 100644
--- a/tests/test_wsdl.py
+++ b/tests/test_wsdl.py
@@ -1313,3 +1313,122 @@ def test_import_no_location():
document = wsdl.Document(
wsdl_content, transport, "https://tests.python-zeep.org/content.wsdl"
)
+
+
+BASE_WSDL = """
+
+
+
+ {policy}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Test service
+
+
+
+
+
+ """
+
+
+def test_parse_bindings_signed_unknown():
+ policy = """
+
+
+
+
+
+ """
+ content = StringIO(BASE_WSDL.format(policy=policy).strip())
+ document = wsdl.Document(content, None)
+ assert document.bindings[
+ "{http://tests.python-zeep.org/xsd-main}TestBinding"
+ ].signatures == {"body": False, "everything": False, "header": []}
+
+def test_parse_bindings_signed_body():
+ policy = """
+
+
+
+
+
+ """
+ content = StringIO(BASE_WSDL.format(policy=policy).strip())
+ document = wsdl.Document(content, None)
+ assert document.bindings[
+ "{http://tests.python-zeep.org/xsd-main}TestBinding"
+ ].signatures == {"body": True, "everything": False, "header": []}
+
+
+def test_parse_bindings_signed_everything():
+ policy = """
+
+
+
+ """
+ content = StringIO(BASE_WSDL.format(policy=policy).strip())
+ document = wsdl.Document(content, None)
+ assert document.bindings[
+ "{http://tests.python-zeep.org/xsd-main}TestBinding"
+ ].signatures == {"body": True, "everything": True, "header": []}
+
+
+def test_parse_bindings_signed_headers():
+ policy = """
+
+
+
+
+
+ """
+ content = StringIO(BASE_WSDL.format(policy=policy).strip())
+ document = wsdl.Document(content, None)
+ assert document.bindings[
+ "{http://tests.python-zeep.org/xsd-main}TestBinding"
+ ].signatures == {
+ "body": False,
+ "everything": False,
+ "header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}],
+ }
+
+
+def test_parse_bindings_signed_nothing():
+ content = StringIO(BASE_WSDL.format(policy="").strip())
+ document = wsdl.Document(content, None)
+ assert document.bindings[
+ "{http://tests.python-zeep.org/xsd-main}TestBinding"
+ ].signatures == {"body": False, "everything": False, "header": []}
diff --git a/tests/test_wsse_signature.py b/tests/test_wsse_signature.py
index 72d877c11..3889dd7d3 100644
--- a/tests/test_wsse_signature.py
+++ b/tests/test_wsse_signature.py
@@ -92,12 +92,133 @@ def test_sign(
"""
)
+ # Force body signature
+ signatures = {"everything": False, "body": True, "header": []}
signature.sign_envelope(
envelope,
KEY_FILE,
KEY_FILE,
signature_method=getattr(xmlsec_installed.Transform, signature_method),
digest_method=getattr(xmlsec_installed.Transform, digest_method),
+ signatures=signatures,
+ )
+ signature.verify_envelope(envelope, KEY_FILE)
+
+ digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS})
+ assert len(digests)
+ for digest in digests:
+ assert digest.get("Algorithm") == expected_digest_href
+ signatures = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS})
+ assert len(signatures)
+ for sig in signatures:
+ assert sig.get("Algorithm") == expected_signature_href
+
+
+@skip_if_no_xmlsec
+@pytest.mark.parametrize("digest_method,expected_digest_href", DIGEST_METHODS_TESTDATA)
+@pytest.mark.parametrize(
+ "signature_method,expected_signature_href", SIGNATURE_METHODS_TESTDATA
+)
+def test_sign_element(
+ digest_method, signature_method, expected_digest_href, expected_signature_href
+):
+ envelope = load_xml(
+ """
+
+
+
+
+ 2015-06-25T21:53:25.246276+00:00
+ 2015-06-25T21:58:25.246276+00:00
+
+
+ OK
+
+
+
+ OK
+
+
+
+ """
+ )
+
+ # Force header element
+ signatures = {
+ "everything": False,
+ "body": False,
+ "header": [{"Namespace": "http://tests.python-zeep.org/", "Name": "Some"}],
+ }
+ signature.sign_envelope(
+ envelope,
+ KEY_FILE,
+ KEY_FILE,
+ signature_method=getattr(xmlsec_installed.Transform, signature_method),
+ digest_method=getattr(xmlsec_installed.Transform, digest_method),
+ signatures=signatures,
+ )
+ signature.verify_envelope(envelope, KEY_FILE)
+
+ digests = envelope.xpath("//ds:DigestMethod", namespaces={"ds": ns.DS})
+ assert len(digests)
+ for digest in digests:
+ assert digest.get("Algorithm") == expected_digest_href
+ signatures = envelope.xpath("//ds:SignatureMethod", namespaces={"ds": ns.DS})
+ assert len(signatures)
+ for sig in signatures:
+ assert sig.get("Algorithm") == expected_signature_href
+
+
+@skip_if_no_xmlsec
+@pytest.mark.parametrize("digest_method,expected_digest_href", DIGEST_METHODS_TESTDATA)
+@pytest.mark.parametrize(
+ "signature_method,expected_signature_href", SIGNATURE_METHODS_TESTDATA
+)
+def test_sign_everything(
+ digest_method, signature_method, expected_digest_href, expected_signature_href
+):
+ envelope = load_xml(
+ """
+
+
+
+
+ 2015-06-25T21:53:25.246276+00:00
+ 2015-06-25T21:58:25.246276+00:00
+
+
+ OK
+
+
+
+ OK
+
+
+
+ """
+ )
+
+ # Force header element and body signature
+ signatures = {"everything": True, "body": True, "header": []}
+ signature.sign_envelope(
+ envelope,
+ KEY_FILE,
+ KEY_FILE,
+ signature_method=getattr(xmlsec_installed.Transform, signature_method),
+ digest_method=getattr(xmlsec_installed.Transform, digest_method),
+ signatures=signatures,
)
signature.verify_envelope(envelope, KEY_FILE)
@@ -130,7 +251,11 @@ def test_sign_pw():
"""
)
- signature.sign_envelope(envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim")
+ # Force body signature
+ signatures = {"everything": False, "body": True, "header": []}
+ signature.sign_envelope(
+ envelope, KEY_FILE_PW, KEY_FILE_PW, "geheim", signatures=signatures
+ )
signature.verify_envelope(envelope, KEY_FILE_PW)
@@ -153,7 +278,9 @@ def test_verify_error():
"""
)
- signature.sign_envelope(envelope, KEY_FILE, KEY_FILE)
+ # Force body signature
+ signatures = {"everything": False, "body": True, "header": []}
+ signature.sign_envelope(envelope, KEY_FILE, KEY_FILE, signatures=signatures)
nsmap = {"tns": "http://tests.python-zeep.org/"}
for elm in envelope.xpath("//tns:Argument", namespaces=nsmap):
@@ -182,8 +309,10 @@ def test_signature():
"""
)
+ # Force body signature
+ signatures = {"everything": False, "body": True, "header": []}
plugin = wsse.Signature(KEY_FILE_PW, KEY_FILE_PW, "geheim")
- envelope, headers = plugin.apply(envelope, {})
+ envelope, headers = plugin.apply(envelope, {}, signatures=signatures)
plugin.verify(envelope)
@@ -212,6 +341,8 @@ def test_signature_binary(
"""
)
+ # Force body signature
+ signatures = {"everything": False, "body": True, "header": []}
plugin = wsse.BinarySignature(
KEY_FILE_PW,
KEY_FILE_PW,
@@ -219,7 +350,7 @@ def test_signature_binary(
signature_method=getattr(xmlsec_installed.Transform, signature_method),
digest_method=getattr(xmlsec_installed.Transform, digest_method),
)
- envelope, headers = plugin.apply(envelope, {})
+ envelope, headers = plugin.apply(envelope, {}, signatures=signatures)
plugin.verify(envelope)
# Test the reference
bintok = envelope.xpath(