Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,93 @@ WIKI_MAX_CONTENT_SIZE=10485760
# Base URL for internal wiki content writer
WIKI_CONTENT_WRITE_BASE_URL=http://backend:8000



# ==========================================
# Knowledge Document Conversion Configuration
# 知识库文档转换配置(PDF/PPTX 等转换为 Markdown)
# ==========================================

# Master switch for document conversion feature
# 总开关:true 开启转换功能,false 关闭(走原来直接索引的逻辑)
KNOWLEDGE_CONVERSION_ENABLED=false

# Comma-separated list of file extensions that need conversion to markdown
# 需要转换为 Markdown 的文件扩展名列表(逗号分隔),如:pdf,pptx,docx
# 仅在 KNOWLEDGE_CONVERSION_ENABLED=true 时生效
KNOWLEDGE_CONVERSION_FILE_TYPES=pdf

# Celery queue name for document conversion tasks
# 文档转换任务的 Celery 队列名称
KNOWLEDGE_CONVERSION_QUEUE=knowledge_conversion

# Stale detection timeout for CONVERTING status (seconds)
# 转换状态过期时间(秒),超过此时间视为卡住,可被重新入队
KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS=10800

# Conversion task distributed lock configuration
# 转换任务分布式锁配置
KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS=10000
KNOWLEDGE_CONVERSION_LOCK_EXTEND_INTERVAL_SECONDS=60
KNOWLEDGE_CONVERSION_LOCK_MAX_RETRIES=2
KNOWLEDGE_CONVERSION_LOCK_RETRY_DELAY_SECONDS=30
Comment on lines +233 to +242
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The converting stale window is shorter than the allowed task runtime.

KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS=1800 marks a conversion as stuck after 30 minutes, but the conversion task is allowed to run up to 9000s soft / 10000s hard. That means a healthy long-running conversion can be taken over as “stale” long before the worker actually times out, which undermines the new generation/state-machine logic.

🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 240-240: [UnorderedKey] The KNOWLEDGE_CONVERSION_LOCK_EXTEND_INTERVAL_SECONDS key should go before the KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS key

(UnorderedKey)


[warning] 241-241: [UnorderedKey] The KNOWLEDGE_CONVERSION_LOCK_MAX_RETRIES key should go before the KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS key

(UnorderedKey)


