Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions src/uiprotect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import contextlib
import hashlib
import logging
import os
import random
import re
import sys
import tempfile
import time
import warnings
from datetime import datetime, timedelta
Expand Down Expand Up @@ -1035,14 +1037,44 @@ async def _update_last_token_cookie(self, response: aiohttp.ClientResponse) -> N
await self._update_auth_config(self._last_token_cookie)
self._last_token_cookie_decode = None

async def _write_session_config_atomic(self, config: dict[str, Any]) -> None:
"""Write session config atomically with owner-only permissions."""
await aos.makedirs(self.config_dir, exist_ok=True)
if sys.platform != "win32":
await asyncio.to_thread(os.chmod, self.config_dir, 0o700)

config_data = orjson.dumps(config, option=orjson.OPT_INDENT_2)

if sys.platform == "win32": # pragma: no cover
# Windows does not use POSIX permissions; write directly.
async with aiofiles.open(self.config_file, "wb") as f:
await f.write(config_data)
return

# mkstemp creates with mode 0o600 on POSIX — no explicit chmod needed.
fd, tmp_str = await asyncio.to_thread(
tempfile.mkstemp,
dir=self.config_dir,
prefix=".unifi_protect_",
suffix=".tmp",
)
tmp_path = Path(tmp_str)
try:
await asyncio.to_thread(os.close, fd) # release fd so aiofiles can open
async with aiofiles.open(tmp_path, "wb") as f:
await f.write(config_data)
await aos.replace(str(tmp_path), str(self.config_file))
except Exception:
with contextlib.suppress(OSError):
await aos.remove(str(tmp_path))
raise

async def _update_auth_config(self, cookie: Morsel[str]) -> None:
"""Updates auth cookie on disk for persistent sessions."""
username = self._username
if self._last_token_cookie is None or self._public_only or username is None:
return

await aos.makedirs(self.config_dir, exist_ok=True)

config: dict[str, Any] = {}
session_hash = get_user_hash(str(self._url), username)
try:
Expand All @@ -1064,8 +1096,7 @@ async def _update_auth_config(self, cookie: Morsel[str]) -> None:
"csrf": self.headers.get("x-csrf-token") if self.headers else None,
}

async with aiofiles.open(self.config_file, "wb") as f:
await f.write(orjson.dumps(config, option=orjson.OPT_INDENT_2))
await self._write_session_config_atomic(config)

async def _load_session(self) -> None:
if self._public_only:
Expand Down Expand Up @@ -1179,8 +1210,7 @@ async def clear_session(self) -> None:
if "sessions" in config and session_hash in config["sessions"]:
del config["sessions"][session_hash]

async with aiofiles.open(self.config_file, "wb") as f:
await f.write(orjson.dumps(config, option=orjson.OPT_INDENT_2))
await self._write_session_config_atomic(config)

_LOGGER.debug("Cleared session for %s", session_hash)

Expand Down
150 changes: 150 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import asyncio
import logging
import stat
import sys
from copy import deepcopy
from datetime import datetime, timedelta
from io import BytesIO
Expand Down Expand Up @@ -4189,3 +4191,151 @@ async def test_create_talkback_session_public(protect_client: ProtectApiClient):
method="post",
public_api=True,
)


# Session config file permission tests


@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only permission check")
async def test_write_session_config_atomic_file_permissions(tmp_path: Path) -> None:
"""File written by _write_session_config_atomic must be owner-readable only (0o600)."""
client = ProtectApiClient(
"127.0.0.1",
0,
"test_user",
"test_pass",
verify_ssl=False,
store_sessions=True,
config_dir=tmp_path,
)

await client._write_session_config_atomic({"sessions": {}})

config_file = tmp_path / "unifi_protect.json"
assert await aos.path.exists(config_file)
file_stat = await aos.stat(config_file)
file_mode = stat.S_IMODE(file_stat.st_mode)
assert file_mode == 0o600, f"Expected 0o600, got {oct(file_mode)}"


@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only permission check")
async def test_write_session_config_atomic_dir_permissions(tmp_path: Path) -> None:
"""Config directory is tightened to 0o700 by _write_session_config_atomic."""
config_dir = tmp_path / "ufp"
await aos.mkdir(config_dir, mode=0o755)

client = ProtectApiClient(
"127.0.0.1",
0,
"test_user",
"test_pass",
verify_ssl=False,
store_sessions=True,
config_dir=config_dir,
)

await client._write_session_config_atomic({"sessions": {}})

dir_stat = await aos.stat(config_dir)
dir_mode = stat.S_IMODE(dir_stat.st_mode)
assert dir_mode == 0o700, f"Expected 0o700, got {oct(dir_mode)}"


@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only atomic-write check")
async def test_write_session_config_atomic_no_tmp_file_left(tmp_path: Path) -> None:
"""No .tmp file must remain in the config directory after a successful write."""
client = ProtectApiClient(
"127.0.0.1",
0,
"test_user",
"test_pass",
verify_ssl=False,
store_sessions=True,
config_dir=tmp_path,
)

await client._write_session_config_atomic({"sessions": {}})

# Use async listdir and filter rather than sync glob()
entries = await aos.listdir(tmp_path)
tmp_files = [
e for e in entries if e.startswith(".unifi_protect_") and e.endswith(".tmp")
]
assert tmp_files == [], f"Unexpected tmp files: {tmp_files}"


@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only atomic-write check")
async def test_write_session_config_atomic_cleans_up_tmp_on_write_error(
tmp_path: Path,
) -> None:
"""Temp file is removed when the write fails, and the exception propagates."""
client = ProtectApiClient(
"127.0.0.1",
0,
"test_user",
"test_pass",
verify_ssl=False,
store_sessions=True,
config_dir=tmp_path,
)

# Patch aiofiles.open to raise after the temp file has been created via mkstemp.
with (
patch(
"uiprotect.api.aiofiles.open",
side_effect=OSError("simulated write failure"),
),
pytest.raises(OSError, match="simulated write failure"),
):
await client._write_session_config_atomic({"sessions": {}})

# No orphaned tmp file should remain — the except-block cleans it up.
entries = await aos.listdir(tmp_path)
tmp_files = [
e for e in entries if e.startswith(".unifi_protect_") and e.endswith(".tmp")
]
assert tmp_files == [], f"Orphaned tmp files: {tmp_files}"


@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only permission check")
async def test_clear_session_file_permissions(tmp_path: Path) -> None:
"""clear_session must rewrite the config file with owner-only permissions (0o600)."""
client = ProtectApiClient(
"127.0.0.1",
0,
"test_user",
"test_pass",
verify_ssl=False,
store_sessions=True,
config_dir=tmp_path,
)

session_hash = get_user_hash(str(client._url), "test_user")
config = {
"sessions": {
session_hash: {
"metadata": {"path": "/"},
"cookiename": "TOKEN",
"value": "some_token",
"csrf": "some-csrf",
},
}
}

# Write an initial config file (permissions not yet controlled)
config_file = tmp_path / "unifi_protect.json"
await async_write_bytes(config_file, orjson.dumps(config))

await client.clear_session()

# After clear_session the file should still exist (sessions key now empty)
# and its permissions should be 0o600.
assert await aos.path.exists(config_file)
file_stat = await aos.stat(config_file)
file_mode = stat.S_IMODE(file_stat.st_mode)
assert file_mode == 0o600, f"Expected 0o600, got {oct(file_mode)}"