Skip to content

Commit

Permalink
fix: Require folders requested via a service to define its root. Refs #…
Browse files Browse the repository at this point in the history
  • Loading branch information
ecederstrand committed Apr 16, 2024
1 parent 17bc8d6 commit f65079f
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 99 deletions.
32 changes: 19 additions & 13 deletions exchangelib/folders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,11 @@ def tree(self):
return tree.strip()

@classmethod
def get_distinguished(cls, account):
"""Get the distinguished folder for this folder class.
:param account:
:return:
"""
def _get_distinguished(cls, folder):
if not cls.DISTINGUISHED_FOLDER_ID:
raise ValueError(f"Class {cls} must have a DISTINGUISHED_FOLDER_ID value")
try:
return cls.resolve(
account=account,
folder=DistinguishedFolderId(
id=cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=account.primary_smtp_address),
),
)
return cls.resolve(account=folder.account, folder=folder)
except MISSING_FOLDER_ERRORS as e:
raise ErrorFolderNotFound(f"Could not find distinguished folder {cls.DISTINGUISHED_FOLDER_ID!r} ({e})")

Expand Down Expand Up @@ -902,6 +891,23 @@ def clean(self, version=None):
if self.root and not isinstance(self.root, RootOfHierarchy):
raise InvalidTypeError("root", self.root, RootOfHierarchy)

@classmethod
def get_distinguished(cls, root):
"""Get the distinguished folder for this folder class.
:param root:
:return:
"""
return cls._get_distinguished(
folder=cls(
_distinguished_id=DistinguishedFolderId(
id=cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=root.account.primary_smtp_address),
),
root=root,
)
)

@classmethod
def from_xml_with_root(cls, elem, root):
folder = cls.from_xml(elem=elem, account=root.account)
Expand Down
14 changes: 13 additions & 1 deletion exchangelib/folders/queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,23 @@ def get(self, *args, **kwargs):
"""Return the query result as exactly one item. Raises DoesNotExist if there are no results, and
MultipleObjectsReturned if there are multiple results.
"""
from .base import Folder
from .collections import FolderCollection

if not args and set(kwargs) in ({"id"}, {"id", "changekey"}):
roots = {f.root for f in self.folder_collection.folders}
if len(roots) != 1:
raise ValueError(f"All folders must have the same root hierarchy ({roots})")
folders = list(
FolderCollection(account=self.folder_collection.account, folders=[FolderId(**kwargs)]).resolve()
FolderCollection(
account=self.folder_collection.account,
folders=[
Folder(
_id=FolderId(**kwargs),
root=roots.pop(),
)
],
).resolve()
)
elif args or kwargs:
folders = list(self.filter(*args, **kwargs))
Expand Down
30 changes: 25 additions & 5 deletions exchangelib/folders/roots.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,16 @@ def get_default_folder(self, folder_cls):
return f
try:
log.debug("Requesting distinguished %s folder explicitly", folder_cls)
return folder_cls.get_distinguished(account=self.account)
return folder_cls.get_distinguished(root=self)
except ErrorAccessDenied:
# Maybe we just don't have GetFolder access? Try FindItem instead
log.debug("Testing default %s folder with FindItem", folder_cls)
fld = folder_cls(
root=self,
_distinguished_id=DistinguishedFolderId(
id=folder_cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
),
root=self,
)
fld.test_access()
return self._folders_map.get(fld.id, fld) # Use cached instance if available
Expand All @@ -135,6 +135,23 @@ def get_default_folder(self, folder_cls):
pass
raise ErrorFolderNotFound(f"No usable default {folder_cls} folders")

@classmethod
def get_distinguished(cls, account):
"""Get the distinguished folder for this folder class.
:param account:
:return:
"""
return cls._get_distinguished(
folder=cls(
_distinguished_id=DistinguishedFolderId(
id=cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=account.primary_smtp_address),
),
account=account,
)
)