[warning] 242-242: [UnorderedKey] The KNOWLEDGE_CONVERSION_LOCK_RETRY_DELAY_SECONDS key should go before the KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/.env.example` around lines 233 - 242,
KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS (currently 1800) is shorter than the
allowed conversion runtime and will mark healthy long-running conversions as
stuck; change its value to be at least the hard/soft task timeout (e.g., >=
KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS / 10000) or derive it from that
constant (set to 10000 or slightly higher) so the stale window never expires
before the worker's permitted runtime.


# ==========================================
# MinerU API Configuration
# PDF 转 Markdown 的 MinerU API 配置
# ==========================================

# Base URL for MinerU API service
# MinerU API 服务地址,为空则禁用 PDF 转换
MINERU_API_BASE_URL=

# MinerU backend type: "pipeline" or other supported backends
MINERU_BACKEND=pipeline

# MinerU parse method: "ocr", "auto", etc.
MINERU_PARSE_METHOD=ocr

# Language list for OCR (comma-separated, e.g., "ch,en")
MINERU_LANG_LIST=ch

# Enable formula recognition
MINERU_FORMULA_ENABLE=true

# Enable table recognition
MINERU_TABLE_ENABLE=true

# Polling interval for task status checks (seconds)
MINERU_POLL_INTERVAL_SECONDS=3

# Maximum time to wait for MinerU task completion (seconds)
MINERU_MAX_WAIT_SECONDS=600

# ==========================================
# Document Conversion S3 Storage Configuration
# MinerU 提取图片的 S3 存储配置
# ==========================================

# Enable S3 upload for extracted images
# 是否启用 S3 图片上传
WORKER_CONVERSION_S3_ENABLED=false

# S3 endpoint URL (e.g., MinIO or AWS S3)
# S3 服务端点地址
WORKER_CONVERSION_S3_ENDPOINT=

# S3 access key
WORKER_CONVERSION_S3_ACCESS_KEY=

# S3 secret key
WORKER_CONVERSION_S3_SECRET_KEY=

# S3 bucket name for storing images
# S3 存储桶名称
WORKER_CONVERSION_S3_BUCKET_NAME=

# S3 region name
WORKER_CONVERSION_S3_REGION_NAME=us-east-1

# Data Table Configuration
# JSON string containing table provider credentials (DingTalk, etc.)
# Format: {"dingtalk":{"appKey":"YOUR_APP_KEY","appSecret":"YOUR_APP_SECRET","operatorId":"YOUR_OPERATOR_ID","userMapping":{"baseId":"YOUR_BASE_ID","sheetId":"YOUR_SHEET_ID"}}}
Expand Down
9 changes: 9 additions & 0 deletions backend/app/core/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
include=[
"app.tasks.subscription_tasks",
"app.tasks.knowledge_tasks",
"app.tasks.conversion_tasks",
],
)

Expand All @@ -67,6 +68,14 @@
task_default_retry_delay=60, # 1 minute default retry delay
# Default queue configuration
task_default_queue=settings.CELERY_TASK_DEFAULT_QUEUE,
# Task routing: conversion tasks go to dedicated queue
# Main worker does NOT consume this queue
# Conversion worker: celery -A app.core.celery_app worker --queues=knowledge_conversion
task_routes={
"app.tasks.conversion_tasks.*": {
"queue": settings.KNOWLEDGE_CONVERSION_QUEUE,
},
},
# Beat schedule for periodic tasks
beat_schedule={
"check-due-subscriptions": {
Expand Down
72 changes: 72 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,58 @@ def parse_rag_runtime_mode(cls, v: Any) -> str | dict[str, str]:
KNOWLEDGE_INDEX_STALE_QUEUED_SECONDS: int = 600
KNOWLEDGE_INDEX_STALE_INDEXING_SECONDS: int = 2700

# Knowledge document conversion configuration
# Master switch for document conversion feature
# When False, conversion is disabled and files are indexed directly (original behavior)
# When True, files matching KNOWLEDGE_CONVERSION_FILE_TYPES will be converted to markdown
KNOWLEDGE_CONVERSION_ENABLED: bool = False

# Comma-separated list of file extensions that need conversion to markdown
# before indexing. Example: "pdf,pptx,docx"
# Only used when KNOWLEDGE_CONVERSION_ENABLED is True.
KNOWLEDGE_CONVERSION_FILE_TYPES: str = ""

# Celery queue name for document conversion tasks
KNOWLEDGE_CONVERSION_QUEUE: str = "knowledge_conversion"

# Stale detection timeout for CONVERTING status (seconds).
# Must be at least the conversion task's hard time limit (10000s per the
# review note) so a healthy long-running conversion is never reclaimed as
# "stale" while its worker is still within its permitted runtime.
KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS: int = 10800

# Conversion task distributed lock configuration
# Lock timeout must be longer than the task soft_time_limit (9000s) to
# prevent premature expiration; 10000 also matches backend/.env.example.
KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS: int = 10000
# How often (seconds) a running conversion worker extends its lock.
KNOWLEDGE_CONVERSION_LOCK_EXTEND_INTERVAL_SECONDS: int = 60
# Retry policy when the lock is already held by another worker.
KNOWLEDGE_CONVERSION_LOCK_MAX_RETRIES: int = 2
KNOWLEDGE_CONVERSION_LOCK_RETRY_DELAY_SECONDS: int = 30
Comment on lines +347 to +355
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Default conversion timeouts still undershoot the worker limits.

These defaults can mark or unlock an in-flight conversion long before convert_document_task reaches its 9000/10000 second limits. In particular, Line 348 (1800) is below the allowed runtime, and Line 352 (2000) also contradicts the “longer than task soft_time_limit” comment. Please align the code defaults with the task limits so deployments that rely on defaults don’t reclaim healthy work.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/config.py` around lines 347 - 355, Adjust the default
timeouts so they exceed the convert_document_task soft_time_limit (9000/10000s);
specifically update KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS to a value >= the
task soft limit (e.g., 10000–11000s) and set
KNOWLEDGE_CONVERSION_LOCK_TIMEOUT_SECONDS to be longer than that soft_time_limit
(e.g., slightly above the chosen stale timeout) so the lock comment ("longer
than task soft_time_limit") holds; keep the lock extend/ retry settings
(KNOWLEDGE_CONVERSION_LOCK_EXTEND_INTERVAL_SECONDS,
KNOWLEDGE_CONVERSION_LOCK_MAX_RETRIES,
KNOWLEDGE_CONVERSION_LOCK_RETRY_DELAY_SECONDS) as-is unless you want to adjust
their cadence to match the new longer timeout.


# MinerU API configuration for PDF to Markdown conversion
# Base URL for MinerU API service (e.g., "http://10.2.40.157:8367")
MINERU_API_BASE_URL: str = ""
# MinerU backend type: "pipeline" or other supported backends
MINERU_BACKEND: str = "pipeline"
# MinerU parse method: "ocr", "auto", etc.
MINERU_PARSE_METHOD: str = "ocr"
# Language list for OCR (comma-separated, e.g., "ch,en")
MINERU_LANG_LIST: str = "ch"
# Enable formula recognition
MINERU_FORMULA_ENABLE: bool = True
# Enable table recognition
MINERU_TABLE_ENABLE: bool = True
# Polling interval for task status checks (seconds)
MINERU_POLL_INTERVAL_SECONDS: int = 3
# Maximum time to wait for MinerU task completion (seconds, default 10 min)
MINERU_MAX_WAIT_SECONDS: int = 600

# Document conversion S3 storage configuration for extracted images
# When enabled, images extracted by MinerU will be uploaded to S3
# and markdown image references will be updated to S3 URLs
WORKER_CONVERSION_S3_ENABLED: bool = False
WORKER_CONVERSION_S3_ENDPOINT: str = ""
WORKER_CONVERSION_S3_ACCESS_KEY: str = ""
WORKER_CONVERSION_S3_SECRET_KEY: str = ""
WORKER_CONVERSION_S3_BUCKET_NAME: str = ""
WORKER_CONVERSION_S3_REGION_NAME: str = "us-east-1"

# Circuit breaker configuration
CIRCUIT_BREAKER_FAIL_MAX: int = 5 # Open circuit after 5 consecutive failures
CIRCUIT_BREAKER_RESET_TIMEOUT: int = 60 # Try to recover after 60 seconds
Expand Down Expand Up @@ -595,6 +647,26 @@ def parse_rag_runtime_mode(cls, v: Any) -> str | dict[str, str]:
# Use: from shared.telemetry.config import get_otel_config
# All OTEL_* environment variables are read from there

def needs_conversion(self, file_extension: str) -> bool:
    """Return True when *file_extension* must be converted to markdown first.

    Conversion is required only when all of the following hold:
    1. KNOWLEDGE_CONVERSION_ENABLED is True (master switch)
    2. KNOWLEDGE_CONVERSION_FILE_TYPES is non-empty
    3. The extension appears in that comma-separated list
    """
    # Guard clauses: feature disabled or no extensions configured.
    if not (
        self.KNOWLEDGE_CONVERSION_ENABLED
        and self.KNOWLEDGE_CONVERSION_FILE_TYPES
    ):
        return False
    # Normalize the configured list once; blank entries are ignored so
    # values like "pdf,,pptx" or " pdf , pptx " behave the same.
    configured = {
        entry.strip().lower()
        for entry in self.KNOWLEDGE_CONVERSION_FILE_TYPES.split(",")
        if entry.strip()
    }
    return file_extension.lstrip(".").lower() in configured

def get_rag_runtime_mode(self, operation: str) -> str:
"""Resolve the effective RAG runtime mode for an operation."""
config = self.RAG_RUNTIME_MODE
Expand Down
1 change: 1 addition & 0 deletions backend/app/models/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DocumentIndexStatus(str, PyEnum):

NOT_INDEXED = "not_indexed"
QUEUED = "queued"
CONVERTING = "converting"
INDEXING = "indexing"
SUCCESS = "success"
FAILED = "failed"
Expand Down
1 change: 1 addition & 0 deletions backend/app/schemas/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DocumentIndexStatus(str, Enum):

NOT_INDEXED = "not_indexed"
QUEUED = "queued"
CONVERTING = "converting"
INDEXING = "indexing"
SUCCESS = "success"
FAILED = "failed"
Expand Down
130 changes: 130 additions & 0 deletions backend/app/services/knowledge/index_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class IndexExecutionDecision:

# Statuses that mean indexing work for a document is currently in flight.
# Documents in these states are candidates for stale-detection (see
# _get_active_index_stale_reason) rather than a fresh enqueue.
ACTIVE_INDEX_STATUSES = {
    DocumentIndexStatus.QUEUED,
    DocumentIndexStatus.CONVERTING,
    DocumentIndexStatus.INDEXING,
}

Expand Down Expand Up @@ -74,6 +75,12 @@ def _get_active_index_stale_reason(
):
return "stale_indexing"

if (
document.index_status == DocumentIndexStatus.CONVERTING
and age_seconds >= settings.KNOWLEDGE_INDEX_STALE_CONVERTING_SECONDS
):
return "stale_converting"

return None


Expand Down Expand Up @@ -420,6 +427,129 @@ def mark_document_index_succeeded(
return updated > 0


@trace_sync(
    span_name="knowledge.mark_document_conversion_started",
    tracer_name="knowledge.state_machine",
    extract_attributes=lambda db, document_id, generation: {
        "knowledge.document_id": document_id,
        "knowledge.index_generation": generation,
    },
)
def mark_document_conversion_started(
    db: Session,
    document_id: int,
    generation: int,
) -> IndexExecutionDecision:
    """Transition QUEUED -> CONVERTING when conversion worker picks up the task.

    Args:
        db: Active SQLAlchemy session. Commits on a successful transition;
            rolls back on every skip path (which also releases the row lock
            taken below).
        document_id: Primary key of the KnowledgeDocument to transition.
        generation: Fencing token. The transition only applies while it still
            matches the document's current index_generation.

    Returns:
        IndexExecutionDecision with should_execute=True only when the
        QUEUED -> CONVERTING transition was committed; otherwise
        should_execute=False with a machine-readable skip reason.
    """
    # Lock the row (SELECT ... FOR UPDATE) so concurrent workers cannot race
    # on the same status transition.
    document = (
        db.query(KnowledgeDocument)
        .filter(KnowledgeDocument.id == document_id)
        .with_for_update()
        .first()
    )
    if document is None:
        # Nothing to transition; rollback ends the transaction cleanly.
        db.rollback()
        _record_transition(
            "knowledge.conversion.start.skipped",
            document_id=document_id,
            generation=generation,
            reason="document_not_found",
        )
        return IndexExecutionDecision(
            should_execute=False,
            reason="document_not_found",
        )

    if document.index_generation != generation:
        # A newer (re)index request superseded this task; drop the stale one.
        db.rollback()
        _record_transition(
            "knowledge.conversion.start.skipped",
            document_id=document_id,
            generation=generation,
            reason="stale_generation",
            previous_status=document.index_status,
        )
        return IndexExecutionDecision(
            should_execute=False,
            reason="stale_generation",
        )

    # Only QUEUED may enter CONVERTING; anything else means another worker
    # (or a finalizer) already moved the document on.
    current_status = document.index_status or DocumentIndexStatus.NOT_INDEXED
    if current_status != DocumentIndexStatus.QUEUED:
        db.rollback()
        _record_transition(
            "knowledge.conversion.start.skipped",
            document_id=document_id,
            generation=generation,
            reason=f"unexpected_status_{current_status.value}",
            previous_status=current_status,
        )
        return IndexExecutionDecision(
            should_execute=False,
            reason=f"unexpected_status_{current_status.value}",
        )

    # All checks passed: commit the transition and release the row lock.
    document.index_status = DocumentIndexStatus.CONVERTING
    document.updated_at = _utcnow()
    db.commit()

    # Emit the transition event only after the commit succeeded.
    _record_transition(
        "knowledge.conversion.start.accepted",
        document_id=document_id,
        generation=generation,
        reason="conversion_started",
        previous_status=current_status,
    )
    return IndexExecutionDecision(
        should_execute=True,
        reason="conversion_started",
    )


@trace_sync(
    span_name="knowledge.mark_document_conversion_succeeded",
    tracer_name="knowledge.state_machine",
    extract_attributes=lambda db, document_id, generation: {
        "knowledge.document_id": document_id,
        "knowledge.index_generation": generation,
    },
)
def mark_document_conversion_succeeded(
    db: Session,
    document_id: int,
    generation: int,
) -> bool:
    """Transition CONVERTING -> QUEUED after successful conversion.

    The document returns to QUEUED so index_document_task can proceed
    with the normal QUEUED -> INDEXING transition.

    Args:
        db: Active SQLAlchemy session; committed unconditionally below.
        document_id: Primary key of the KnowledgeDocument to transition.
        generation: Fencing token; the update only matches while it equals
            the document's current index_generation.

    Returns:
        True if exactly this (document, generation, CONVERTING) row was
        updated; False when the row was already finalized or superseded.
    """
    # Conditional bulk UPDATE acts as an atomic compare-and-set: the status
    # only flips if id, generation, and CONVERTING all still match.
    updated = (
        db.query(KnowledgeDocument)
        .filter(
            KnowledgeDocument.id == document_id,
            KnowledgeDocument.index_generation == generation,
            KnowledgeDocument.index_status == DocumentIndexStatus.CONVERTING,
        )
        .update(
            {
                KnowledgeDocument.index_status: DocumentIndexStatus.QUEUED,
                KnowledgeDocument.updated_at: _utcnow(),
            },
            # No in-session objects need syncing; skip the extra bookkeeping.
            synchronize_session=False,
        )
    )
    db.commit()

    _record_transition(
        "knowledge.conversion.finalize.success",
        document_id=document_id,
        generation=generation,
        reason="converted" if updated > 0 else "stale_or_already_finalized",
    )
    return updated > 0


@trace_sync(
span_name="knowledge.mark_document_index_failed",
tracer_name="knowledge.state_machine",
Expand Down
Loading