Skip to content

Commit

Permalink
Merged tag and namespace stacks; closes #7
Browse files Browse the repository at this point in the history
  • Loading branch information
filipsalo committed Apr 11, 2010
1 parent 21716b1 commit fc3e46f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 62 deletions.
129 changes: 71 additions & 58 deletions streamxmlwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def asort(pairs, tag):
sort order for the given `tag` name."""
def key(item):
"""Return a sort key for a ``(key, value)`` pair."""
name, _ = item
(_, (_, name)), _ = item
if tag not in attrib_order:
return name
keys = attrib_order[tag]
Expand Down Expand Up @@ -113,7 +113,6 @@ class XMLSyntaxError(Exception):
class XMLWriter(object):
"""Stream XML writer"""
def __init__(self, file, encoding="utf-8",
nsmap=None, default_namespace=None,
pretty_print=False, sort=True):
"""
Create an `XMLWriter` that writes its output to `file`.
Expand Down Expand Up @@ -148,29 +147,29 @@ def __init__(self, file, encoding="utf-8",
self._sort = sorter_factory(sort)
self._tags = []
self._start_tag_open = False
nsmap = nsmap or {}
nsmap = nsmap.copy()
if default_namespace:
nsmap[default_namespace] = ""
self._namespace_maps = [nsmap]
self._new_namespaces = nsmap.items()
self._new_namespaces = {}
if self.encoding not in ("us-ascii", "utf-8"):
self.declaration()
self._wrote_data = False

def _cname(self, name):
def _cname(self, name, nsmap, cnames):
"""Return a cname from its {ns}tag form."""
if name[0] == "{":
uri, name = name[1:].split("}", 1)
nsmap = self._namespace_maps[-1]
if uri not in nsmap:
prefix = "ns" + str(len(nsmap)+1)
self.start_ns(prefix, uri)
else:
prefix = nsmap[uri]
if prefix:
name = prefix + ":" + name
return name
if name in cnames:
return cnames[name]
if not name[0] == "{":
name = "{}" + name
uri, ncname = name[1:].split("}", 1)
if uri not in nsmap:
prefix = "ns" + str(len(nsmap)+1)
nsmap[uri] = prefix
else:
prefix = nsmap[uri]
if prefix:
cname = prefix + ":" + ncname
else:
cname = ncname
cnames[name] = cname
return cname, (uri, ncname)

def start(self, tag, attributes=None, nsmap=None, **kwargs):
"""Open a new `tag` element.
Expand All @@ -184,41 +183,59 @@ def start(self, tag, attributes=None, nsmap=None, **kwargs):
self._start_tag_open = False
if self._pretty_print and self._tags and not self._wrote_data:
self.write("\n" + INDENT * len(self._tags))
# Generate start-ns events for all new namespaces
if nsmap is not None:
for (prefix, uri) in nsmap.iteritems():
if self._namespace_maps[-1].get(uri) != prefix:
self.start_ns(prefix, uri)
tag = self._cname(tag)

# Copy old namespaces and cnames
if self._tags:
_, old_namespaces, _ = self._tags[-1]
else:
old_namespaces = {'': ''}
namespaces = old_namespaces.copy()
if nsmap:
self._new_namespaces.update(nsmap)

values = self._new_namespaces.values()
for uri, prefix in namespaces.items():
if prefix in values:
del namespaces[uri]

namespaces.update(self._new_namespaces)
cnames = {}

# Write tag name (cname)
tag, _ = self._cname(tag, namespaces, cnames)
self.write("<" + tag)
# Write attributes, including namespace declarations
# TODO: Handle ns declarations separately
if attributes or kwargs or self._new_namespaces:
if attributes is None:
attributes = {}
else:
attributes = dict(attributes)
for (uri, prefix) in self._new_namespaces:

# Make cnames for the attributes
if attributes:
kwargs.update(attributes)
attributes = sorted([(self._cname(name, namespaces, cnames), value)
for (name, value) in kwargs.iteritems()])

# Write namespace declarations for all new mappings
for (uri, prefix) in sorted(namespaces.iteritems()):
if old_namespaces.get(uri) != prefix:
if prefix:
attributes["xmlns:" + prefix] = uri
self.write(" xmlns:" + prefix + "=\""
+ escape_attribute(uri, self.encoding)
+ "\"")
else:
attributes["xmlns"] = uri
self._new_namespaces = []
attributes = attributes.items() + kwargs.items()
# Sort them, if requested
if callable(self._sort):
attributes = self._sort(attributes, tag)
elif self._sort:
attributes = sorted(attributes)
# Do the actual writing
for name, value in attributes:
name = self._cname(name)
self.write(" " + name + "=\""
+ escape_attribute(value, self.encoding)
+ "\"")
self.write(" xmlns=\""
+ escape_attribute(uri, self.encoding)
+ "\"")

# Write the attributes
if callable(self._sort):
attributes = self._sort(attributes, tag)
elif self._sort:
attributes.sort(key=lambda x: x[0][1])
for ((cname, name), value) in attributes:
self.write(" " + cname + "=\""
+ escape_attribute(value, self.encoding)
+ "\"")

self._start_tag_open = True
self._wrote_data = False
self._tags.append(tag)
self._tags.append((tag, namespaces, cnames))

def end(self, tag=None):
"""Close the most recently opened element.
Expand All @@ -227,9 +244,9 @@ def end(self, tag=None):
element, or an `XMLSyntaxError will be raised.
"""
open_tag = self._tags.pop()
open_tag, namespaces, cnames = self._tags.pop()
if tag is not None:
tag = self._cname(tag)
tag = self._cname(tag, namespaces, cnames)
if open_tag != tag:
raise XMLSyntaxError("Start and end tag mismatch: %s and /%s."
% (open_tag, tag))
Expand All @@ -247,15 +264,11 @@ def end(self, tag=None):
def start_ns(self, prefix, uri):
"""Add a namespace declaration to the scope of the next
element."""
new_map = self._namespace_maps[-1].copy()
new_map[uri] = prefix
self._namespace_maps.append(new_map)
self._new_namespaces.append((uri, prefix))
self._new_namespaces[uri] = prefix

def end_ns(self):
"""End a namespace scope."""
# TODO: remove this, as it should be handled automatically
del self._namespace_maps[-1]
pass

def data(self, data):
"""Add character data."""
Expand Down
9 changes: 5 additions & 4 deletions test/test_streamxmlwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def testPrefixedElement(self):
def testDefaultUnbinding(self):
w, out = writer_and_output()
w.start_ns("", "http://example.org/ns")
w.start("foo")
w.start("{http://example.org/ns}foo")
w.start_ns("", "")
w.start("foo")
w.close()
Expand Down Expand Up @@ -188,9 +188,10 @@ def testAttributesSameLocalOnePrefixedOneDefault(self):
w, out = writer_and_output()
w.start_ns("", "http://example.org/ns1")
w.start_ns("a", "http://example.org/ns2")
w.start("foo")
w.start("bar", {"{http://example.org/ns1}attr": "1",
"{http://example.org/ns2}attr": "2"})
w.start("{http://example.org/ns1}foo")
w.start("{http://example.org/ns1}bar",
{"{http://example.org/ns1}attr": "1",
"{http://example.org/ns2}attr": "2"})
w.close()
self.assertEquals(out.getvalue(),
'<foo xmlns="http://example.org/ns1"'
Expand Down

0 comments on commit fc3e46f

Please sign in to comment.