@property
def _folders_map(self):
if self._subfolders is not None:
Expand All @@ -145,9 +162,12 @@ def _folders_map(self):
# so we are sure to apply the correct Folder class, then fetch all sub-folders of this root.
folders_map = {self.id: self}
distinguished_folders = [
DistinguishedFolderId(
id=cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
cls(
_distinguished_id=DistinguishedFolderId(
id=cls.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
),
root=self,
)
for cls in self.WELLKNOWN_FOLDERS
if cls.get_folder_allowed and cls.supports_version(self.account.version)
Expand Down
41 changes: 7 additions & 34 deletions exchangelib/services/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,9 @@
SOAPError,
TransportError,
)
from ..folders import ArchiveRoot, BaseFolder, Folder, PublicFoldersRoot, Root, RootOfHierarchy
from ..folders import BaseFolder, Folder, RootOfHierarchy
from ..items import BaseItem
from ..properties import (
BaseItemId,
DistinguishedFolderId,
ExceptionFieldURI,
ExtendedFieldURI,
FieldURI,
FolderId,
IndexedFieldURI,
ItemId,
)
from ..properties import BaseItemId, ExceptionFieldURI, ExtendedFieldURI, FieldURI, FolderId, IndexedFieldURI, ItemId
from ..transport import DEFAULT_ENCODING
from ..util import (
ENS,
Expand Down Expand Up @@ -978,27 +969,9 @@ def attachment_ids_element(items, version, tag="m:AttachmentIds"):
return _ids_element(items, AttachmentId, version, tag)


def parse_folder_elem(elem, folder, account):
def parse_folder_elem(elem, folder):
if isinstance(folder, RootOfHierarchy):
f = folder.from_xml(elem=elem, account=folder.account)
elif isinstance(folder, Folder):
f = folder.from_xml_with_root(elem=elem, root=folder.root)
elif isinstance(folder, DistinguishedFolderId):
# We don't know the root or even account, but we need to attach the folder to something if we want to make
# future requests with this folder. Use 'account' but make sure to always use the distinguished folder ID going
# forward, instead of referencing anything connected to 'account'.
roots = (Root, ArchiveRoot, PublicFoldersRoot)
for cls in roots + tuple(chain(*(r.WELLKNOWN_FOLDERS for r in roots))):
if cls.DISTINGUISHED_FOLDER_ID == folder.id:
folder_cls = cls
break
else:
raise ValueError(f"Unknown distinguished folder ID: {folder.id}")
if folder_cls in roots:
f = folder_cls.from_xml(elem=elem, account=account)
else:
f = folder_cls.from_xml_with_root(elem=elem, root=account.root)
else:
# 'folder' is a generic FolderId instance. We don't know the root so assume account.root.
f = Folder.from_xml_with_root(elem=elem, root=account.root)
return f
return folder.from_xml(elem=elem, account=folder.account)
if isinstance(folder, Folder):
return folder.from_xml_with_root(elem=elem, root=folder.root)
raise ValueError(f"Unsupported folder class: {folder}")
2 changes: 1 addition & 1 deletion exchangelib/services/create_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _elems_to_objs(self, elems):
if isinstance(elem, Exception):
yield elem
continue
yield parse_folder_elem(elem=elem, folder=folder, account=self.account)
yield parse_folder_elem(elem=elem, folder=folder)

def get_payload(self, folders, parent_folder):
payload = create_element(f"m:{self.SERVICE_NAME}")
Expand Down
2 changes: 1 addition & 1 deletion exchangelib/services/get_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _elems_to_objs(self, elems):
if isinstance(elem, Exception):
yield elem
continue
yield parse_folder_elem(elem=elem, folder=folder, account=self.account)
yield parse_folder_elem(elem=elem, folder=folder)

def get_payload(self, folders, additional_fields, shape):
payload = create_element(f"m:{self.SERVICE_NAME}")
Expand Down
2 changes: 1 addition & 1 deletion exchangelib/services/sync_folder_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _elem_to_obj(self, elem):
# We can't find() the element because we don't know which tag to look for. The change element can
# contain multiple folder types, each with their own tag.
folder_elem = elem[0]
folder = parse_folder_elem(elem=folder_elem, folder=self.folder, account=self.account)
folder = parse_folder_elem(elem=folder_elem, folder=self.folder)
return change_type, folder

def get_payload(self, folder, shape, additional_fields, sync_state):
Expand Down
2 changes: 1 addition & 1 deletion exchangelib/services/update_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _elems_to_objs(self, elems):
if isinstance(elem, Exception):
yield elem
continue
yield parse_folder_elem(elem=elem, folder=folder, account=self.account)
yield parse_folder_elem(elem=elem, folder=folder)

@staticmethod
def _target_elem(target):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_get_default_folder(self):

class MockCalendar1(Calendar):
@classmethod
def get_distinguished(cls, account):
def get_distinguished(cls, root):
raise ErrorAccessDenied("foo")

# Test an indirect folder lookup with FindItem, when we're not allowed to do a GetFolder. We don't get the
Expand All @@ -132,7 +132,7 @@ def get_distinguished(cls, account):

class MockCalendar2(Calendar):
@classmethod
def get_distinguished(cls, account):
def get_distinguished(cls, root):
raise ErrorFolderNotFound("foo")

# Test using the one folder of this folder type
Expand Down
62 changes: 22 additions & 40 deletions tests/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,10 @@ def test_public_folders_root(self, m):
],
)
# Test top-level .children
self.assertListEqual(
[f.name for f in self.account.public_folders_root.children], ["Sample Contacts", "Sample Folder"]
)
children = list(self.account.public_folders_root.children)
self.assertListEqual([f.name for f in children], ["Sample Contacts", "Sample Folder"])
for f in children:
self.assertIsInstance(f.root, PublicFoldersRoot)

