refactor(rag): knowledge runtime config passing switched to reference mode #1021
sunnights wants to merge 2 commits into wecode-ai:main from
Conversation
- Shared Models: add KnowledgeBaseReference/RetrieverReference reference types
- Knowledge Runtime: add database connection and CRD resolution logic
- RemoteGateway: pass only references instead of full configs, so sensitive information never travels in HTTP requests
- LocalGateway: unchanged, continues to use the full-config mode
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (4)
💤 Files with no reviewable changes (1)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 Walkthrough
The PR switches RAG to a reference-mode flow: the backend sends lightweight references (KnowledgeBaseReference, RetrieverReference, EmbeddingModelReference) instead of full configs; Knowledge Runtime gains a DB-backed RuntimeResolver and SQLAlchemy initialization via DATABASE_URL to resolve those references at runtime.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Backend as Backend<br/>Gateway
    participant KR as Knowledge<br/>Runtime
    participant DB as Database
    participant Storage as Storage<br/>Backend
    Backend->>KR: RemoteQueryRequest\n(knowledge_base_references, user_id)
    activate KR
    KR->>KR: _resolve_references()
    KR->>DB: fetch KnowledgeBase / Retriever / Model CRDs
    DB-->>KR: CRD JSON (configs, secrets)
    KR->>KR: decrypt & build RemoteKnowledgeBaseQueryConfig
    KR->>Storage: create storage using resolved retriever_config
    KR->>Storage: perform retrieval/indexing
    Storage-->>KR: results
    KR-->>Backend: aggregated Query/Index response
    deactivate KR
```
```mermaid
sequenceDiagram
    participant Backend as Backend<br/>Gateway
    participant KR as Knowledge<br/>Runtime
    participant DB as Database
    Backend->>KR: RemoteTestConnectionRequest\n(retriever_reference)
    activate KR
    KR->>DB: fetch Retriever CRD by name/namespace/user_id
    DB-->>KR: retriever CRD (credentials)
    KR->>KR: resolve_retriever_config_for_test()
    KR->>KR: instantiate storage client and test connection
    KR-->>Backend: connection test result
    deactivate KR
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ Passed checks (5 passed)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
knowledge_runtime/knowledge_runtime/services/index_executor.py (1)
56-81: ⚠️ Potential issue | 🟠 Major
Validate the top-level KB id against the resolved reference.
This resolves configuration from `request.knowledge_base_reference` but indexes/logs using `request.knowledge_base_id`. If those differ, the request can index into one KB with another KB’s storage/owner config.
Proposed fix
```diff
 kb_config = self._resolver.resolve_knowledge_base_query_config(
     knowledge_base_id=request.knowledge_base_reference.knowledge_base_id,
     user_id=request.knowledge_base_reference.user_id,
     user_name=request.user_name,
 )
+if request.knowledge_base_id != kb_config.knowledge_base_id:
+    raise ValueError(
+        "knowledge_base_id must match knowledge_base_reference.knowledge_base_id"
+    )
 # Create storage backend from resolved retriever config
 storage_backend = create_storage_backend_from_runtime_config(
     kb_config.retriever_config
@@
 # Build knowledge_id from knowledge_base_id
-knowledge_id = str(request.knowledge_base_id)
+knowledge_id = str(kb_config.knowledge_base_id)
 logger.info(
-    f"Indexing document for knowledge_base_id={request.knowledge_base_id}, "
+    f"Indexing document for knowledge_base_id={kb_config.knowledge_base_id}, "
     f"source_file={source_file}, user_id={kb_config.index_owner_user_id}"
 )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@knowledge_runtime/knowledge_runtime/services/index_executor.py` around lines 56 - 81, The code resolves KB config from request.knowledge_base_reference via _resolver.resolve_knowledge_base_query_config but then uses request.knowledge_base_id for indexing/logging (knowledge_id and logger.info), allowing a mismatch; update the logic in index executor so you validate that request.knowledge_base_id equals the resolved kb_config.knowledge_base_id (or else raise/return an error), and use kb_config.knowledge_base_id consistently when building knowledge_id, constructing storage/document operations, and in logger.info; reference resolve_knowledge_base_query_config, knowledge_base_reference, knowledge_base_id, kb_config, knowledge_id, and logger.info to locate and fix the checks and assignment.

backend/app/services/rag/runtime_specs.py (1)
3-3: ⚠️ Potential issue | 🟠 Major
Add validation to enforce complete connection test configuration.
`ConnectionTestRuntimeSpec()` can be instantiated with no arguments, but both `RemoteGateway` and `LocalGateway` require different fields to succeed. RemoteGateway needs `retriever_name` and `user_id` to construct a RetrieverReference; LocalGateway needs `retriever_config` for `create_storage_backend_from_runtime_config()`. Add a model validator to ensure one complete configuration mode is provided.
Proposed fix
```diff
-from pydantic import BaseModel, ConfigDict, Field, field_validator
+from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
@@
 class ConnectionTestRuntimeSpec(RuntimeSpecModel):
@@
     # Full config mode field (for LocalGateway)
     retriever_config: Optional[RuntimeRetrieverConfig] = None
+
+    @model_validator(mode="after")
+    def validate_connection_test_mode(self) -> "ConnectionTestRuntimeSpec":
+        has_reference = self.retriever_name is not None and self.user_id is not None
+        has_full_config = self.retriever_config is not None
+        if not has_reference and not has_full_config:
+            raise ValueError(
+                "Connection test requires either retriever_config or "
+                "retriever_name plus user_id"
+            )
+        return self
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/rag/runtime_specs.py` at line 3, Add a pydantic model-level validator to ConnectionTestRuntimeSpec that enforces exactly one complete configuration path: either the RemoteGateway fields (both retriever_name and user_id present so you can construct a RetrieverReference) OR the LocalGateway path (retriever_config present so create_storage_backend_from_runtime_config() can run); if neither or a mix are provided, raise a ValidationError with a clear message. Implement the validator as a model_validator (or equivalent field_validator used as a model-level check) on ConnectionTestRuntimeSpec and reference the retriever_name, user_id, and retriever_config fields in the check so the constructor fails fast when the configuration is incomplete.
🧹 Nitpick comments (1)
knowledge_runtime/knowledge_runtime/db/__init__.py (1)
96-100: Dispose the SQLAlchemy engine when resetting DB state.
`reset_db()` drops the engine reference without closing its pool, which can leak open test connections.
Proposed fix
```diff
 def reset_db() -> None:
     """Reset database connection (useful for testing)."""
     global _engine, _SessionLocal
+    if _engine is not None:
+        _engine.dispose()
     _engine = None
     _SessionLocal = None
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@knowledge_runtime/knowledge_runtime/db/__init__.py` around lines 96 - 100, reset_db() currently clears the global _engine reference without closing its connection pool, causing potential connection leaks; update reset_db to check if _engine is not None and call its dispose() method (or dispose(pool) equivalent) before setting _engine = None and _SessionLocal = None so the SQLAlchemy engine is properly closed — locate the reset_db function and add an engine.dispose() guard using the _engine symbol prior to nullifying _engine and _SessionLocal.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/rag/remote_gateway.py`:
- Around line 208-214: The payload construction in RemoteQueryRequest currently
defaults missing spec.user_id to 0 (user_id=spec.user_id or 0), which can
silently use a system/public user; update the code in remote_gateway.py where
RemoteQueryRequest is built (and the analogous block around the second
occurrence) to require an explicit user_id: validate that spec.user_id is
present and raise an informative exception (or return an error) when it is
missing instead of using 0, and then pass spec.user_id directly into
RemoteQueryRequest.user_id; ensure the validation runs only for remote
reference-mode calls if that context is available.
In `@knowledge_runtime/knowledge_runtime/db/__init__.py`:
- Around line 17-18: Update init_db() to validate the DB connection immediately
after creating the SQLAlchemy engine: after create_engine(...) (the engine
variable created in init_db) open a connection or run a lightweight test query
(e.g., "SELECT 1") so failures (bad creds, missing driver, unreachable service)
surface at startup and the "Database connection initialized successfully" log is
only emitted after the test passes; also ensure SessionLocal/sessionmaker is
still configured after the check. In reset_db(), call engine.dispose() on the
existing engine instance before dropping or nulling the engine reference to
prevent connection pool leaks.
In `@knowledge_runtime/knowledge_runtime/main.py`:
- Around line 37-45: The code currently swallows init_db() failures when
settings.DATABASE_URL is set (using logger.warning), allowing the service to
start but crash later; update the reference-mode DB init block so that when
settings.DATABASE_URL is present and init_db() raises, you log the error (use
logger.error with the exception details) and fail fast by re-raising the
exception or terminating startup (e.g., raise the exception or call
sys.exit(1)); adjust the try/except around init_db() in main.py to ensure the
process does not continue in a misconfigured state.
In `@knowledge_runtime/knowledge_runtime/services/query_executor.py`:
- Around line 50-54: The call to _resolve_references currently uses
request.user_name and trusts each KnowledgeBaseReference.user_id; instead pass
and use the request-level permission context (RemoteQueryRequest.user_id) and
reject any KnowledgeBaseReference whose user_id does not match request.user_id
before resolving. Update the calls that pass request.user_name (both the block
with knowledge_base_references and the other occurrence) to pass
request.user_id, and modify _resolve_references to accept a user_id parameter,
validate reference.user_id == user_id (raise/return an explicit
permission/error) and only then resolve the reference to a full KB config.
In `@knowledge_runtime/knowledge_runtime/services/resolver.py`:
- Around line 58-62: The resolver allows cross-tenant access and uses incorrect
SQLAlchemy boolean checks; update _get_knowledge_base to accept a user_id
parameter and filter KnowledgeBase by id AND user_id, modify _get_retriever and
_get_model_kind so their non-default-namespace branches include explicit filters
for Kind.namespace, Kind.name and Kind.user_id (only fall back to public
resources after tenant-specific lookup fails), and replace all occurrences of
Kind.is_active == True with Kind.is_active.is_(True) (six spots referenced in the review).
Ensure you reference the functions _get_knowledge_base, _get_retriever, and
_get_model_kind and apply the triple-field (namespace, name, user_id) Kind
filtering everywhere.
- Around line 390-405: The _decrypt method currently swallows all exceptions
when calling decrypt_api_key, hiding decryption failures; update _decrypt to
either remove the try/except if decrypt_api_key never raises, or (preferred)
catch Exception as e and log the failure before returning the original value
(e.g., use the module logger to call logger.warning("Failed to decrypt
credential: %r", e) or similar) so operators can see AES/key/configuration
errors while still returning the original value on failure.
In `@shared/models/knowledge_runtime_protocol.py`:
- Around line 159-237: The request models (RemoteIndexRequest,
RemoteDeleteDocumentIndexRequest, RemotePurgeKnowledgeIndexRequest,
RemoteDropKnowledgeIndexRequest, RemoteListChunksRequest) accept both
knowledge_base_id and knowledge_base_reference but lack the alignment check that
RemoteQueryRequest has; add a Pydantic validator (e.g., a root_validator or
classmethod validator) to each of these classes to assert that if
knowledge_base_reference.knowledge_base_id is present it equals
knowledge_base_id, and raise a clear ValueError when they differ; alternatively
factor the check into a shared helper/validator mixin used by these classes to
avoid repeating logic.
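The shared-helper option in that prompt can be sketched with plain dataclasses (the real request models are Pydantic; the mixin name and reduced field set here are hypothetical, kept minimal to show only the alignment check):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KnowledgeBaseReference:
    """Illustrative stand-in for the shared reference model."""
    knowledge_base_id: int
    user_id: int


class KBReferenceAlignmentMixin:
    """Shared check: an attached reference's KB id must match the top-level id."""

    def validate_reference_alignment(self) -> None:
        ref = getattr(self, "knowledge_base_reference", None)
        if ref is not None and ref.knowledge_base_id != self.knowledge_base_id:
            raise ValueError(
                f"knowledge_base_reference.knowledge_base_id ({ref.knowledge_base_id}) "
                f"does not match knowledge_base_id ({self.knowledge_base_id})"
            )


@dataclass
class RemoteIndexRequest(KBReferenceAlignmentMixin):
    """One of the request models; the others would reuse the same mixin."""
    knowledge_base_id: int
    knowledge_base_reference: Optional[KnowledgeBaseReference] = None

    def __post_init__(self) -> None:
        self.validate_reference_alignment()
```

In Pydantic the same check would live in a `model_validator(mode="after")` on a shared base class, so the five request models do not repeat the comparison.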
---
Outside diff comments:
In `@backend/app/services/rag/runtime_specs.py`:
- Line 3: Add a pydantic model-level validator to ConnectionTestRuntimeSpec that
enforces exactly one complete configuration path: either the RemoteGateway
fields (both retriever_name and user_id present so you can construct a
RetrieverReference) OR the LocalGateway path (retriever_config present so
create_storage_backend_from_runtime_config() can run); if neither or a mix are
provided, raise a ValidationError with a clear message. Implement the validator
as a model_validator (or equivalent field_validator used as a model-level check)
on ConnectionTestRuntimeSpec and reference the retriever_name, user_id, and
retriever_config fields in the check so the constructor fails fast when the
configuration is incomplete.
In `@knowledge_runtime/knowledge_runtime/services/index_executor.py`:
- Around line 56-81: The code resolves KB config from
request.knowledge_base_reference via
_resolver.resolve_knowledge_base_query_config but then uses
request.knowledge_base_id for indexing/logging (knowledge_id and logger.info),
allowing a mismatch; update the logic in index executor so you validate that
request.knowledge_base_id equals the resolved kb_config.knowledge_base_id (or
else raise/return an error), and use kb_config.knowledge_base_id consistently
when building knowledge_id, constructing storage/document operations, and in
logger.info; reference resolve_knowledge_base_query_config,
knowledge_base_reference, knowledge_base_id, kb_config, knowledge_id, and
logger.info to locate and fix the checks and assignment.
---
Nitpick comments:
In `@knowledge_runtime/knowledge_runtime/db/__init__.py`:
- Around line 96-100: reset_db() currently clears the global _engine reference
without closing its connection pool, causing potential connection leaks; update
reset_db to check if _engine is not None and call its dispose() method (or
dispose(pool) equivalent) before setting _engine = None and _SessionLocal = None
so the SQLAlchemy engine is properly closed — locate the reset_db function and
add an engine.dispose() guard using the _engine symbol prior to nullifying
_engine and _SessionLocal.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2900b06c-888c-4cc8-a88f-2b3167b6c904
📒 Files selected for processing (15)
- backend/app/services/rag/remote_gateway.py
- backend/app/services/rag/runtime_specs.py
- backend/tests/services/rag/test_remote_gateway.py
- docker-compose.yml
- knowledge_runtime/.env.example
- knowledge_runtime/knowledge_runtime/config.py
- knowledge_runtime/knowledge_runtime/db/__init__.py
- knowledge_runtime/knowledge_runtime/main.py
- knowledge_runtime/knowledge_runtime/services/admin_executor.py
- knowledge_runtime/knowledge_runtime/services/index_executor.py
- knowledge_runtime/knowledge_runtime/services/query_executor.py
- knowledge_runtime/knowledge_runtime/services/resolver.py
- shared/models/__init__.py
- shared/models/knowledge_runtime_protocol.py
- shared/tests/test_knowledge_runtime_protocol.py
```python
payload = RemoteQueryRequest(
    knowledge_base_ids=spec.knowledge_base_ids,
    query=spec.query,
    max_results=spec.max_results,
    knowledge_base_references=kb_references,
    user_id=spec.user_id or 0,
    document_ids=spec.document_ids,
```
Fail fast instead of defaulting missing user_id to 0.
spec.user_id or 0 silently turns an omitted user context into the public/system user context, which can resolve the wrong references. Require user_id explicitly for remote reference-mode calls.
Suggested fix

```diff
+if spec.user_id is None:
+    raise ValueError("user_id is required for remote query reference mode")
+
 payload = RemoteQueryRequest(
     knowledge_base_ids=spec.knowledge_base_ids,
     query=spec.query,
     max_results=spec.max_results,
     knowledge_base_references=kb_references,
-    user_id=spec.user_id or 0,
+    user_id=spec.user_id,
     document_ids=spec.document_ids,
     metadata_condition=spec.metadata_condition,
     user_name=spec.user_name,
     enabled_index_families=spec.enabled_index_families,
     retrieval_policy=spec.retrieval_policy,
@@
+if spec.user_id is None:
+    raise ValueError("user_id is required for remote test_connection reference mode")
+
 # Build Retriever reference
 retriever_reference = RetrieverReference(
     name=spec.retriever_name,
     namespace=spec.retriever_namespace,
-    user_id=spec.user_id or 0,
+    user_id=spec.user_id,
 )
```

Also applies to: 367-375
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/rag/remote_gateway.py` around lines 208 - 214, The
payload construction in RemoteQueryRequest currently defaults missing
spec.user_id to 0 (user_id=spec.user_id or 0), which can silently use a
system/public user; update the code in remote_gateway.py where
RemoteQueryRequest is built (and the analogous block around the second
occurrence) to require an explicit user_id: validate that spec.user_id is
present and raise an informative exception (or return an error) when it is
missing instead of using 0, and then pass spec.user_id directly into
RemoteQueryRequest.user_id; ensure the validation runs only for remote
reference-mode calls if that context is available.
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
```
🧩 Analysis chain
🌐 Web query:
SQLAlchemy create_engine lazy connection first connect behavior pool_pre_ping
💡 Result:
SQLAlchemy's create_engine exhibits lazy connection behavior: the Engine and its Pool do not establish actual DBAPI connections until the first connection request via Engine.connect, Engine.execute, or similar methods. The Pool starts empty and creates connections on demand. With pool_pre_ping=True, when a connection (new or reused) is checked out from the pool:
- A dialect-specific "ping" (e.g., SELECT 1) is emitted to test liveness.
- For the very first connection, this ping occurs immediately after the DBAPI connect succeeds but before application use.
- If the ping fails (e.g., DB unavailable), the connect error propagates normally.
- If successful, the connection is delivered; failed pings on subsequent checkouts recycle the connection and invalidate older pool connections.

Example:

```python
engine = create_engine("postgresql://...", pool_pre_ping=True)  # No connections yet
with engine.connect() as conn:  # Triggers first DBAPI connect + ping
    result = conn.execute(text("SELECT 1"))
```

This pessimistic approach proactively detects stale/disconnected connections upon checkout and is recommended for production to handle DB restarts and network issues gracefully. Note: it does not protect against mid-transaction disconnects; those transactions must be retried. Official docs confirm pools wait until first use (no pre-creation), and pre_ping runs on every checkout, including the initial one.
Citations:
- 1: http://docs.sqlalchemy.org/en/latest/core/pooling.html
- 2: https://www.sqlalchemy.org/docs/21/core/pooling.html
- 3: http://docs.sqlalchemy.org/en/latest/core/engines.html
- 4: https://docs.sqlalchemy.org/core/pooling.html
- 5: https://sqlalchemy.org/docs/faq/connections.html
- 6: https://docs.sqlalchemy.org/en/14/tutorial/engine.html
- 7: https://www.sqlalchemy.org/docs/21/core/connections.html
- 8: https://sqlalchemy-sqlalchemy.mintlify.app/concepts/engine
- 9: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/engine/create.py
- 10: Can/should sqlalchemy keep pooled connections alive by pinging? sqlalchemy/sqlalchemy#8943
- 11: https://oboe.com/learn/sqlalchemy-engine-architecture-and-connectivity-1i8a8tm/connection-pool-management-1
- 12: https://sqlalchemy-sqlalchemy.mintlify.app/core/pooling
🏁 Script executed:
```shell
cat -n knowledge_runtime/knowledge_runtime/db/__init__.py | head -60
```
Repository: wecode-ai/Wegent
Length of output: 2064
🏁 Script executed:
```shell
cat -n knowledge_runtime/knowledge_runtime/db/__init__.py | tail -60
```
Repository: wecode-ai/Wegent
Length of output: 1871
Add connection verification at startup in init_db().
Even though pool_pre_ping=True is configured, it only tests connectivity on the first connection checkout. The log message "Database connection initialized successfully" at line 51 is misleading because the database connection is not yet validated. Bad credentials, missing drivers, or an unavailable database service will not fail until the first resolver query. Execute a test query immediately after engine creation to catch these issues at startup.
Additionally, reset_db() should dispose the engine before dropping the reference to avoid connection pool leaks.
Proposed fix

```diff
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, text
@@
     logger.info("Initializing database connection for Knowledge Runtime")
-    _engine = create_engine(
+    engine = create_engine(
         settings.DATABASE_URL,
         pool_pre_ping=True,
         pool_recycle=3600,
         echo=False,
     )
+    with engine.connect() as connection:
+        connection.execute(text("SELECT 1"))
+
+    _engine = engine
     _SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=_engine)
     logger.info("Database connection initialized successfully")
@@
 def reset_db() -> None:
     """Reset database connection (useful for testing)."""
     global _engine, _SessionLocal
+    if _engine is not None:
+        _engine.dispose()
     _engine = None
     _SessionLocal = None
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@knowledge_runtime/knowledge_runtime/db/__init__.py` around lines 17 - 18,
Update init_db() to validate the DB connection immediately after creating the
SQLAlchemy engine: after create_engine(...) (the engine variable created in
init_db) open a connection or run a lightweight test query (e.g., "SELECT 1") so
failures (bad creds, missing driver, unreachable service) surface at startup and
the "Database connection initialized successfully" log is only emitted after the
test passes; also ensure SessionLocal/sessionmaker is still configured after the
check. In reset_db(), call engine.dispose() on the existing engine instance
before dropping or nulling the engine reference to prevent connection pool
leaks.
```python
# Initialize database connection for reference mode
if settings.DATABASE_URL:
    try:
        init_db()
        logger.info("Database connection initialized for reference mode")
    except Exception as e:
        logger.warning(f"Failed to initialize database connection: {e}")
else:
    logger.info("DATABASE_URL not configured, reference mode will not be available")
```
Fail startup when configured reference-mode DB initialization fails.
With DATABASE_URL set, reference mode is expected to work. Swallowing init failures lets the service start healthy but fail later on every reference-mode request.
Proposed fix

```diff
 # Initialize database connection for reference mode
 if settings.DATABASE_URL:
-    try:
-        init_db()
-        logger.info("Database connection initialized for reference mode")
-    except Exception as e:
-        logger.warning(f"Failed to initialize database connection: {e}")
+    init_db()
+    logger.info("Database connection initialized for reference mode")
 else:
     logger.info("DATABASE_URL not configured, reference mode will not be available")
```
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 42-42: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@knowledge_runtime/knowledge_runtime/main.py` around lines 37 - 45, The code
currently swallows init_db() failures when settings.DATABASE_URL is set (using
logger.warning), allowing the service to start but crash later; update the
reference-mode DB init block so that when settings.DATABASE_URL is present and
init_db() raises, you log the error (use logger.error with the exception
details) and fail fast by re-raising the exception or terminating startup (e.g.,
raise the exception or call sys.exit(1)); adjust the try/except around init_db()
in main.py to ensure the process does not continue in a misconfigured state.
```python
# Resolve KB references to full configurations
kb_configs = self._resolve_references(
    request.knowledge_base_references,
    request.user_name,
)
```
Use the request-level user id when resolving KB references.
RemoteQueryRequest.user_id is documented as the permission context, but _resolve_references() ignores it and trusts each reference’s user_id. Reject mismatches before resolving.
Proposed fix

```diff
 from shared.models import (
+    KnowledgeBaseReference,
     RemoteKnowledgeBaseQueryConfig,
     RemoteQueryRecord,
     RemoteQueryRequest,
     RemoteQueryResponse,
 )
@@
     kb_configs = self._resolve_references(
         request.knowledge_base_references,
+        request.user_id,
         request.user_name,
     )
@@
     def _resolve_references(
         self,
-        references: list[Any],
+        references: list[KnowledgeBaseReference],
+        request_user_id: int,
         user_name: str | None,
     ) -> list[RemoteKnowledgeBaseQueryConfig]:
@@
         configs: list[RemoteKnowledgeBaseQueryConfig] = []
         for ref in references:
+            if ref.user_id != request_user_id:
+                raise ValueError("KnowledgeBaseReference user_id must match request user_id")
             config = self._resolver.resolve_knowledge_base_query_config(
                 knowledge_base_id=ref.knowledge_base_id,
-                user_id=ref.user_id,
+                user_id=request_user_id,
                 user_name=user_name,
             )
             configs.append(config)
```

Also applies to: 86-108
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@knowledge_runtime/knowledge_runtime/services/query_executor.py` around lines
50 - 54, The call to _resolve_references currently uses request.user_name and
trusts each KnowledgeBaseReference.user_id; instead pass and use the
request-level permission context (RemoteQueryRequest.user_id) and reject any
KnowledgeBaseReference whose user_id does not match request.user_id before
resolving. Update the calls that pass request.user_name (both the block with
knowledge_base_references and the other occurrence) to pass request.user_id, and
modify _resolve_references to accept a user_id parameter, validate
reference.user_id == user_id (raise/return an explicit permission/error) and
only then resolve the reference to a full KB config.
```python
with get_db_session() as db:
    # Query KnowledgeBase CRD
    kb = self._get_knowledge_base(db, knowledge_base_id)
    if kb is None:
        raise ValueError(f"KnowledgeBase {knowledge_base_id} not found")
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
rg -n -C2 'Kind\.is_active\s*==\s*True' knowledge_runtime/knowledge_runtime/services/resolver.py
```
Repository: wecode-ai/Wegent
Length of output: 1377
🏁 Script executed:
```shell
#!/bin/bash
# Get the file size first
wc -l knowledge_runtime/knowledge_runtime/services/resolver.py
# Read the problematic sections
echo "=== Lines 50-70 (context around 58-62) ==="
sed -n '50,70p' knowledge_runtime/knowledge_runtime/services/resolver.py
echo -e "\n=== Lines 260-290 ==="
sed -n '260,290p' knowledge_runtime/knowledge_runtime/services/resolver.py
echo -e "\n=== Lines 300-350 ==="
sed -n '300,350p' knowledge_runtime/knowledge_runtime/services/resolver.py
echo -e "\n=== Lines 360-390 ==="
sed -n '360,390p' knowledge_runtime/knowledge_runtime/services/resolver.py
```
Repository: wecode-ai/Wegent
Length of output: 4451
Scope all Kind lookups by tenant to prevent cross-tenant resource access.
The resolver still lacks user_id filtering in critical lookups:
- `_get_knowledge_base()` (lines 260-282) does not accept or filter by `user_id`, allowing any tenant to access any KnowledgeBase by ID
- `_get_retriever()` non-default namespace branch (lines 321-330) omits `user_id` entirely
- `_get_model_kind()` non-default namespace branch (lines 378-387) omits `user_id` entirely
Add a user_id parameter to _get_knowledge_base() and filter by it alongside the ID. For Retriever and Model lookups in non-default namespaces, add explicit user_id filters before falling back to public resources. Also replace all `Kind.is_active == True` with `Kind.is_active.is_(True)` per SQLAlchemy best practices (6 occurrences at lines 279, 314, 328, 341, 373, 385).
Per coding guidelines, Kind resources must be filtered by all three identifying fields: namespace, name, and user_id.
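The triple-field scoping rule can be illustrated without SQLAlchemy; rows here are plain dicts standing in for Kind records, and all names and values are hypothetical:

```python
from typing import Optional

# Illustrative in-memory stand-in for the kinds table.
KINDS = [
    {"namespace": "default", "name": "es-retriever", "user_id": 0, "is_active": True},
    {"namespace": "team-a", "name": "es-retriever", "user_id": 42, "is_active": True},
    {"namespace": "team-a", "name": "es-retriever", "user_id": 99, "is_active": True},
]


def get_retriever(namespace: str, name: str, user_id: int) -> Optional[dict]:
    """Tenant-scoped lookup: filter by namespace, name, AND user_id,
    falling back to public (default-namespace) resources only afterwards."""
    for row in KINDS:
        if (
            row["namespace"] == namespace
            and row["name"] == name
            and row["user_id"] == user_id
            and row["is_active"] is True
        ):
            return row
    # Fallback to public resources only after the tenant-specific lookup fails.
    for row in KINDS:
        if row["namespace"] == "default" and row["name"] == name and row["is_active"] is True:
            return row
    return None
```

The equivalent SQLAlchemy filter would put all three conditions in one `filter(...)` call, so a row belonging to another tenant can never be selected by name alone.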
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@knowledge_runtime/knowledge_runtime/services/resolver.py` around lines 58 -
62, The resolver allows cross-tenant access and uses incorrect SQLAlchemy
boolean checks; update _get_knowledge_base to accept a user_id parameter and
filter KnowledgeBase by id AND user_id, modify _get_retriever and
_get_model_kind so their non-default-namespace branches include explicit filters
for Kind.namespace, Kind.name and Kind.user_id (only fall back to public
resources after tenant-specific lookup fails), and replace all occurrences of
Kind.is_active == True with Kind.is_active.is_(True) (six spots referenced in the review).
Ensure you reference the functions _get_knowledge_base, _get_retriever, and
_get_model_kind and apply the triple-field (namespace, name, user_id) Kind
filtering everywhere.
```python
def _decrypt(self, value: Any) -> Any:
    """Decrypt an encrypted value if applicable.

    Args:
        value: The value to decrypt.

    Returns:
        Decrypted value or original if not encrypted.
    """
    if not value:
        return value
    try:
        return decrypt_api_key(value)
    except Exception:
        # Not encrypted or decryption failed, return original
        return value
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Inspect decrypt_api_key's exception contract so _decrypt can catch only expected failures.
rg -n -C5 'def decrypt_api_key|class .*Decrypt|InvalidToken|ValueError' shared
```
Repository: wecode-ai/Wegent
Length of output: 17790
🏁 Script executed:
```shell
#!/bin/bash
# Get full implementation of decrypt_api_key function
cat -n shared/utils/crypto.py | sed -n '248,280p'
```
Repository: wecode-ai/Wegent
Length of output: 1086
🏁 Script executed:
```shell
#!/bin/bash
# Also check the full resolver.py _decrypt implementation
cat -n knowledge_runtime/knowledge_runtime/services/resolver.py | sed -n '385,410p'
```
Repository: wecode-ai/Wegent
Length of output: 918
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are custom exception classes in crypto.py
rg -n "class.*Exception|class.*Error" shared/utils/crypto.py
```
Repository: wecode-ai/Wegent
Length of output: 42
🏁 Script executed:
```shell
#!/bin/bash
# Look for how decrypt_api_key is used elsewhere in the codebase
rg -B2 -A3 "decrypt_api_key" --type py | head -100
```
Repository: wecode-ai/Wegent
Length of output: 6284
🏁 Script executed:
```shell
#!/bin/bash
# Check what decrypt_sensitive_data does and what exceptions it raises
cat -n shared/utils/crypto.py | sed -n '1,247p' | grep -A 30 "def decrypt_sensitive_data"
```
Repository: wecode-ai/Wegent
Length of output: 1134
🏁 Script executed:
```shell
#!/bin/bash
# Look for the full decrypt_sensitive_data implementation
rg -B 2 -A 20 "def decrypt_sensitive_data" shared/utils/crypto.py
```
Repository: wecode-ai/Wegent
Length of output: 589
🏁 Script executed:
```shell
#!/bin/bash
# Check backend implementation to see how they handle decrypt_api_key exceptions
cat -n backend/app/services/kind_impl.py | sed -n '230,250p'
```
Repository: wecode-ai/Wegent
Length of output: 874
🏁 Script executed:
```shell
#!/bin/bash
# Get full decrypt_sensitive_data implementation including exception handling
cat -n shared/utils/crypto.py | sed -n '109,165p'
```
Repository: wecode-ai/Wegent
Length of output: 2125
🏁 Script executed:
```shell
#!/bin/bash
# Check backend/app/services/kind_impl.py to see how decrypt_api_key exceptions are handled there
rg -B 5 -A 10 "decrypt_api_key" backend/app/services/kind_impl.py | head -80
```
Repository: wecode-ai/Wegent
Length of output: 2124
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are any tests for decrypt_api_key that show what exceptions it can raise
cat -n shared/tests/utils/test_crypto.py | grep -A 30 "test.*decrypt"
```
Repository: wecode-ai/Wegent
Length of output: 8421
🏁 Script executed:
```shell
#!/bin/bash
# Check if _get_encryption_key can raise exceptions
rg -B 5 -A 15 "def _get_encryption_key" shared/utils/crypto.py
```
Repository: wecode-ai/Wegent
Length of output: 801
🏁 Script executed:
```shell
#!/bin/bash
# Let's see if there are any other callers of decrypt_api_key to understand expected behavior
rg -B 3 -A 3 "decrypt_api_key\(" --type py | grep -E "(except|raise|try)" -A 2 -B 2
```
Repository: wecode-ai/Wegent
Length of output: 2787
🏁 Script executed:
```shell
#!/bin/bash
# Check if decrypt_sensitive_data actually gets called and what happens in resolver
cat -n knowledge_runtime/knowledge_runtime/services/resolver.py | sed -n '1,30p'
```
Repository: wecode-ai/Wegent
Length of output: 993
Add logging for decryption failures instead of silently swallowing exceptions.
The _decrypt method catches all exceptions without logging, hiding AES/key/configuration failures. Since decrypt_api_key already handles decryption errors internally, either remove the try-except (the exception should not be raised), or add explicit logging similar to the pattern in backend/app/services/kind_impl.py:

```python
except Exception as e:
    logger.warning("Failed to decrypt credential: %r", e)
    return value
```

This ensures operators have visibility into decryption failures for debugging and monitoring.
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 403-403: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@knowledge_runtime/knowledge_runtime/services/resolver.py` around lines 390 -
405, The _decrypt method currently swallows all exceptions when calling
decrypt_api_key, hiding decryption failures; update _decrypt to either remove
the try/except if decrypt_api_key never raises, or (preferred) catch Exception
as e and log the failure before returning the original value (e.g., use the
module logger to call logger.warning("Failed to decrypt credential: %r", e) or
similar) so operators can see AES/key/configuration errors while still returning
the original value on failure.
```diff
 class RemoteIndexRequest(KnowledgeRuntimeProtocolModel):
-    """Index request sent from Backend to knowledge_runtime."""
+    """Index request sent from Backend to knowledge_runtime.
+
+    Uses reference mode: only passes KB reference, Runtime resolves full config.
+    """

     knowledge_base_id: int
     document_id: int | None = None
     index_owner_user_id: int
-    retriever_config: RuntimeRetrieverConfig
-    embedding_model_config: RuntimeEmbeddingModelConfig
-    content_ref: ContentRef
+
+    # Reference mode - Runtime resolves full config from database
+    knowledge_base_reference: KnowledgeBaseReference

     # Optional overrides
     splitter_config: NormalizedSplitterConfig = Field(
         default_factory=build_runtime_default_splitter_config
     )
     source_file: str | None = None
     file_extension: str | None = None
     index_families: list[str] = Field(default_factory=lambda: ["chunk_vector"])
+    content_ref: ContentRef
     trace_context: dict[str, Any] | None = None
     user_name: str | None = None
     extensions: dict[str, Any] | None = None


 class RemoteDeleteDocumentIndexRequest(KnowledgeRuntimeProtocolModel):
-    """Delete-document-index request sent from Backend to knowledge_runtime."""
+    """Delete-document-index request sent from Backend to knowledge_runtime.
+
+    Uses reference mode: only passes KB reference, Runtime resolves full config.
+    """

     knowledge_base_id: int
     document_ref: str
     index_owner_user_id: int | None = None
-    retriever_config: RuntimeRetrieverConfig
+
+    # Reference mode - Runtime resolves full config from database
+    knowledge_base_reference: KnowledgeBaseReference

     enabled_index_families: list[str] = Field(default_factory=lambda: ["chunk_vector"])
     extensions: dict[str, Any] | None = None


 class RemotePurgeKnowledgeIndexRequest(KnowledgeRuntimeProtocolModel):
-    """Delete-all-chunks request sent from Backend to knowledge_runtime."""
+    """Delete-all-chunks request sent from Backend to knowledge_runtime.
+
+    Uses reference mode: only passes KB reference, Runtime resolves full config.
+    """

     knowledge_base_id: int
     index_owner_user_id: int
-    retriever_config: RuntimeRetrieverConfig
+
+    # Reference mode - Runtime resolves full config from database
+    knowledge_base_reference: KnowledgeBaseReference

     extensions: dict[str, Any] | None = None


 class RemoteDropKnowledgeIndexRequest(KnowledgeRuntimeProtocolModel):
-    """Drop-physical-index request sent from Backend to knowledge_runtime."""
+    """Drop-physical-index request sent from Backend to knowledge_runtime.
+
+    Uses reference mode: only passes KB reference, Runtime resolves full config.
+    """

     knowledge_base_id: int
     index_owner_user_id: int
-    retriever_config: RuntimeRetrieverConfig
+
+    # Reference mode - Runtime resolves full config from database
+    knowledge_base_reference: KnowledgeBaseReference

     extensions: dict[str, Any] | None = None


 class RemoteListChunksRequest(KnowledgeRuntimeProtocolModel):
-    """List-chunks request sent from Backend to knowledge_runtime."""
+    """List-chunks request sent from Backend to knowledge_runtime.
+
+    Uses reference mode: only passes KB reference, Runtime resolves full config.
+    """

     knowledge_base_id: int
     index_owner_user_id: int
-    retriever_config: RuntimeRetrieverConfig
     max_chunks: int = Field(default=10000, gt=0, le=10000)
     query: str | None = None
     metadata_condition: dict[str, Any] | None = None
+
+    # Reference mode - Runtime resolves full config from database
+    knowledge_base_reference: KnowledgeBaseReference
```
Validate single-KB references match the request KB ID.
These models accept both knowledge_base_id and knowledge_base_reference.knowledge_base_id, but only RemoteQueryRequest checks alignment. A mismatched admin/index payload can resolve config for one KB while operating on another KB ID.
Suggested fix
```diff
 class RemoteIndexRequest(KnowledgeRuntimeProtocolModel):
@@
     extensions: dict[str, Any] | None = None
+
+    @model_validator(mode="after")
+    def validate_knowledge_base_reference(self) -> "RemoteIndexRequest":
+        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
+            raise ValueError("knowledge_base_reference must match knowledge_base_id")
+        return self
@@
 class RemoteDeleteDocumentIndexRequest(KnowledgeRuntimeProtocolModel):
@@
     enabled_index_families: list[str] = Field(default_factory=lambda: ["chunk_vector"])
     extensions: dict[str, Any] | None = None
+
+    @model_validator(mode="after")
+    def validate_knowledge_base_reference(self) -> "RemoteDeleteDocumentIndexRequest":
+        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
+            raise ValueError("knowledge_base_reference must match knowledge_base_id")
+        return self
@@
 class RemotePurgeKnowledgeIndexRequest(KnowledgeRuntimeProtocolModel):
@@
     extensions: dict[str, Any] | None = None
+
+    @model_validator(mode="after")
+    def validate_knowledge_base_reference(self) -> "RemotePurgeKnowledgeIndexRequest":
+        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
+            raise ValueError("knowledge_base_reference must match knowledge_base_id")
+        return self
@@
 class RemoteDropKnowledgeIndexRequest(KnowledgeRuntimeProtocolModel):
@@
     extensions: dict[str, Any] | None = None
+
+    @model_validator(mode="after")
+    def validate_knowledge_base_reference(self) -> "RemoteDropKnowledgeIndexRequest":
+        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
+            raise ValueError("knowledge_base_reference must match knowledge_base_id")
+        return self
@@
 class RemoteListChunksRequest(KnowledgeRuntimeProtocolModel):
@@
     extensions: dict[str, Any] | None = None
+
+    @model_validator(mode="after")
+    def validate_knowledge_base_reference(self) -> "RemoteListChunksRequest":
+        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
+            raise ValueError("knowledge_base_reference must match knowledge_base_id")
+        return self
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@shared/models/knowledge_runtime_protocol.py` around lines 159 - 237, The
request models (RemoteIndexRequest, RemoteDeleteDocumentIndexRequest,
RemotePurgeKnowledgeIndexRequest, RemoteDropKnowledgeIndexRequest,
RemoteListChunksRequest) accept both knowledge_base_id and
knowledge_base_reference but lack the alignment check that RemoteQueryRequest
has; add a Pydantic validator (e.g., a root_validator or classmethod validator)
to each of these classes to assert that if
knowledge_base_reference.knowledge_base_id is present it equals
knowledge_base_id, and raise a clear ValueError when they differ; alternatively
factor the check into a shared helper/validator mixin used by these classes to
avoid repeating logic.
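The shape of the check, sketched with plain dataclasses instead of Pydantic (the real fix would use `@model_validator(mode="after")` as in the suggested diff; the classes here are illustrative stand-ins that mirror the protocol model names):

```python
from dataclasses import dataclass


@dataclass
class KnowledgeBaseReference:
    knowledge_base_id: int
    namespace: str = "default"
    name: str = ""


@dataclass
class RemoteIndexRequest:
    knowledge_base_id: int
    knowledge_base_reference: KnowledgeBaseReference

    def __post_init__(self) -> None:
        # Same alignment guard the review asks Pydantic to enforce:
        # the reference must resolve config for the KB being operated on.
        if self.knowledge_base_reference.knowledge_base_id != self.knowledge_base_id:
            raise ValueError("knowledge_base_reference must match knowledge_base_id")


ok = RemoteIndexRequest(1, KnowledgeBaseReference(1))
try:
    RemoteIndexRequest(1, KnowledgeBaseReference(2))
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```

A shared mixin holding this validator would keep the five request models from repeating the same three lines.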
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
knowledge_runtime/tests/test_index_executor.py (2)
184-199: ⚠️ Potential issue | 🟠 Major

Patch the resolver before exercising the fetch-error path.

In reference mode, IndexExecutor.execute() resolves knowledge_base_reference before reaching content fetch, so this test can hit the real resolver/DB path and fail before the mocked ContentFetchError is raised.

🧪 Proposed test fix

```diff
-    async def test_execute_content_fetch_error_propagates(self, index_request) -> None:
+    async def test_execute_content_fetch_error_propagates(
+        self, index_request, resolved_kb_config
+    ) -> None:
         """Test that content fetch errors propagate correctly."""
         from knowledge_runtime.services.content_fetcher import ContentFetchError

+        executor = IndexExecutor()
+
         with (
+            patch.object(
+                executor._resolver,
+                "resolve_knowledge_base_query_config",
+                return_value=resolved_kb_config,
+            ),
             patch("httpx.AsyncClient") as mock_client,
             patch("knowledge_runtime.config._settings", None),
         ):
             mock_client.return_value.__aenter__.return_value.get = AsyncMock(
                 side_effect=ContentFetchError("Fetch failed", retryable=True)
             )
-            executor = IndexExecutor()
-
             with pytest.raises(ContentFetchError) as exc_info:
                 await executor.execute(index_request)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@knowledge_runtime/tests/test_index_executor.py` around lines 184 - 199, The test fails to mock the reference resolution step so IndexExecutor.execute() may call the real resolver/DB before hitting the mocked content fetch error; patch the resolver that IndexExecutor uses to resolve knowledge_base_reference (mock the resolve function or the resolver method the executor calls) to return a resolved reference (a dummy resolved object or None as appropriate) before calling await executor.execute(index_request), then keep the existing mock for httpx.AsyncClient that raises ContentFetchError so the test exercises the fetch-error path; reference IndexExecutor.execute and the knowledge_base_reference resolution step when applying the mock.
93-122: 🛠️ Refactor suggestion | 🟠 Major

Create the executor instance before patching its resolver.

These tests patch IndexExecutor()._resolver on a temporary instance, then execute a different IndexExecutor. This relies on the resolver being a shared mutable object and is fragile. Create the executor first and patch the instance that will actually be tested.

♻️ Proposed refactor

```diff
+        executor = IndexExecutor()
+
         with (
             patch.object(
-                IndexExecutor()._resolver,
+                executor._resolver,
                 "resolve_knowledge_base_query_config",
                 return_value=resolved_kb_config,
             ),
@@
-            executor = IndexExecutor()
             result = await executor.execute(index_request)
```

Apply the same pattern in test_execute_storage_error_propagates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@knowledge_runtime/tests/test_index_executor.py` around lines 93 - 122, The test currently patches IndexExecutor()._resolver on a throwaway instance and then constructs a different IndexExecutor for execution; instantiate the executor first (executor = IndexExecutor()) and patch the resolver on that real instance using patch.object(executor._resolver, "resolve_knowledge_base_query_config", return_value=resolved_kb_config) (and likewise update the second test test_execute_storage_error_propagates), so the patched resolver is the one used by executor.execute; keep the other patches (create_storage_backend_from_runtime_config, create_embedding_model_from_runtime_config, DocumentService, httpx.AsyncClient, config._settings) as-is but apply them in the same with(...) context that patches executor._resolver instead of IndexExecutor()._resolver.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@knowledge_runtime/tests/test_index_executor.py`:
- Around line 184-199: The test fails to mock the reference resolution step so
IndexExecutor.execute() may call the real resolver/DB before hitting the mocked
content fetch error; patch the resolver that IndexExecutor uses to resolve
knowledge_base_reference (mock the resolve function or the resolver method the
executor calls) to return a resolved reference (a dummy resolved object or None
as appropriate) before calling await executor.execute(index_request), then keep
the existing mock for httpx.AsyncClient that raises ContentFetchError so the
test exercises the fetch-error path; reference IndexExecutor.execute and the
knowledge_base_reference resolution step when applying the mock.
- Around line 93-122: The test currently patches IndexExecutor()._resolver on a
throwaway instance and then constructs a different IndexExecutor for execution;
instantiate the executor first (executor = IndexExecutor()) and patch the
resolver on that real instance using patch.object(executor._resolver,
"resolve_knowledge_base_query_config", return_value=resolved_kb_config) (and
likewise update the second test test_execute_storage_error_propagates), so the
patched resolver is the one used by executor.execute; keep the other patches
(create_storage_backend_from_runtime_config,
create_embedding_model_from_runtime_config, DocumentService, httpx.AsyncClient,
config._settings) as-is but apply them in the same with(...) context that
patches executor._resolver instead of IndexExecutor()._resolver.
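Why patching a throwaway instance is fragile: `patch.object` replaces the attribute on the specific object passed in, so it only affects a different instance if that attribute happens to be shared. A small sketch with a hypothetical `Executor` class (not the project's code):

```python
from unittest.mock import patch


class Resolver:
    def resolve(self) -> str:
        return "real"


class Executor:
    def __init__(self) -> None:
        self._resolver = Resolver()  # fresh per instance, not shared

    def run(self) -> str:
        return self._resolver.resolve()


# Fragile: patching a throwaway instance leaves a new instance unpatched.
with patch.object(Executor()._resolver, "resolve", return_value="mocked"):
    other = Executor()
    fragile_result = other.run()  # still hits the real resolver

# Robust: create the executor first, then patch the instance under test.
executor = Executor()
with patch.object(executor._resolver, "resolve", return_value="mocked"):
    robust_result = executor.run()
```

If each `IndexExecutor` builds its own resolver, the throwaway-instance pattern only works by accident; patching the actual instance removes that coupling.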
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ead4a521-f400-4ae3-bfe5-2561ec08cc47
📒 Files selected for processing (3)
knowledge_runtime/tests/test_admin_executor.py
knowledge_runtime/tests/test_index_executor.py
knowledge_runtime/tests/test_query_executor.py
Update the Knowledge Runtime test files to use reference mode:
- test_admin_executor.py: use KnowledgeBaseReference and RetrieverReference
- test_index_executor.py: use KnowledgeBaseReference
- test_query_executor.py: use KnowledgeBaseReference instead of knowledge_base_configs
All tests now mock the resolver's resolve methods to verify the reference-resolution flow.
Force-pushed from 253b62a to 3e18ac9
Summary by CodeRabbit
New Features
Configuration Updates
Tests