Skip to content

Signed parts #747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/zeep/ns.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

SOAP_11 = "http://schemas.xmlsoap.org/wsdl/soap/"
SOAP_12 = "http://schemas.xmlsoap.org/wsdl/soap12/"
SOAP_ENV_11 = "http://schemas.xmlsoap.org/soap/envelope/"
Expand All @@ -12,6 +11,8 @@
MIME = "http://schemas.xmlsoap.org/wsdl/mime/"

WSA = "http://www.w3.org/2005/08/addressing"
WSP = "http://schemas.xmlsoap.org/ws/2004/09/policy"
SP = "http://docs.oasis-open.org/ws-sx/ws-securitypolicy/200702"


DS = "http://www.w3.org/2000/09/xmldsig#"
Expand Down
8 changes: 6 additions & 2 deletions src/zeep/wsdl/bindings/soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ def _create(self, operation, args, kwargs, client=None, options=None):
if client.wsse:
if isinstance(client.wsse, list):
for wsse in client.wsse:
envelope, http_headers = wsse.apply(envelope, http_headers)
envelope, http_headers = wsse.apply(
envelope, http_headers, operation_obj.binding.signatures
)
else:
envelope, http_headers = client.wsse.apply(envelope, http_headers)
envelope, http_headers = client.wsse.apply(
envelope, http_headers, operation_obj.binding.signatures
)

# Add extra http headers from the setings object
if client.settings.extra_http_headers:
Expand Down
5 changes: 5 additions & 0 deletions src/zeep/wsdl/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def __init__(self, wsdl, name, port_name):
self.port_type = None
self.wsdl = wsdl
self._operations = {}
self.signatures = {
"header": [], # Elements of header, that should be signed
"body": False, # If body should be signed
"everything": False, # If every header should be signed
}

def resolve(self, definitions):
self.port_type = definitions.get("port_types", self.port_name.text)
Expand Down
26 changes: 25 additions & 1 deletion src/zeep/wsdl/wsdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import six
from lxml import etree

from zeep import ns
from zeep.exceptions import IncompleteMessage
from zeep.loader import absolute_location, is_relative_path, load_external
from zeep.settings import Settings
from zeep.utils import findall_multiple_ns
from zeep.wsdl import parse
from zeep.xsd import Schema

NSMAP = {"wsdl": "http://schemas.xmlsoap.org/wsdl/"}
NSMAP = {"wsdl": ns.WSDL, "wsp": ns.WSP, "sp": ns.SP, "wsu": ns.WSU}

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -423,6 +424,29 @@ def parse_binding(self, doc):
logger.debug("Ignoring binding: %s", exc)
continue

# Begin heuristics for signed parts...
binding_policy = binding.name.localname + "_policy"
signed_parts = doc.xpath(
'wsp:Policy[@wsu:Id="{}"]//sp:SignedParts'.format(
binding_policy
),
namespaces=NSMAP,
)
for sign in signed_parts:
if len(sign.getchildren()) == 0:
# No children, we should sign everything
binding.signatures["body"] = True
binding.signatures["everything"] = True
break

for child in sign.iterchildren():
if len(child.items()) > 0:
# Header ...
part = {attr: value for attr, value in child.items()}
binding.signatures["header"].append(part)
elif child.tag.split("}")[-1].lower() == "body":
# Body ...
binding.signatures["body"] = True
logger.debug("Adding binding: %s", binding.name.text)
result[binding.name.text] = binding
break
Expand Down
55 changes: 43 additions & 12 deletions src/zeep/wsse/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from zeep import ns
from zeep.exceptions import SignatureVerificationFailed
from zeep.utils import detect_soap_env
from zeep.wsdl.utils import get_or_create_header
from zeep.wsse.utils import ensure_id, get_security_header

try:
Expand All @@ -24,6 +25,8 @@

# SOAP envelope
SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
# Namespaces omitted from signing
OMITTED_HEADERS = [ns.WSSE]


