
Add Record/Replay functionality for offline processing (Issue #2759) #2760


Open
wants to merge 4 commits into main

Conversation

devin-ai-integration[bot]
Contributor

Record/Replay Functionality for Offline Processing

Description

This PR implements the Record/Replay functionality requested in issue #2759. This feature allows users to:

  1. Record a CrewAI run with all LLM responses using crewai run --record
  2. Replay the run later without making any network calls using crewai run --replay

Benefits

  • Faster iteration during development
  • Ability to work offline without network connectivity
  • Predictable results for testing and debugging
  • Lower costs by not using tokens during development iterations

Implementation Details

  • Added a SQLite-based storage for caching LLM responses (a rough sketch follows this list)
  • Modified the LLM class to intercept and cache responses during recording
  • Added CLI flags for record and replay modes
  • Added tests to verify the functionality
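
For reference, here is a minimal sketch of what the SQLite-backed cache could look like. The table name, column names, add/get method names, and the SHA-256 hash over (model, messages) are illustrative assumptions, not necessarily the exact shape of the code in this PR.

import hashlib
import json
import sqlite3


class LLMResponseCacheStorage:
    """Stores LLM responses keyed by a hash of the (model, messages) request."""

    def __init__(self, db_path: str = "llm_response_cache.db"):
        self.db_path = db_path
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS llm_response_cache (
                    request_hash TEXT PRIMARY KEY,
                    response TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
                """
            )

    def _hash_request(self, model: str, messages: list) -> str:
        # Serialize deterministically so identical requests map to the same key.
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def add(self, model: str, messages: list, response: str) -> None:
        # Called while recording: upsert the response for this request.
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO llm_response_cache (request_hash, response) VALUES (?, ?)",
                (self._hash_request(model, messages), response),
            )

    def get(self, model: str, messages: list):
        # Called while replaying: return the cached response, or None on a miss.
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT response FROM llm_response_cache WHERE request_hash = ?",
                (self._hash_request(model, messages),),
            ).fetchone()
        return row[0] if row else None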

Testing

  • Added unit tests for the LLM response cache
  • Added integration tests for the record/replay functionality
  • Manually tested the CLI commands

Usage Examples

from crewai import Crew

# Record a run (agent and task are defined elsewhere)
crew = Crew(agents=[agent], tasks=[task])
crew.record_mode = True
crew.kickoff()

# Replay a run
crew = Crew(agents=[agent], tasks=[task])
crew.replay_mode = True
crew.kickoff()

Or via CLI:

# Record a run
crewai run --record

# Replay a run
crewai run --replay

Fixes #2759

Link to Devin run: https://app.devin.ai/sessions/9f63ec91d12b40f0af538c9cb054bf68
Requested by: Joe Moura ([email protected])

Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@joaomdmoura
Collaborator

Disclaimer: This review was made by a crew of AI Agents.

Code Review Comment: Record/Replay Functionality Addition

Overview

This pull request adds Record/Replay functionality for LLM responses, enabling offline processing. Caching and replaying LLM responses streamlines testing and development workflows.

Key Findings

CLI Implementation (cli.py)

  • Positive: The implementation of new flags for record and replay modes is clean and well-structured.
  • Improvement Suggestion: Validate the mutually exclusive flags at the command entry point so --record and --replay cannot be used together:
    @crewai.command()
    @click.option("--record", is_flag=True, help="Record LLM responses for later replay")
    @click.option("--replay", is_flag=True, help="Replay from recorded LLM responses")
    def run(record: bool = False, replay: bool = False):
        if record and replay:
            raise click.UsageError("Cannot use --record and --replay simultaneously")
        run_crew(record=record, replay=replay)

Cache Storage Implementation (llm_response_cache_storage.py)

  • Positive: Adequate handling of SQLite connections and secure hashing for requests is implemented, along with proper error handling.
  • Improvement Suggestions:
    1. Connection Pooling: Enhance performance by implementing connection pooling:
    import sqlite3
    import threading

    class LLMResponseCacheStorage:
        def __init__(self, db_path: str):
            self.db_path = db_path
            # One connection per thread: sqlite3 connections must not be shared across threads.
            self._connection_pool = {}

        def _get_connection(self):
            thread_id = threading.get_ident()
            if thread_id not in self._connection_pool:
                self._connection_pool[thread_id] = sqlite3.connect(self.db_path)
            return self._connection_pool[thread_id]
    2. Cache Expiration: Integrate a mechanism to remove expired responses to keep the cache relevant:
    def cleanup_expired_cache(self, max_age_days: int = 7):
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # A ? placeholder cannot be used inside a string literal, so bind the
            # whole datetime modifier (e.g. "-7 days") as a single parameter.
            cursor.execute(
                """
                DELETE FROM llm_response_cache
                WHERE timestamp < datetime('now', ?)
                """,
                (f"-{int(max_age_days)} days",),
            )
            conn.commit()

LLM Integration (llm.py)

  • Issues Identified:
    1. Potential memory leaks if the cache handler is referenced improperly.
    2. Inadequate error handling during cache operations, which may lead to silent failures.
  • Suggested Improvements: Enhance cache handling logic with robust error management:
    def call(self, messages: List[Dict[str, Any]], ...) -> str:
        try:
            if self._response_cache_handler and self._response_cache_handler.is_replaying():
                cached_response = self._response_cache_handler.get_cached_response(self.model, messages)
                if cached_response:
                    return cached_response
            response = self._make_llm_call(messages, tools)
            if self._response_cache_handler and self._response_cache_handler.is_recording():
                self._response_cache_handler.cache_response(self.model, messages, response)
            return response
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            raise

Testing Coverage

  • Positive: The test cases cover both recording and replay functionalities comprehensively.
  • Improvement Suggestion: Expand test cases to cover edge scenarios including error handling and concurrent cache access:
    import sqlite3
    from threading import Thread
    from unittest.mock import MagicMock

    import pytest

    def test_llm_cache_errors():
        handler = LLMResponseCacheHandler()
        # Swap the real storage for a mock so a DB error can be forced.
        handler.storage = MagicMock()
        handler.storage.add.side_effect = sqlite3.Error("Mock DB error")
        with pytest.raises(sqlite3.Error):
            handler.cache_response("model", [], "response")

    def test_concurrent_cache_access():
        handler = LLMResponseCacheHandler()

        def concurrent_access():
            for _ in range(100):
                handler.cache_response("model", [], "response")

        threads = [Thread(target=concurrent_access) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

Historical Context and Lessons Learned

  1. Security: SQL injection protection through parameterized queries is properly implemented. Validation of user-supplied inputs is still needed.
  2. Performance: Introducing cache size limits and eviction policies can prevent overhead from an ever-growing cache (a rough sketch follows this list).
  3. Documentation: Comprehensive documentation is needed to explain cache file management and provide clear usage examples.
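
As a rough illustration of the size-limit idea in point 2, the sketch below evicts the oldest rows once the cache grows past a cap. The table and column names and the default cap are assumptions, not part of the PR as submitted.

    def enforce_cache_size_limit(self, max_entries: int = 10_000) -> None:
        # Keep only the newest max_entries rows, evicting the oldest first.
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                DELETE FROM llm_response_cache
                WHERE request_hash NOT IN (
                    SELECT request_hash
                    FROM llm_response_cache
                    ORDER BY timestamp DESC
                    LIMIT ?
                )
                """,
                (max_entries,),
            )
            conn.commit()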

Conclusion

The implementation of the Record/Replay functionality is well-structured with a solid foundation. Addressing the identified suggestions around error handling, cache management, and testing can greatly enhance the robustness and effectiveness of this feature, making it more suitable for production use.

Action Items

  1. Implement cache expiration and pooling.
  2. Enhance error handling across the caching mechanism.
  3. Document cache management procedures and provide usage examples.
  4. Conduct tests for concurrent access scenarios.

Overall, the new feature is a valuable addition to improve the development workflow for LLM applications. With these improvements, the feature will be more resilient and efficient.

@mikhail

mikhail commented May 7, 2025

Thanks @joaomdmoura for such a speedy PR!

From a high-level review, this accomplishes what the feature request #2759 described. One thing I find non-obvious is the on-disk storage. Does sqlite need to be specified and configured? What will happen if --record is specified but sqlite is not configured?

Additional thoughts:

  • What's the behavior when --replay is specified and most requests are found in the recording, but one specific request has a cache miss?
  • Can this be extended to be used in tests?
  • Can this be leveraged inside Python code? Meaning one agent has neverCache=True or something?

As I'm writing this, I'm wondering if "replay" should be more complex than a boolean. For example, I want distinct scenarios (a rough sketch follows the list):

  1. Replay = required: when I'm running tests I want to replay or fail if the recording is not found.
  2. Replay = opportunistic: this is basically cache. If identical task has been performed and result recorded then use it, otherwise do your LLM thing.
  3. Replay = disabled: ignore previous recordings
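
To make those three scenarios concrete, here is a rough sketch; the ReplayMode enum and the replay_mode attribute are purely hypothetical, not part of this PR:

from enum import Enum


class ReplayMode(Enum):
    REQUIRED = "required"            # fail if a request is missing from the recording
    OPPORTUNISTIC = "opportunistic"  # use the recording when present, otherwise call the LLM
    DISABLED = "disabled"            # ignore previous recordings entirely


# Hypothetical usage:
# crew.replay_mode = ReplayMode.REQUIRED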

Additionally, the word choice "replay" will conflict with the existing "replay" feature for re-running a specific task.
