Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions backend/app/services/knowledge/knowledge_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import asyncio
import logging
from dataclasses import dataclass
from typing import Optional

Expand Down Expand Up @@ -56,6 +57,9 @@
can_manage_accessible_knowledge_base_documents,
can_manage_accessible_knowledge_document,
)
from app.services.readers.kb_permissions import kb_permission_resolver

logger = logging.getLogger(__name__)


def _build_attachment_filename(name: str, file_extension: str) -> str:
Expand Down Expand Up @@ -444,12 +448,26 @@ def list_knowledge_bases(
# Get knowledge bases bound to group chats where user is a member
bound_kb_ids = KnowledgeService._get_bound_kb_ids_for_user(db, user_id)

# Single query to get personal, team, organization, shared, and bound knowledge bases
# Personal: user_id matches and namespace is "default"
# Team: namespace is in accessible_groups
# External resolver: returns additional KB IDs the user can access
# via extension rules (e.g. department / employee bindings).
# Errors are caught so a faulty extension cannot break the core listing path.
try:
ext_kb_ids = kb_permission_resolver.get_accessible_kb_ids(db, user_id)
except Exception as e:
logger.warning(
f"kb_permissions extension get_accessible_kb_ids failed: {e}; "
"falling back to empty list"
)
ext_kb_ids = []
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Single query to get personal, team, organization, shared, bound, and
# externally-accessible knowledge bases.
# Personal: user_id matches and namespace is "default"
# Team: namespace is in accessible_groups
# Organization: namespace has level='organization'
# Shared: id is in shared_kb_ids
# Bound: id is in bound_kb_ids (personal KBs bound to group chats)
# Shared: id is in shared_kb_ids
# Bound: id is in bound_kb_ids (personal KBs bound to group chats)
# External: id is in ext_kb_ids (e.g. department / employee bindings)
query = db.query(Kind).filter(
Kind.kind == "KnowledgeBase",
Kind.is_active == True,
Expand All @@ -469,6 +487,9 @@ def list_knowledge_bases(
if bound_kb_ids:
conditions.append(Kind.id.in_(bound_kb_ids))

if ext_kb_ids:
conditions.append(Kind.id.in_(ext_kb_ids))

if conditions:
from sqlalchemy import or_

Expand Down Expand Up @@ -2051,7 +2072,9 @@ def _get_user_kb_permission(
return True, BaseRole.Owner, False

has_access, role, is_creator = knowledge_share_service.get_user_kb_permission(
db, knowledge_base_id, user_id
db,
knowledge_base_id,
user_id,
)

effective_role = BaseRole(role) if role is not None else None
Expand Down
4 changes: 4 additions & 0 deletions backend/app/services/readers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
from app.services.readers.groups import groupReader
from app.services.readers.group_members import groupMemberReader
from app.services.readers.shared_teams import sharedTeamReader
from app.services.readers.kb_permissions import kb_permission_resolver

bot = kindReader.get_by_name_and_namespace(db, user_id, KindType.BOT, "default", "mybot")
user = userReader.get_by_id(db, user_id)
is_member = groupMemberReader.is_member(db, "group-name", user_id)
ids = kb_permission_resolver.get_accessible_kb_ids(db, user_id)
"""

from app.services.readers.group_members import groupMemberReader
from app.services.readers.groups import groupReader
from app.services.readers.kb_permissions import kb_permission_resolver
from app.services.readers.kinds import KindType, kindReader
from app.services.readers.shared_teams import sharedTeamReader
from app.services.readers.users import userReader
Expand All @@ -32,5 +35,6 @@
"groupReader",
"groupMemberReader",
"sharedTeamReader",
"kb_permission_resolver",
"KindType",
]
216 changes: 216 additions & 0 deletions backend/app/services/readers/kb_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# SPDX-FileCopyrightText: 2025 Wegent, Inc.
#
# SPDX-License-Identifier: Apache-2.0

"""
Knowledge base external permission resolver extension point.

Loaded via Python entry points mechanism.

Usage (in extension package, e.g. myext/kb_permissions.py):

from app.services.readers.kb_permissions import IKbPermissionResolver

class MyResolver(IKbPermissionResolver):
def __init__(self, base: IKbPermissionResolver):
self._base = base

def resolve(self, db, kb_id, user_id, kb):
# return a role string or None
...

def get_accessible_kb_ids(self, db, user_id):
# return list of kb_ids the user can access
...

Register in pyproject.toml:

[project.entry-points."wegent.kb_permissions"]
my_resolver = "myext.kb_permissions:MyResolver"
"""

import importlib.metadata
import logging
import threading
from abc import ABC, abstractmethod
from typing import Optional

from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)

# Entry point group for KB permission resolvers
ENTRY_POINT_GROUP = "wegent.kb_permissions"


# =============================================================================
# Interface
# =============================================================================


class IKbPermissionResolver(ABC):
"""
Abstract interface for external knowledge base permission resolution.

Implementations return a role string when the external system grants
access, or None to fall through to the built-in permission logic.
"""

@abstractmethod
def resolve(
self,
db: Session,
kb_id: int,
user_id: int,
kb: object,
) -> Optional[str]:
"""
Resolve permission for a single knowledge base access check.

Called after all built-in checks have returned False.

Args:
db: Database session
kb_id: Knowledge base ID
user_id: Requesting user ID
kb: Kind object (knowledge base record)

Returns:
Role string ("Owner"/"Maintainer"/"Developer"/"Reporter") if the
external system grants access, None to continue with built-in
denial.
"""
pass

@abstractmethod
def get_accessible_kb_ids(self, db: Session, user_id: int) -> list[int]:
"""
Return knowledge base IDs accessible to the user via external rules.

Called during list queries to extend the OR conditions. Return an
empty list when there are no additional IDs to include.

Args:
db: Database session
user_id: Requesting user ID

Returns:
List of knowledge base IDs (may be empty).
"""
pass


# =============================================================================
# Default Implementation
# =============================================================================


class DefaultKbPermissionResolver(IKbPermissionResolver):
"""
No-op resolver used when no extension is configured.

Always returns None / [] so no extra permissions are granted.
"""

def resolve(
self,
db: Session,
kb_id: int,
user_id: int,
kb: object,
) -> Optional[str]:
return None

def get_accessible_kb_ids(self, db: Session, user_id: int) -> list[int]:
return []


# =============================================================================
# Loader
# =============================================================================


def _load_from_entry_points(
base: IKbPermissionResolver,
) -> Optional[IKbPermissionResolver]:
"""
Load resolver from entry points.

Args:
base: The base resolver to pass to the loaded resolver's constructor.

Returns:
The loaded resolver instance, or None if no valid entry point found.
"""
try:
entry_points = importlib.metadata.entry_points(group=ENTRY_POINT_GROUP)
except TypeError:
# Python < 3.10 compatibility
all_eps = importlib.metadata.entry_points()
entry_points = all_eps.get(ENTRY_POINT_GROUP, [])

if not entry_points:
return None

# Use the first entry point
ep = next(iter(entry_points))

try:
resolver_class = ep.load()

if not issubclass(resolver_class, IKbPermissionResolver):
logger.error(
f"Entry point {ep.name} ({resolver_class}) does not implement IKbPermissionResolver"
)
return None

result = resolver_class(base)
logger.info(f"KB permission resolver loaded from entry point: {ep.name}")
return result

except Exception as e:
logger.warning(f"Failed to load entry point {ep.name}: {e}", exc_info=True)
return None


def _create_resolver() -> IKbPermissionResolver:
"""Create resolver, loading from entry points if available."""
base: IKbPermissionResolver = DefaultKbPermissionResolver()

# Try to load from entry points
loaded = _load_from_entry_points(base)
if loaded is not None:
return loaded

return base


# =============================================================================
# Lazy Singleton
# =============================================================================


class _LazyReader:
"""Lazy-loaded resolver proxy that delegates to the actual resolver instance."""

_instance: IKbPermissionResolver | None = None
_init_lock: threading.Lock = threading.Lock()

def _get(self) -> IKbPermissionResolver:
if self._instance is None:
with self._init_lock:
# Double-checked locking to ensure only one resolver is created
# under concurrent access.
if self._instance is None:
self._instance = _create_resolver()
return self._instance

def __getattr__(self, name: str):
return getattr(self._get(), name)


# =============================================================================
# Export
# =============================================================================

kb_permission_resolver: IKbPermissionResolver = _LazyReader() # type: ignore
16 changes: 15 additions & 1 deletion backend/app/services/share/knowledge_share_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ def get_user_kb_permission(
"""
Get user's permission for a knowledge base.

Priority: creator > explicit permission (ResourceMember) > group permission > task binding
Priority: creator > explicit permission (ResourceMember) > group permission
> task binding > external resolver

Args:
db: Database session
knowledge_base_id: Knowledge base ID
user_id: Requesting user ID

Returns:
Tuple of (has_access, role, is_creator)
Expand Down Expand Up @@ -374,6 +380,14 @@ def get_user_kb_permission(
# User is member of a group chat that has this KB bound
return True, ResourceRole.Reporter.value, False

# External permission resolver (e.g. department / employee bindings).
# Called last so it never interferes with built-in access control.
from app.services.readers.kb_permissions import kb_permission_resolver

ext_role = kb_permission_resolver.resolve(db, knowledge_base_id, user_id, kb)
if ext_role is not None:
return True, ext_role, False

return False, None, False

def check_permission(
Expand Down
Empty file.
Loading
Loading