Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tests/test_routes_app_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ async def test_grant_network_origin_capability_allowed(client):
assert resp.status_code == 200, resp.text


@pytest.mark.asyncio
@pytest.mark.parametrize("bad", ["network:", "network:not an origin", "network:ftp://x.com"])
async def test_grant_malformed_network_origin_returns_400(client, bad):
# The bare network: prefix passes is_known_capability, but a grant must carry
# a well-formed origin, so a malformed one is rejected and not recorded.
resp = await client.post(
"/api/apps/a/permissions", json={"capability": bad, "decision": "granted"}
)
assert resp.status_code == 400
assert "invalid network origin" in resp.json()["error"]
data = (await client.get("/api/apps/a/permissions")).json()
assert data["grants"] == []


@pytest.mark.asyncio
async def test_empty_capability_returns_400(client):
resp = await client.post(
Expand Down
29 changes: 29 additions & 0 deletions tests/test_routes_userspace_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,32 @@ async def test_broker_gated_cap_denied_when_ledger_grant_absent(client):
)
assert resp.status_code == 200
assert resp.json().get("error") == "permission_denied"


@pytest.mark.asyncio
async def test_broker_ledger_error_falls_back_to_per_app_grants(client):
# Decision-24 merge is best-effort: if the app_grants ledger lookup raises
# (e.g. an uninitialised store or a query error), the broker must fall back
# to the per-app granted set rather than 500.
app = client._transport.app
await _init_userspace_stores(app, app.state.data_dir)
store = app.state.userspace_apps
await _install_test_app(store, permissions=["app.net"])
await store.set_permissions_granted("test-app", ["app.net"])

class _Boom:
async def granted_capabilities(self, *a, **k):
raise RuntimeError("ledger down")

original = app.state.app_grants
app.state.app_grants = _Boom()
try:
resp = await client.post(
"/api/userspace-apps/test-app/broker",
json={"capability": "app.net.fetch", "args": {"url": "http://example.com"}},
)
finally:
app.state.app_grants = original
# No 500; and the per-app grant still authorises the capability.
assert resp.status_code == 200
assert resp.json().get("error") != "permission_denied"
9 changes: 5 additions & 4 deletions tests/userspace/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ def test_network_permission_origins_validated():
with pytest.raises(PackageError):
parse_manifest(base + "permissions: ['" + bad + "']\n")
# gitar finding: `$` would match before a trailing newline; \Z must not.
from tinyagentos.userspace.package import _NET_ORIGIN_RE
assert _NET_ORIGIN_RE.match("wss://irc-ws.chat.twitch.tv")
assert not _NET_ORIGIN_RE.match("wss://evil.com\n")
assert not _NET_ORIGIN_RE.match("wss://evil.com\n; script-src 'unsafe-inline'")
# NET_ORIGIN_RE is the canonical pattern in capabilities.py, reused here.
from tinyagentos.userspace.capabilities import NET_ORIGIN_RE
assert NET_ORIGIN_RE.match("wss://irc-ws.chat.twitch.tv")
assert not NET_ORIGIN_RE.match("wss://evil.com\n")
assert not NET_ORIGIN_RE.match("wss://evil.com\n; script-src 'unsafe-inline'")


def test_parse_valid_tui_manifest():
Expand Down
12 changes: 11 additions & 1 deletion tinyagentos/routes/app_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from pydantic import BaseModel

from tinyagentos.auth_context import CurrentUser, current_user
from tinyagentos.userspace.capabilities import is_known_capability
from tinyagentos.userspace.capabilities import (
NET_PREFIX,
is_known_capability,
is_valid_network_grant,
)

router = APIRouter()

