
feat(queuefs): Add SQLite backend with ack/recover and unified control-file/config specs#1500

Open
sponge225 wants to merge 1 commit into volcengine:main from sponge225:feat/queuefs-sqlite

Conversation

@sponge225
Collaborator

Description

The legacy QueueFS relied on a Go/SQLite queue.db for queue persistence and restart recovery. In the Rust rewrite of queuefs this link is missing — there is no SQLite backend or recovery semantics — so after a restart the task queue is no longer naturally recoverable.

This PR introduces a SQLite persistence backend for queuefs so that queued tasks can keep being consumed after a process restart, and implements the same dequeue -> ack two-phase acknowledgment (at-least-once) semantics as the legacy version: dequeue only marks a message as processing and returns it; only ack actually deletes it. It also fills in the missing queuefs mount config options and fixes a minor issue in a server startup log example URL.
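As a rough in-memory model of these two-phase semantics (a hypothetical sketch only — the PR's actual backend keeps this state in SQLite, not in a Vec):

```rust
// Minimal in-memory model of dequeue -> ack (at-least-once) semantics:
// dequeue marks a message `processing` but does NOT remove it; only ack
// deletes. A crash between dequeue and ack therefore re-delivers.
#[derive(Debug, PartialEq)]
enum Status {
    Pending,
    Processing,
}

struct QueueModel {
    // (message_id, payload, status)
    messages: Vec<(String, Vec<u8>, Status)>,
}

impl QueueModel {
    fn new() -> Self {
        Self { messages: Vec::new() }
    }

    fn enqueue(&mut self, id: &str, data: &[u8]) {
        self.messages.push((id.to_string(), data.to_vec(), Status::Pending));
    }

    // Pick the first pending message, lock it (status -> processing), return it.
    fn dequeue(&mut self) -> Option<(String, Vec<u8>)> {
        let slot = self
            .messages
            .iter_mut()
            .find(|(_, _, st)| *st == Status::Pending)?;
        slot.2 = Status::Processing;
        Some((slot.0.clone(), slot.1.clone()))
    }

    // Only ack actually deletes; returns whether a processing row was removed.
    fn ack(&mut self, id: &str) -> bool {
        let before = self.messages.len();
        self.messages
            .retain(|(mid, _, st)| !(mid.as_str() == id && *st == Status::Processing));
        self.messages.len() != before
    }
}
```

In this model a message that was dequeued but never acked stays in processing, which is exactly the state the startup-recovery logic later resets to pending — that is what yields at-least-once delivery.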

Related Issue

N/A

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test update

Changes Made

  • Added a SQLite queue backend (SQLiteQueueBackend)
    • New/backward-compatible schema: queue_metadata (records queue names) and queue_messages (records message payload, status, timestamps, etc.)
    • Status fields: status (pending / processing) and processing_started_at
    • dequeue behavior: pick one pending message, mark it processing, and return it ("locked on fetch", so the same message cannot be fetched twice)
    • ack behavior: delete by queue_name + message_id + status='processing'; the return value indicates whether a row was actually deleted
    • Startup recovery: recover_stale_sec=0 resets all processing messages to pending; recover_stale_sec>0 recovers only processing messages older than the threshold
    • SQLite runtime tuning: sets busy_timeout_ms and enables WAL-related pragmas (better concurrent read/write behavior)
  • QueueFS plugin configuration (mount config)
    • backend: memory | sqlite | sqlite3
    • db_path: path to the SQLite DB file (required when backend=sqlite/sqlite3)
    • recover_stale_sec: threshold in seconds for recovering processing messages at startup (0 means recover all)
    • busy_timeout_ms: SQLite busy timeout in milliseconds
  • External interface and compatibility
    • Control files: enqueue (write) / dequeue (read) / peek (read) / size (read) / clear (write) / ack (write msg_id)
    • Empty-queue behavior: dequeue/peek return {} when no message is available (keeps existing clients' empty checks working)
    • Docs sync: readme() now covers ack usage and the full control-file list
  • Miscellaneous fixes
    • Fixed the double slash in the server startup log curl example URL: http://{}/api/v1/mount
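The startup-recovery rule described above can be sketched as a small pure function. This is an illustrative model only — in the PR it is an SQL UPDATE over queue_messages, and whether the age comparison is strict is an assumption of this sketch:

```rust
// Illustrative model of the startup-recovery rule (hypothetical helper;
// the real backend expresses this as an UPDATE over `queue_messages`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Processing,
}

/// Decide a message's post-recovery status from its current status, how long
/// it has sat in `processing` (seconds), and the `recover_stale_sec` setting.
fn recover_status(status: Status, processing_age_sec: u64, recover_stale_sec: u64) -> Status {
    match status {
        Status::Pending => Status::Pending,
        Status::Processing => {
            // recover_stale_sec == 0: recover ALL processing messages.
            // recover_stale_sec  > 0: recover only those older than the threshold.
            if recover_stale_sec == 0 || processing_age_sec > recover_stale_sec {
                Status::Pending
            } else {
                Status::Processing
            }
        }
    }
}
```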

Testing

  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • I have tested this on the following platforms:
    • Linux
    • macOS
    • Windows

Local verification commands and results:

  • cargo test -p ragfs queuefs (23/23 passed)

Checklist

  • My code follows the project's coding style
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Screenshots (if applicable)

N/A

Additional Notes

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 78
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add SQLite backend with ack/recover semantics

Relevant files:

  • crates/ragfs/src/plugins/queuefs/backend.rs
  • crates/ragfs/src/plugins/queuefs/mod.rs

Sub-PR theme: Add unified control-file/config specs

Relevant files:

  • crates/ragfs/src/plugins/queuefs/mod.rs

Sub-PR theme: Fix server startup log example URL

Relevant files:

  • crates/ragfs/src/server/main.rs

⚡ Recommended focus areas for review

Binary Data Corruption

StoredMessage converts message data to String using from_utf8_lossy, which corrupts arbitrary binary data. This breaks queue functionality for non-UTF-8 payloads.

        data: String::from_utf8_lossy(&msg.data).to_string(),
        // Prefer unix seconds for compatibility with older queue.db producers.
        timestamp: Some(serde_json::Value::Number(unix_secs(msg.timestamp).into())),
    }
}

fn into_message(self) -> Message {
    Message {
        id: self.id,
        data: self.data.into_bytes(),

Stuck Messages on Deserialization Failure

Dequeue commits the transaction marking a message as processing before deserializing the message data. If deserialization fails, the message remains stuck in processing state.

fn dequeue(&mut self, queue_name: &str) -> Result<Option<Message>> {
    let mut conn = self
        .conn
        .lock()
        .map_err(|e| Error::internal(format!("sqlite mutex poisoned: {}", e)))?;

    let tx = conn
        .transaction()
        .map_err(|e| Error::internal(format!("sqlite transaction begin error: {}", e)))?;

    let row = tx.query_row(
        "SELECT id, data FROM queue_messages
         WHERE queue_name = ?1 AND status = 'pending'
         ORDER BY id LIMIT 1",
        params![queue_name],
        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
    );

    let (id, raw_data) = match row {
        Ok(row) => row,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(e) => return Err(Error::internal(format!("sqlite dequeue query error: {}", e))),
    };

    tx.execute(
        "UPDATE queue_messages
         SET status = 'processing', processing_started_at = ?1
         WHERE id = ?2",
        params![Utc::now().timestamp(), id],
    )
    .map_err(|e| Error::internal(format!("sqlite mark processing error: {}", e)))?;

    tx.commit()
        .map_err(|e| Error::internal(format!("sqlite transaction commit error: {}", e)))?;

    let stored: StoredMessage = serde_json::from_str(&raw_data)?;
    Ok(Some(stored.into_message()))
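One possible direction for a fix (a sketch under assumptions, not the maintainers' decided approach) is to deserialize before committing and roll back on a parse error, so the row stays pending rather than stuck in processing. The flow is modeled below in std-only Rust, with str::parse standing in for serde_json::from_str and the status assignment standing in for the transaction commit:

```rust
// Std-only model of "validate before commit": the message is moved to
// `processing` only if its payload deserializes. `str::parse::<i64>` stands
// in for `serde_json::from_str`; leaving the status untouched stands in for
// rolling back the transaction.
struct Row {
    status: &'static str,
    raw: String,
}

/// Parse the payload first; commit (status -> "processing") only on success.
/// On a parse error the row is left `pending`, so it is not stuck.
fn dequeue_checked(row: &mut Row) -> Option<i64> {
    match row.raw.parse::<i64>() {
        Ok(v) => {
            row.status = "processing"; // commit only after the payload parsed
            Some(v)
        }
        Err(_) => None, // roll back: message stays pending
    }
}
```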

Redundant Data Storage

StoredMessage serializes id, data, and timestamp into JSON, which is then stored in the queue_messages table alongside separate columns for message_id and timestamp. This redundancy wastes space and could lead to inconsistencies.

let stored = serde_json::to_string(&StoredMessage::from_message(&msg))?;
conn.execute(
    "INSERT INTO queue_messages (queue_name, message_id, data, timestamp, status)
     VALUES (?1, ?2, ?3, ?4, 'pending')",
    params![queue_name, msg.id, stored, unix_secs(msg.timestamp)],
)
.map_err(|e| Error::internal(format!("sqlite enqueue error: {}", e)))?;

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue

Suggestion: Preserve arbitrary message bytes

The current implementation corrupts non-UTF-8 message data by using
String::from_utf8_lossy. Replace StoredMessage.data with a Vec<u8> to preserve
arbitrary bytes, which serde will serialize as a JSON array (correct, though
slightly verbose). This ensures message data integrity for all byte sequences.

crates/ragfs/src/plugins/queuefs/backend.rs [38-63]

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct StoredMessage {
     id: String,
-    data: String,
+    data: Vec<u8>,
     #[serde(default)]
     timestamp: Option<serde_json::Value>,
 }
 
 impl StoredMessage {
     fn from_message(msg: &Message) -> Self {
         Self {
             id: msg.id.clone(),
-            data: String::from_utf8_lossy(&msg.data).to_string(),
+            data: msg.data.clone(),
             // Prefer unix seconds for compatibility with older queue.db producers.
             timestamp: Some(serde_json::Value::Number(unix_secs(msg.timestamp).into())),
         }
     }
 
     fn into_message(self) -> Message {
         Message {
             id: self.id,
-            data: self.data.into_bytes(),
+            data: self.data,
             timestamp: parse_stored_timestamp(self.timestamp),
         }
     }
 }
Suggestion importance[1-10]: 8


Why: This fixes a critical issue where non-UTF8 message data would be corrupted by String::from_utf8_lossy. Using Vec<u8> preserves arbitrary bytes, ensuring message integrity. The change is accurate and directly addresses a potential data loss bug.

Impact: Medium

Category: General

Suggestion: Make migrations less fragile

The migration logic relies on fragile error message string matching to ignore
already-applied changes. For the index statements, use CREATE INDEX IF NOT EXISTS
(already present) which natively avoids errors, so you don't need to catch those.
For ALTER TABLE, consider checking pragma table_info(queue_messages) first to avoid
relying on error strings.

crates/ragfs/src/plugins/queuefs/backend.rs [326-350]

 fn run_migrations(&self) -> Result<()> {
     let conn = self
         .conn
         .lock()
         .map_err(|e| Error::internal(format!("sqlite mutex poisoned: {}", e)))?;
 
-    for stmt in [
-        "ALTER TABLE queue_messages ADD COLUMN status TEXT NOT NULL DEFAULT 'pending'",
-        "ALTER TABLE queue_messages ADD COLUMN processing_started_at INTEGER",
-        "CREATE INDEX IF NOT EXISTS idx_queue_status ON queue_messages(queue_name, status, id)",
-        "CREATE INDEX IF NOT EXISTS idx_queue_message_id ON queue_messages(queue_name, message_id)",
-    ] {
-        if let Err(err) = conn.execute(stmt, []) {
-            let text = err.to_string();
-            if !text.contains("duplicate column name") && !text.contains("already exists") {
-                return Err(Error::internal(format!(
-                    "sqlite migration failed for '{}': {}",
-                    stmt, text
-                )));
-            }
-        }
+    // Check and add status column if missing
+    let has_status: bool = conn.query_row(
+        "SELECT 1 FROM pragma_table_info('queue_messages') WHERE name = 'status'",
+        [],
+        |_| Ok(true),
+    ).unwrap_or(false);
+    if !has_status {
+        conn.execute(
+            "ALTER TABLE queue_messages ADD COLUMN status TEXT NOT NULL DEFAULT 'pending'",
+            [],
+        )
+        .map_err(|e| Error::internal(format!("sqlite migration add status column error: {}", e)))?;
     }
+
+    // Check and add processing_started_at column if missing
+    let has_processing_started_at: bool = conn.query_row(
+        "SELECT 1 FROM pragma_table_info('queue_messages') WHERE name = 'processing_started_at'",
+        [],
+        |_| Ok(true),
+    ).unwrap_or(false);
+    if !has_processing_started_at {
+        conn.execute(
+            "ALTER TABLE queue_messages ADD COLUMN processing_started_at INTEGER",
+            [],
+        )
+        .map_err(|e| Error::internal(format!("sqlite migration add processing_started_at column error: {}", e)))?;
+    }
+
+    // Create indexes (uses IF NOT EXISTS, safe to run unconditionally)
+    conn.execute_batch(
+        r#"
+        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue_messages(queue_name, status, id);
+        CREATE INDEX IF NOT EXISTS idx_queue_message_id ON queue_messages(queue_name, message_id);
+        "#,
+    )
+    .map_err(|e| Error::internal(format!("sqlite migration create indexes error: {}", e)))?;
 
     Ok(())
 }
Suggestion importance[1-10]: 5


Why: This improves migration robustness by avoiding fragile error message string matching. While the current code works, the suggestion prevents potential issues with SQLite version changes. It's a quality improvement but not critical for functionality.

Impact: Low

@qin-ctx qin-ctx self-assigned this Apr 16, 2026