def _read_file(f_name):
Expand Down Expand Up @@ -61,10 +64,10 @@ def __init__(
self.digest_method = digest_method
self.signature_method = signature_method

def apply(self, envelope, headers):
def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key(
envelope, key, self.signature_method, self.digest_method
envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers

Expand Down Expand Up @@ -99,10 +102,10 @@ class BinarySignature(Signature):

Place the key information into BinarySecurityElement."""

def apply(self, envelope, headers):
def apply(self, envelope, headers, signatures=None):
key = _make_sign_key(self.key_data, self.cert_data, self.password)
_sign_envelope_with_key_binary(
envelope, key, self.signature_method, self.digest_method
envelope, key, self.signature_method, self.digest_method, signatures
)
return envelope, headers

Expand All @@ -123,6 +126,7 @@ def sign_envelope(
password=None,
signature_method=None,
digest_method=None,
signatures=None,
):
"""Sign given SOAP envelope with WSSE sig using given key and cert.

Expand Down Expand Up @@ -213,10 +217,12 @@ def sign_envelope(
"""
# Load the signing key and certificate.
key = _make_sign_key(_read_file(keyfile), _read_file(certfile), password)
return _sign_envelope_with_key(envelope, key, signature_method, digest_method)
return _sign_envelope_with_key(
envelope, key, signature_method, digest_method, signatures
)


def _signature_prepare(envelope, key, signature_method, digest_method):
def _signature_prepare(envelope, key, signature_method, digest_method, signatures=None):
"""Prepare envelope and sign."""
soap_env = detect_soap_env(envelope)

Expand All @@ -241,10 +247,31 @@ def _signature_prepare(envelope, key, signature_method, digest_method):
# Perform the actual signing.
ctx = xmlsec.SignatureContext()
ctx.key = key
_sign_node(ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method)
# Sign default elements if present
timestamp = security.find(QName(ns.WSU, "Timestamp"))
if timestamp != None:
_sign_node(ctx, signature, timestamp)
_sign_node(ctx, signature, timestamp, digest_method)

# Sign extra elements defined in WSDL
if signatures is not None:
if signatures["body"] or signatures["everything"]:
_sign_node(
ctx, signature, envelope.find(QName(soap_env, "Body")), digest_method
)
header = get_or_create_header(envelope)
if signatures["everything"]:
for node in header.iterchildren():
# Everything doesn't mean everything ...
if node.nsmap.get(node.prefix) not in OMITTED_HEADERS:
_sign_node(ctx, signature, node, digest_method)
else:
for node in signatures["header"]:
_sign_node(
ctx,
signature,
header.find(QName(node["Namespace"], node["Name"])),
digest_method,
)
ctx.sign(signature)

# Place the X509 data inside a WSSE SecurityTokenReference within
Expand All @@ -255,16 +282,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method):
return security, sec_token_ref, x509_data


def _sign_envelope_with_key(envelope, key, signature_method, digest_method):
def _sign_envelope_with_key(
envelope, key, signature_method, digest_method, signatures=None
):
_, sec_token_ref, x509_data = _signature_prepare(
envelope, key, signature_method, digest_method
envelope, key, signature_method, digest_method, signatures=signatures
)
sec_token_ref.append(x509_data)


def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_method):
def _sign_envelope_with_key_binary(
envelope, key, signature_method, digest_method, signatures=None
):
security, sec_token_ref, x509_data = _signature_prepare(
envelope, key, signature_method, digest_method
envelope, key, signature_method, digest_method, signatures=signatures
)
ref = etree.SubElement(
sec_token_ref,
Expand Down
2 changes: 1 addition & 1 deletion src/zeep/wsse/username.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(
self.use_digest = use_digest
self.timestamp_token = timestamp_token

def apply(self, envelope, headers):
def apply(self, envelope, headers, operation_obj=None):
security = utils.get_security_header(envelope)

# The token placeholder might already exists since it is specified in
Expand Down
119 changes: 119 additions & 0 deletions tests/test_wsdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1313,3 +1313,122 @@ def test_import_no_location():
document = wsdl.Document(
wsdl_content, transport, "https://tests.python-zeep.org/content.wsdl"
)


BASE_WSDL = """
<?xml version="1.0"?>
<wsdl:definitions
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"
xmlns:tns="http://tests.python-zeep.org/xsd-main"
xmlns:sp="http://docs.oasis-open.org/ws-sx/ws-securitypolicy/200702"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
targetNamespace="http://tests.python-zeep.org/xsd-main">

{policy}

<wsdl:types>
<xsd:schema
targetNamespace="http://tests.python-zeep.org/xsd-main"
xmlns:tns="http://tests.python-zeep.org/xsd-main">
<xsd:element name="input" type="xsd:string"/>
</xsd:schema>
</wsdl:types>

<wsdl:message name="message-1">
<wsdl:part name="response" element="tns:input"/>
</wsdl:message>

<wsdl:portType name="TestPortType">
<wsdl:operation name="TestOperation1">
<wsdl:input message="message-1"/>
</wsdl:operation>
</wsdl:portType>

<wsdl:binding name="TestBinding" type="tns:TestPortType">
<wsp:PolicyReference URI="#TestBinding"/>
<soap:binding transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="TestOperation1">
<soap:operation soapAction=""/>
</wsdl:operation>
</wsdl:binding>

<wsdl:service name="TestService">
<wsdl:documentation>Test service</wsdl:documentation>
<wsdl:port name="TestPortType" binding="tns:TestBinding">
<soap:address location="https://tests.python-zeep.org/tests"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
"""


def test_parse_bindings_signed_unknown():
policy = """
<wsp:Policy wsu:Id="TestBinding_policy">
<sp:SignedParts>
<sp:Other/>
</sp:SignedParts>
</wsp:Policy>
"""
content = StringIO(BASE_WSDL.format(policy=policy).strip())
document = wsdl.Document(content, None)
assert document.bindings[
"{http://tests.python-zeep.org/xsd-main}TestBinding"
].signatures == {"body": False, "everything": False, "header": []}

def test_parse_bindings_signed_body():
policy = """
<wsp:Policy wsu:Id="TestBinding_policy">
<sp:SignedParts>
<sp:Body/>
</sp:SignedParts>
</wsp:Policy>
"""
content = StringIO(BASE_WSDL.format(policy=policy).strip())
document = wsdl.Document(content, None)
assert document.bindings[
"{http://tests.python-zeep.org/xsd-main}TestBinding"
].signatures == {"body": True, "everything": False, "header": []}


def test_parse_bindings_signed_everything():
policy = """
<wsp:Policy wsu:Id="TestBinding_policy">
<sp:SignedParts/>
</wsp:Policy>
"""
content = StringIO(BASE_WSDL.format(policy=policy).strip())
document = wsdl.Document(content, None)
assert document.bindings[
"{http://tests.python-zeep.org/xsd-main}TestBinding"
].signatures == {"body": True, "everything": True, "header": []}


def test_parse_bindings_signed_headers():
policy = """
<wsp:Policy wsu:Id="TestBinding_policy">
<sp:SignedParts>
<sp:Header Name="To" Namespace="http://www.w3.org/2005/08/addressing"/>
</sp:SignedParts>
</wsp:Policy>
"""
content = StringIO(BASE_WSDL.format(policy=policy).strip())
document = wsdl.Document(content, None)
assert document.bindings[
"{http://tests.python-zeep.org/xsd-main}TestBinding"
].signatures == {
"body": False,
"everything": False,
"header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}],
}


def test_parse_bindings_signed_nothing():
content = StringIO(BASE_WSDL.format(policy="").strip())
document = wsdl.Document(content, None)
assert document.bindings[
"{http://tests.python-zeep.org/xsd-main}TestBinding"
].signatures == {"body": False, "everything": False, "header": []}
Loading