StoredMessage converts message data to String using from_utf8_lossy, which corrupts arbitrary binary data. This breaks queue functionality for non-UTF-8 payloads.
            data: String::from_utf8_lossy(&msg.data).to_string(),
            // Prefer unix seconds for compatibility with older queue.db producers.
            timestamp: Some(serde_json::Value::Number(unix_secs(msg.timestamp).into())),
        }
    }

    fn into_message(self) -> Message {
        Message {
            id: self.id,
            data: self.data.into_bytes(),
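To make the corruption concrete, here is a minimal standalone sketch using only the Rust standard library. The helper names lossy_round_trip and bytes_round_trip are hypothetical and do not come from the PR; they only mimic the String-based conversion shown above and a byte-preserving alternative.

```rust
/// Hypothetical helper: round-trips a payload through a String, the way the
/// reviewed conversion does (from_utf8_lossy on the way in, into_bytes on the way out).
fn lossy_round_trip(data: &[u8]) -> Vec<u8> {
    String::from_utf8_lossy(data).to_string().into_bytes()
}

/// Hypothetical helper: keeps the payload as raw bytes, so nothing is rewritten.
fn bytes_round_trip(data: &[u8]) -> Vec<u8> {
    data.to_vec()
}

fn main() {
    // 0xFF and 0xFE can never appear in valid UTF-8.
    let payload: [u8; 4] = [0x00, 0xFF, 0xFE, 0x41];

    let lossy = lossy_round_trip(&payload);
    let exact = bytes_round_trip(&payload);

    println!("original: {:02X?}", payload);
    println!("lossy:    {:02X?}", lossy);
    println!("exact:    {:02X?}", exact);
    println!("lossy round trip preserved payload: {}", lossy == payload);
}
```

Each invalid byte is replaced by U+FFFD (encoded as EF BF BD), so the payload both changes and grows, and the original bytes cannot be recovered afterwards.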
Dequeue commits the transaction marking a message as processing before deserializing the message data. If deserialization fails, the message remains stuck in processing state.
fn dequeue(&mut self, queue_name: &str) -> Result<Option<Message>> {
    let mut conn = self
        .conn
        .lock()
        .map_err(|e| Error::internal(format!("sqlite mutex poisoned: {}", e)))?;
    let tx = conn
        .transaction()
        .map_err(|e| Error::internal(format!("sqlite transaction begin error: {}", e)))?;
    let row = tx.query_row(
        "SELECT id, data FROM queue_messages WHERE queue_name = ?1 AND status = 'pending' ORDER BY id LIMIT 1",
        params![queue_name],
        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
    );
    let (id, raw_data) = match row {
        Ok(row) => row,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(e) => return Err(Error::internal(format!("sqlite dequeue query error: {}", e))),
    };
    tx.execute(
        "UPDATE queue_messages SET status = 'processing', processing_started_at = ?1 WHERE id = ?2",
        params![Utc::now().timestamp(), id],
    )
    .map_err(|e| Error::internal(format!("sqlite mark processing error: {}", e)))?;
    tx.commit()
        .map_err(|e| Error::internal(format!("sqlite transaction commit error: {}", e)))?;
    let stored: StoredMessage = serde_json::from_str(&raw_data)?;
    Ok(Some(stored.into_message()))
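One way to avoid the stuck-in-processing state is to decode the payload before making the status change durable. The sketch below models that ordering in memory with the standard library only; Row, Status, and decode are hypothetical stand-ins (decode uses UTF-8 validation in place of the JSON deserialization), not the PR's types. In the SQLite version the analogous change would presumably be to deserialize before tx.commit(), so that a failure drops the transaction.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Processing,
}

/// Hypothetical in-memory row standing in for a queue_messages record.
struct Row {
    id: i64,
    raw: Vec<u8>,
    status: Status,
}

/// Hypothetical decoder standing in for the deserialization step.
fn decode(raw: &[u8]) -> Result<String, String> {
    String::from_utf8(raw.to_vec()).map_err(|e| format!("decode error: {}", e))
}

/// Decode first; only mark the row as processing once decoding succeeded.
fn dequeue(rows: &mut [Row]) -> Result<Option<(i64, String)>, String> {
    let row = match rows.iter_mut().find(|r| r.status == Status::Pending) {
        Some(row) => row,
        None => return Ok(None),
    };
    // If this fails we return early and the row is still pending.
    let payload = decode(&row.raw)?;
    row.status = Status::Processing;
    Ok(Some((row.id, payload)))
}

fn main() {
    let mut rows = vec![Row { id: 1, raw: vec![0xFF], status: Status::Pending }];
    let result = dequeue(&mut rows);
    println!("result: {:?}, status afterwards: {:?}", result, rows[0].status);
}
```

With this ordering an undecodable row stays pending rather than processing, so a real implementation would still need a policy for rows that never decode, since they would be picked again on every call.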
StoredMessage serializes id, data, and timestamp into JSON, which is then stored in the queue_messages table alongside separate columns for message_id and timestamp. This redundancy wastes space and could lead to inconsistencies.
The current implementation corrupts non-UTF-8 message data by using String::from_utf8_lossy. Replace StoredMessage.data with a Vec<u8> to preserve arbitrary bytes; serde serializes it as a JSON array of numbers (correct, though verbose). This preserves message data for all byte sequences.
Why: This fixes a critical issue where non-UTF8 message data would be corrupted by String::from_utf8_lossy. Using Vec<u8> preserves arbitrary bytes, ensuring message integrity. The change is accurate and directly addresses a potential data loss bug.
Medium
General
Make migrations less fragile
The migration logic relies on fragile error message string matching to ignore already-applied changes. For the index statements, use CREATE INDEX IF NOT EXISTS (already present) which natively avoids errors, so you don't need to catch those. For ALTER TABLE, consider checking pragma table_info(queue_messages) first to avoid relying on error strings.
 fn run_migrations(&self) -> Result<()> {
     let conn = self
         .conn
         .lock()
         .map_err(|e| Error::internal(format!("sqlite mutex poisoned: {}", e)))?;
-    for stmt in [
-        "ALTER TABLE queue_messages ADD COLUMN status TEXT NOT NULL DEFAULT 'pending'",
-        "ALTER TABLE queue_messages ADD COLUMN processing_started_at INTEGER",
-        "CREATE INDEX IF NOT EXISTS idx_queue_status ON queue_messages(queue_name, status, id)",
-        "CREATE INDEX IF NOT EXISTS idx_queue_message_id ON queue_messages(queue_name, message_id)",
-    ] {
-        if let Err(err) = conn.execute(stmt, []) {
-            let text = err.to_string();
-            if !text.contains("duplicate column name") && !text.contains("already exists") {
-                return Err(Error::internal(format!(
-                    "sqlite migration failed for '{}': {}",
-                    stmt, text
-                )));
-            }
-        }
+    // Check and add status column if missing
+    let has_status: bool = conn.query_row(
+        "SELECT 1 FROM pragma_table_info('queue_messages') WHERE name = 'status'",
+        [],
+        |_| Ok(true),
+    ).unwrap_or(false);
+    if !has_status {
+        conn.execute(
+            "ALTER TABLE queue_messages ADD COLUMN status TEXT NOT NULL DEFAULT 'pending'",
+            [],
+        )
+        .map_err(|e| Error::internal(format!("sqlite migration add status column error: {}", e)))?;
     }
+
+    // Check and add processing_started_at column if missing
+    let has_processing_started_at: bool = conn.query_row(
+        "SELECT 1 FROM pragma_table_info('queue_messages') WHERE name = 'processing_started_at'",
+        [],
+        |_| Ok(true),
+    ).unwrap_or(false);
+    if !has_processing_started_at {
+        conn.execute(
+            "ALTER TABLE queue_messages ADD COLUMN processing_started_at INTEGER",
+            [],
+        )
+        .map_err(|e| Error::internal(format!("sqlite migration add processing_started_at column error: {}", e)))?;
+    }
+
+    // Create indexes (uses IF NOT EXISTS, safe to run unconditionally)
+    conn.execute_batch(
+        r#"
+        CREATE INDEX IF NOT EXISTS idx_queue_status ON queue_messages(queue_name, status, id);
+        CREATE INDEX IF NOT EXISTS idx_queue_message_id ON queue_messages(queue_name, message_id);
+        "#,
+    )
+    .map_err(|e| Error::internal(format!("sqlite migration create indexes error: {}", e)))?;
     Ok(())
 }
Suggestion importance[1-10]: 5
Why: This improves migration robustness by avoiding fragile error message string matching. While the current code works, the suggestion prevents potential issues with SQLite version changes. It's a quality improvement but not critical for functionality.
Description
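The old QueueFS relied on the Go/SQLite queue.db for on-disk queue persistence and recovery after a restart. After the update, that path is missing from the Rust queuefs, which has no SQLite backend or recovery semantics, so task queues in the new version are no longer recoverable after a restart.
This PR adds a SQLite persistence backend to queuefs so that queued tasks can still be consumed after a process restart. It also implements the same two-phase dequeue -> ack confirmation (at-least-once) semantics as the old version: dequeue only marks a message as processing and returns it, and only ack actually deletes it. In addition, it fills in the queuefs mount configuration options and fixes a minor issue with the example URL in a server startup log.
Related Issue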
N/A
Type of Change
Changes Made
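- SQLiteQueueBackend
- queue_metadata (records queue names) and queue_messages (records messages, status, timestamps, and so on)
- status (pending/processing) and processing_started_at
- dequeue selects one pending message, updates it to processing, and returns it (a message is locked as soon as it is taken, so it cannot be dequeued twice)
- ack deletes by queue_name + message_id + status='processing'; the return value indicates whether a row was actually deleted
- With recover_stale_sec=0, all processing messages are restored to pending; with recover_stale_sec>0, only processing messages older than the threshold are restored
- Sets busy_timeout_ms and enables WAL-related pragmas (to improve concurrent read/write behavior)
- backend: memory | sqlite | sqlite3
- db_path: SQLite DB file path (required when backend=sqlite/sqlite3)
- recover_stale_sec: threshold for recovering processing messages at startup (seconds; 0 means recover all)
- busy_timeout_ms: SQLite busy timeout (milliseconds)
- Control files: enqueue (write) / dequeue (read) / peek (read) / size (read) / clear (write) / ack (write msg_id)
- dequeue/peek return {} when there are no messages (for compatibility with the empty-check logic of existing clients)
- readme() now documents ack usage and the list of control files
- Server startup log example URL: http://{}/api/v1/mount
Testing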
Local verification command and result:
cargo test -p ragfs queuefs (23/23 passed)
Checklist
Screenshots (if applicable)
N/A
Additional Notes
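The dequeue -> ack and startup recovery semantics described under Changes Made can be sketched as a small in-memory model. This is an illustration using only the standard library, not the SQLite implementation: ModelQueue, Entry, and the explicit now parameter are hypothetical, and treating "older than the threshold" as a strict greater-than comparison is an assumption.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Pending,
    Processing { started_at: u64 },
}

/// Hypothetical in-memory stand-in for a queue_messages row.
struct Entry {
    message_id: String,
    data: Vec<u8>,
    status: Status,
}

/// Hypothetical model of one queue; keys play the role of the row id.
#[derive(Default)]
struct ModelQueue {
    next_id: u64,
    rows: BTreeMap<u64, Entry>,
}

impl ModelQueue {
    fn enqueue(&mut self, message_id: &str, data: &[u8]) {
        let entry = Entry {
            message_id: message_id.to_string(),
            data: data.to_vec(),
            status: Status::Pending,
        };
        self.rows.insert(self.next_id, entry);
        self.next_id += 1;
    }

    /// Phase one: mark the oldest pending message as processing and return it.
    fn dequeue(&mut self, now: u64) -> Option<(String, Vec<u8>)> {
        let entry = self.rows.values_mut().find(|e| e.status == Status::Pending)?;
        entry.status = Status::Processing { started_at: now };
        Some((entry.message_id.clone(), entry.data.clone()))
    }

    /// Phase two: delete only a message that is currently processing.
    /// Returns whether anything was actually deleted.
    fn ack(&mut self, message_id: &str) -> bool {
        let key = self
            .rows
            .iter()
            .find(|(_, e)| {
                e.message_id == message_id && matches!(e.status, Status::Processing { .. })
            })
            .map(|(k, _)| *k);
        match key {
            Some(k) => self.rows.remove(&k).is_some(),
            None => false,
        }
    }

    /// Startup recovery: 0 restores every processing message; otherwise only
    /// those older than the threshold (assumed strict comparison).
    fn recover(&mut self, now: u64, recover_stale_sec: u64) -> usize {
        let mut restored = 0;
        for e in self.rows.values_mut() {
            if let Status::Processing { started_at } = e.status {
                if recover_stale_sec == 0 || now.saturating_sub(started_at) > recover_stale_sec {
                    e.status = Status::Pending;
                    restored += 1;
                }
            }
        }
        restored
    }
}

fn main() {
    let mut q = ModelQueue::default();
    q.enqueue("m1", b"payload");
    let first = q.dequeue(100);
    // Simulate a restart before ack: the message becomes pending again.
    let restored = q.recover(105, 0);
    let second = q.dequeue(106);
    println!("first: {:?}, restored: {}, second: {:?}", first, restored, second);
    println!("ack m1: {}, ack m1 again: {}", q.ack("m1"), q.ack("m1"));
}
```

Because a message dequeued but not acked is handed out again after recovery, consumers may see the same message more than once, which is the at-least-once behavior the description refers to.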