Expand Down Expand Up @@ -56,6 +60,12 @@ async def set_app_permission(
return JSONResponse(
{"error": f"unknown capability: {cap}"}, status_code=400
)
# The parametrized network:<origin> form must carry a well-formed origin; the
# bare prefix is_known_capability accepts is not enough to record a grant.
if cap.startswith(NET_PREFIX) and not is_valid_network_grant(cap):
return JSONResponse(
{"error": f"invalid network origin: {cap}"}, status_code=400
)
try:
rec = await store.set_decision(user.user_id, app_id, cap, decision=body.decision)
except ValueError as e:
Expand Down
15 changes: 14 additions & 1 deletion tinyagentos/routes/userspace_apps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import shutil
from pathlib import Path
from urllib.parse import urlparse
Expand All @@ -12,6 +13,8 @@
from tinyagentos.userspace.package import extract_package, PackageError
from tinyagentos.userspace.url_guard import resolve_safe_public_ip

logger = logging.getLogger(__name__)

router = APIRouter()

_SDK_PATH = Path(__file__).resolve().parent.parent / "userspace" / "sdk" / "taos-app-sdk.js"
Expand Down Expand Up @@ -256,7 +259,17 @@ async def broker(request: Request, app_id: str):
uid = getattr(request.state, "user_id", None)
grants_store = getattr(request.app.state, "app_grants", None)
if uid and grants_store is not None:
granted |= await grants_store.granted_capabilities(uid, app_id)
try:
granted |= await grants_store.granted_capabilities(uid, app_id)
except Exception:
# Genuinely best-effort: an uninitialised store or a query error
# must not turn a previously-working broker call into a 500. Fall
# back to the per-app granted set.
logger.warning(
"app_grants lookup failed for app %s; using per-app grants only",
app_id,
exc_info=True,
)
out = await handle_capability(
app_id, body.get("capability", ""), body.get("args") or {},
granted=granted,
Expand Down
23 changes: 20 additions & 3 deletions tinyagentos/userspace/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"""
from __future__ import annotations

import re

# Granted to every app without consent.
FREE_CAPS = frozenset({"app.kv", "app.table", "app.files", "app.notify", "app.window"})
# Require an explicit granted permission (runtime consent via the Decisions flow).
Expand All @@ -28,13 +30,28 @@
# Parametrized capability prefix: `network:<origin>` allowlists a single origin.
NET_PREFIX = "network:"

# Strict origin format for a `network:<origin>` grant: scheme://host with an
# optional leading "*." subdomain wildcard and an optional :port, nothing else
# (no spaces, semicolons, quotes, paths, or newlines) so it can never inject
# extra sandbox CSP directives. \A and \Z anchor the WHOLE string. This is the
# single source of truth, reused by the package parser and the grant API.
NET_ORIGIN_RE = re.compile(r"\A(?:wss|https)://(?:\*\.)?[A-Za-z0-9.-]+(?::\d+)?\Z")


def is_valid_network_grant(perm: str) -> bool:
"""True if `perm` is a `network:<origin>` grant with a well-formed origin."""
if not isinstance(perm, str) or not perm.startswith(NET_PREFIX):
return False
return bool(NET_ORIGIN_RE.match(perm[len(NET_PREFIX):]))


def is_known_capability(perm: str) -> bool:
"""True if `perm` is a recognised capability the vocabulary permits.

Accepts a bare namespace in KNOWN_CAPS or a `network:<origin>` grant. The
<origin> portion's format is validated by the package parser, not here; this
only classifies whether the capability itself is one the system understands.
Accepts a bare namespace in KNOWN_CAPS or a `network:<origin>` grant. This
only classifies the capability namespace; callers that record or enforce a
grant should additionally check is_valid_network_grant for the parametrized
form so a malformed origin is not accepted.
"""
if not isinstance(perm, str):
return False
Expand Down
18 changes: 9 additions & 9 deletions tinyagentos/userspace/package.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
from __future__ import annotations

import io
import re
import zipfile
from pathlib import Path

import yaml

from tinyagentos.userspace.capabilities import KNOWN_CAPS, NET_PREFIX, is_known_capability
from tinyagentos.userspace.capabilities import (
KNOWN_CAPS,
NET_ORIGIN_RE,
NET_PREFIX,
is_known_capability,
)

# A `network:<origin>` permission lets a granted app's bundle connect to that
# external origin (it is added to the sandbox CSP connect-src). The origin is
# strictly validated so it can never inject extra CSP directives: scheme://host
# with an optional leading "*." subdomain wildcard and an optional :port, and
# nothing else (no spaces, semicolons, quotes, paths, or newlines). \A and \Z
# anchor the WHOLE string -- `$` would also match just before a trailing
# newline, which would let "wss://host\n; ..." slip a newline into the value.
_NET_ORIGIN_RE = re.compile(r"\A(?:wss|https)://(?:\*\.)?[A-Za-z0-9.-]+(?::\d+)?\Z")
# strictly validated by NET_ORIGIN_RE (the canonical pattern in
# capabilities.py) so it can never inject extra CSP directives.

_ALLOWED_TYPES = {"web", "container", "tui"}
_REQUIRED = ("id", "name", "version", "app_type")
Expand Down Expand Up @@ -91,7 +91,7 @@ def parse_manifest(text: str) -> dict:
)
if perm.startswith(NET_PREFIX):
origin = perm[len(NET_PREFIX):]
if not _NET_ORIGIN_RE.match(origin):
if not NET_ORIGIN_RE.match(origin):
raise PackageError(
f"invalid network permission origin {origin!r}: must be "
"wss://host or https://host with an optional *. subdomain "
Expand Down
Loading