fix(queue): preserve embedding message ids across serialization #1380

Merged
qin-ctx merged 1 commit into volcengine:main from officialasishkumar:fix/embedding-msg-id-serialization
Apr 13, 2026

Conversation

@officialasishkumar
Contributor

Description

Preserve EmbeddingMsg.id when embedding queue messages are serialized and later deserialized by the worker. Direct embedding waits register this id with RequestWaitTracker; losing it can leave wait=True requests pending until timeout even after the embedding job succeeds.
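
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, assuming a hypothetical `BrokenMsg` class that mirrors the pre-fix shape (an `id` assigned in `__init__` but never declared as a field); it is not the actual `EmbeddingMsg` source.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict
from uuid import uuid4


@dataclass
class BrokenMsg:  # hypothetical stand-in, not the real EmbeddingMsg
    message: str
    context_data: Dict[str, Any]

    def __init__(self, message: str, context_data: Dict[str, Any]):
        self.message = message
        self.context_data = context_data
        # Assigned at runtime, but never declared as a dataclass field.
        self.id = str(uuid4())


msg = BrokenMsg("hello", {"uri": "example"})
payload = asdict(msg)

# asdict() only walks declared fields, so the id never reaches the payload.
payload_keys = sorted(payload)
```

Here `payload_keys` contains only `context_data` and `message`, even though `msg.id` exists on the instance.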

Related Issue

Fixes #1379

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test update

Changes Made

  • Declared EmbeddingMsg.id as a dataclass field so asdict() includes it in queued payloads.
  • Kept the existing constructor signature unchanged.
  • Added a regression test covering queue-style round trip deserialization and request wait completion.
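
A rough sketch of the shape of this change, under stated assumptions: `FixedMsg` and its `from_dict` helper are hypothetical names used for illustration, and the JSON round trip stands in for whatever encoding the queue actually uses.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict
from uuid import uuid4


@dataclass
class FixedMsg:  # hypothetical stand-in, not the real EmbeddingMsg
    message: str
    context_data: Dict[str, Any]
    # Declaring id as a field makes asdict() include it.
    id: str = field(default_factory=lambda: str(uuid4()))

    # Constructor signature unchanged: callers still do not pass an id.
    def __init__(self, message: str, context_data: Dict[str, Any]):
        self.message = message
        self.context_data = context_data
        self.id = str(uuid4())

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FixedMsg":
        # Hypothetical helper: rebuild the message, then restore the queued id.
        obj = cls(data["message"], data["context_data"])
        if data.get("id"):
            obj.id = data["id"]
        return obj


original = FixedMsg("hello", {"uri": "example"})
wire = json.dumps(asdict(original))          # producer side
restored = FixedMsg.from_dict(json.loads(wire))  # worker side
```

With `id` declared as a field, `restored.id` equals `original.id` after the round trip, which is the property the regression test is described as covering.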

Testing

  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • I have tested this on the following platforms:
    • Linux
    • macOS
    • Windows

Commands run:

ruff format --check openviking/storage/queuefs/embedding_msg.py tests/storage/test_embedding_msg.py
ruff check openviking/storage/queuefs/embedding_msg.py tests/storage/test_embedding_msg.py
git diff --check
PYTHONPATH=. /tmp/openviking-test-venv/bin/pytest -o addopts='' --confcutdir=tests/storage tests/storage/test_embedding_msg.py -q
PYTHONPATH=. /tmp/openviking-test-venv/bin/pytest -o addopts='' --confcutdir=tests/telemetry tests/telemetry/test_request_wait_tracker.py -q

Checklist

  • My code follows the project's coding style
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Screenshots (if applicable)

N/A

Additional Notes

The temporary validation environment was removed after running the tests.

EmbeddingMsg instances are registered with RequestWaitTracker before they are written to the embedding queue. The queue serializes messages via dataclasses.asdict(), but id was only assigned in __init__ and was not declared as a dataclass field, so the payload lost the registered root id. When the worker deserialized the message it generated a fresh id and marked the wrong root as complete, allowing wait=True callers to hang until timeout.

Declare id as a dataclass field so queue payloads retain the registered id, and add a regression test covering the serialize/dequeue round trip with RequestWaitTracker.

Fixes volcengine#1379
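
To illustrate why a fresh id on the worker side leaves a waiter pending, here is a toy sketch. `ToyWaitTracker` and its methods are assumptions made for this example and do not reflect the real `RequestWaitTracker` API.

```python
import threading
from typing import Dict


class ToyWaitTracker:
    """Hypothetical, heavily simplified stand-in for a request wait tracker."""

    def __init__(self) -> None:
        self._events: Dict[str, threading.Event] = {}

    def register(self, root_id: str) -> None:
        self._events[root_id] = threading.Event()

    def mark_done(self, root_id: str) -> None:
        # Unknown ids are ignored, so completing the wrong root is silent.
        event = self._events.get(root_id)
        if event is not None:
            event.set()

    def wait(self, root_id: str, timeout: float) -> bool:
        return self._events[root_id].wait(timeout)


tracker = ToyWaitTracker()
tracker.register("registered-id")

# Payload lost the id, so the worker completes a freshly generated one.
tracker.mark_done("fresh-id-from-worker")
released_with_lost_id = tracker.wait("registered-id", timeout=0.05)

# Payload kept the id, so the worker completes the registered root.
tracker.mark_done("registered-id")
released_with_kept_id = tracker.wait("registered-id", timeout=0.05)
```

In this sketch the first wait times out and the second returns immediately, matching the before and after behavior described in the commit message.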
@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis ✅

1379 - Fully compliant

Compliant requirements:

  • Preserve EmbeddingMsg.id across serialization/deserialization
  • Add a regression test covering the fix
⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🏅 Score: 90
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ No major issues detected

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Fix id field initialization

The custom __init__ method will not initialize the new id field, and the dataclass
default_factory won't be used. Modify __init__ to accept an optional id
parameter and set self.id, falling back to uuid4() if not provided. Also update
from_dict/to_dict (if present) to handle the id field.

openviking/storage/queuefs/embedding_msg.py [9-20]

 @dataclass
 class EmbeddingMsg:
     message: Union[str, List[Dict[str, Any]]]
     context_data: Dict[str, Any]
     id: str = field(default_factory=lambda: str(uuid4()))
     telemetry_id: str = ""
     semantic_msg_id: Optional[str] = None
 
     def __init__(
         self,
         message: Union[str, List[Dict[str, Any]]],
+        context_data: Dict[str, Any],
+        telemetry_id: str = "",
+        semantic_msg_id: Optional[str] = None,
+        id: Optional[str] = None,
+    ):
+        self.message = message
+        self.context_data = context_data
+        self.id = id if id is not None else str(uuid4())
+        self.telemetry_id = telemetry_id
+        self.semantic_msg_id = semantic_msg_id
Suggestion importance[1-10]: 8

Why: The custom __init__ method would bypass the dataclass's default_factory for the new id field, leaving it uninitialized. This is a critical issue since the added test relies on msg.id being present and preserved in roundtrips.
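
For context on this suggestion: `@dataclass` does not replace an `__init__` that the class defines itself, so a `default_factory` only takes effect if that `__init__` (or other code) assigns the attribute. Whether the concern applies therefore depends on whether the retained constructor sets `self.id`, which the PR description says it already did. A small sketch with hypothetical classes:

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class NoAssign:  # hypothetical: custom __init__ that never sets id
    name: str
    id: str = field(default_factory=lambda: str(uuid4()))

    def __init__(self, name: str):
        self.name = name


@dataclass
class Assigns:  # hypothetical: custom __init__ that sets id itself
    name: str
    id: str = field(default_factory=lambda: str(uuid4()))

    def __init__(self, name: str):
        self.name = name
        self.id = str(uuid4())


# The dataclass machinery keeps the hand-written __init__ in both cases,
# so the default_factory is never invoked automatically.
no_assign_has_id = hasattr(NoAssign("a"), "id")
assigns_has_id = hasattr(Assigns("b"), "id")
```

Only the second class ends up with an `id` on its instances.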

Medium

@qin-ctx qin-ctx merged commit 3b4dfd3 into volcengine:main Apr 13, 2026
2 checks passed
@github-project-automation github-project-automation bot moved this from Backlog to Done in OpenViking project Apr 13, 2026

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

[Bug]: Embedding queue messages drop their request wait id during serialization

2 participants