find_public_subfolder1_children_xml = b"""\
<?xml version="1.0" ?>
Expand Down Expand Up @@ -379,11 +380,12 @@ def test_public_folders_root(self, m):
# Test .get_children() on subfolders
f_1 = self.account.public_folders_root / "Sample Contacts"
f_2 = self.account.public_folders_root / "Sample Folder"
self.assertListEqual(
[f.name for f in self.account.public_folders_root.get_children(f_1)],
["Sample Subfolder1", "Sample Subfolder2"],
)
self.assertListEqual([f.name for f in self.account.public_folders_root.get_children(f_2)], [])
f_1_children = list(self.account.public_folders_root.get_children(f_1))
self.assertListEqual([f.name for f in f_1_children], ["Sample Subfolder1", "Sample Subfolder2"])
for f in f_1_children:
self.assertIsInstance(f.root, PublicFoldersRoot)
f_2_children = list(self.account.public_folders_root.get_children(f_2))
self.assertListEqual([f.name for f in f_2_children], [])

def test_invalid_deletefolder_args(self):
with self.assertRaises(ValueError) as e:
Expand Down Expand Up @@ -481,39 +483,19 @@ def test_get_folders(self):
folders = list(FolderCollection(account=self.account, folders=[self.account.root]).get_folders())
self.assertEqual(len(folders), 1, sorted(f.name for f in folders))

# Test that GetFolder can handle FolderId instances
folders = list(
FolderCollection(
account=self.account,
folders=[
DistinguishedFolderId(
id=Inbox.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
)
],
).get_folders()
)
self.assertEqual(len(folders), 1, sorted(f.name for f in folders))

def test_get_folders_with_distinguished_id(self):
# Test that we return an Inbox instance and not a generic Messages or Folder instance when we call GetFolder
# with a DistinguishedFolderId instance with an ID of Inbox.DISTINGUISHED_FOLDER_ID.
inbox_folder_id = DistinguishedFolderId(
id=Inbox.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
)
inbox = list(
GetFolder(account=self.account).call(
folders=[inbox_folder_id],
shape="IdOnly",
additional_fields=[],
# Test that GetFolder cannot handle FolderId instances
with self.assertRaises(ValueError):
list(
FolderCollection(
account=self.account,
folders=[
DistinguishedFolderId(
id=Inbox.DISTINGUISHED_FOLDER_ID,
mailbox=Mailbox(email_address=self.account.primary_smtp_address),
)
],
).get_folders()
)
)[0]
self.assertIsInstance(inbox, Inbox)

# Test via SingleFolderQuerySet
inbox = SingleFolderQuerySet(account=self.account, folder=inbox_folder_id).resolve()
self.assertIsInstance(inbox, Inbox)

def test_folder_grouping(self):
# If you get errors here, you probably need to fill out [folder class].LOCALIZED_NAMES for your locale.
Expand Down

0 comments on commit f65079f

Please sign in to comment.