Add Record/Replay functionality for offline processing (Issue #2759) #2760

Open · wants to merge 4 commits into main

17 changes: 15 additions & 2 deletions src/crewai/cli/cli.py
@@ -201,9 +201,22 @@ def install(context):


@crewai.command()
def run():
@click.option(
"--record",
is_flag=True,
help="Record LLM responses for later replay",
)
@click.option(
"--replay",
is_flag=True,
help="Replay from recorded LLM responses without making network calls",
)
def run(record: bool = False, replay: bool = False):
"""Run the Crew."""
run_crew()
if record and replay:
raise click.UsageError("Cannot use --record and --replay simultaneously")
click.echo("Running the Crew")
run_crew(record=record, replay=replay)


@crewai.command()
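
The two flags are mutually exclusive and are validated before `run_crew` is called. A minimal sketch of how that behaviour could be exercised with Click's test runner; the import path and the patch target for `run_crew` are assumptions based on the diff above:

```python
# Sketch (not part of this PR): exercising the new flags with Click's test runner.
# Assumes `run` and `run_crew` are importable from crewai.cli.cli as the diff suggests.
from unittest.mock import patch

from click.testing import CliRunner

from crewai.cli.cli import run

runner = CliRunner()

# Passing both flags should trip the UsageError raised inside run().
result = runner.invoke(run, ["--record", "--replay"])
assert result.exit_code != 0
assert "Cannot use --record and --replay simultaneously" in result.output

# A single flag is forwarded to run_crew(record=True, replay=False).
with patch("crewai.cli.cli.run_crew") as mock_run_crew:
    runner.invoke(run, ["--record"])
    mock_run_crew.assert_called_once_with(record=True, replay=False)
```
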
17 changes: 14 additions & 3 deletions src/crewai/cli/run_crew.py
@@ -14,13 +14,17 @@ class CrewType(Enum):
FLOW = "flow"


def run_crew() -> None:
def run_crew(record: bool = False, replay: bool = False) -> None:
"""
Run the crew or flow by running a command in the UV environment.

Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.

Args:
record (bool, optional): Whether to record LLM responses. Defaults to False.
replay (bool, optional): Whether to replay from recorded LLM responses. Defaults to False.
"""
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
@@ -44,17 +48,24 @@ def run_crew() -> None:
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")

# Execute the appropriate command
execute_command(crew_type)
execute_command(crew_type, record, replay)


def execute_command(crew_type: CrewType) -> None:
def execute_command(crew_type: CrewType, record: bool = False, replay: bool = False) -> None:
"""
Execute the appropriate command based on crew type.

Args:
crew_type: The type of crew to run
record: Whether to record LLM responses
replay: Whether to replay from recorded LLM responses
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]

if record:
command.append("--record")
if replay:
command.append("--replay")

try:
subprocess.run(command, capture_output=False, text=True, check=True)
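
Because the flags are simply appended to the `uv run run_crew` / `uv run kickoff` command, the generated project's entry point has to pick them up and hand them to the Crew. That script is not part of this diff; a hypothetical sketch of how it might forward the flags using the `record_mode` / `replay_mode` fields added in `src/crewai/crew.py` below:

```python
# Hypothetical project-side entry point (not included in this PR).
# `LatestAiDevelopmentCrew` is a placeholder for whatever the generated project defines.
import sys

from latest_ai_development.crew import LatestAiDevelopmentCrew


def run():
    crew = LatestAiDevelopmentCrew().crew()
    crew.record_mode = "--record" in sys.argv
    crew.replay_mode = "--replay" in sys.argv
    crew.kickoff(inputs={"topic": "AI LLMs"})
```
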
31 changes: 31 additions & 0 deletions src/crewai/crew.py
@@ -244,6 +244,15 @@ class Crew(FlowTrackable, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
record_mode: bool = Field(
default=False,
description="Whether to record LLM responses for later replay.",
)
replay_mode: bool = Field(
default=False,
description="Whether to replay from recorded LLM responses without making network calls.",
)
_llm_response_cache_handler: Optional[Any] = PrivateAttr(default=None)

@field_validator("id", mode="before")
@classmethod
@@ -633,6 +642,19 @@ def kickoff(
self._task_output_handler.reset()
self._logging_color = "bold_purple"

if self.record_mode and self.replay_mode:
raise ValueError("Cannot use both record_mode and replay_mode at the same time")

if self.record_mode or self.replay_mode:
from crewai.utilities.llm_response_cache_handler import (
LLMResponseCacheHandler,
)
self._llm_response_cache_handler = LLMResponseCacheHandler()
if self.record_mode:
self._llm_response_cache_handler.start_recording()
elif self.replay_mode:
self._llm_response_cache_handler.start_replaying()

if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
@@ -651,6 +673,12 @@

if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"

if self._llm_response_cache_handler:
if hasattr(agent, "llm") and agent.llm:
agent.llm.set_response_cache_handler(self._llm_response_cache_handler)
if hasattr(agent, "function_calling_llm") and agent.function_calling_llm:
agent.function_calling_llm.set_response_cache_handler(self._llm_response_cache_handler)

agent.create_agent_executor()
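
With the kickoff changes above, the record/replay switch lives on the Crew itself and is pushed into each agent's `llm` (and `function_calling_llm`) before the executors are built. A hedged usage sketch, with placeholder agent and task definitions:

```python
# Hedged usage sketch: only record_mode/replay_mode come from this PR;
# the agent, task, and inputs are placeholders.
from crewai import Agent, Crew, Task

analyst = Agent(role="Analyst", goal="Summarize input", backstory="Concise summarizer")
summarize = Task(description="Summarize {text}", expected_output="One paragraph", agent=analyst)

# First run records every LLM response through the attached cache handler.
Crew(agents=[analyst], tasks=[summarize], record_mode=True).kickoff(inputs={"text": "..."})

# A later run replays those responses without any network calls.
Crew(agents=[analyst], tasks=[summarize], replay_mode=True).kickoff(inputs={"text": "..."})
```
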

@@ -1287,6 +1315,9 @@ def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()

if self._llm_response_cache_handler:
self._llm_response_cache_handler.stop()

def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
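
The imported `crewai.utilities.llm_response_cache_handler.LLMResponseCacheHandler` is not shown in this section. Judging from the calls made in `Crew.kickoff` above and in `LLM.call` below, it would need roughly the following interface; the in-memory storage and hashing scheme here are illustrative assumptions, not the PR's actual implementation:

```python
# Illustrative sketch of the interface Crew and LLM expect from the handler.
# The in-memory dict storage is an assumption; the PR's real implementation is not shown here.
import hashlib
import json
from typing import Optional


class LLMResponseCacheHandler:
    def __init__(self) -> None:
        self._mode: Optional[str] = None   # "record", "replay", or None
        self._store: dict[str, str] = {}   # cache key -> recorded response

    def start_recording(self) -> None:
        self._mode = "record"

    def start_replaying(self) -> None:
        self._mode = "replay"

    def is_recording(self) -> bool:
        return self._mode == "record"

    def is_replaying(self) -> bool:
        return self._mode == "replay"

    def _key(self, model: str, messages: list[dict]) -> str:
        # Deterministic key over the model name and the full message list.
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def cache_response(self, model: str, messages: list[dict], response: str) -> None:
        self._store[self._key(model, messages)] = response

    def get_cached_response(self, model: str, messages: list[dict]) -> Optional[str]:
        return self._store.get(self._key(model, messages))

    def stop(self) -> None:
        self._mode = None
```
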
44 changes: 39 additions & 5 deletions src/crewai/llm.py
@@ -296,6 +296,7 @@ def __init__(
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
self.stream = stream
self._response_cache_handler = None

litellm.drop_params = True

@@ -869,25 +870,43 @@ def call(
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"

if self._response_cache_handler and self._response_cache_handler.is_replaying():
cached_response = self._response_cache_handler.get_cached_response(
self.model, messages
)
if cached_response:
# Emit completion event for the cached response
self._handle_emit_call_events(cached_response, LLMCallType.LLM_CALL)
return cached_response

# --- 5) Set up callbacks if provided
# --- 6) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)

try:
# --- 6) Prepare parameters for the completion call
# --- 7) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)

# --- 7) Make the completion call and handle response
# --- 8) Make the completion call and handle response
if self.stream:
return self._handle_streaming_response(
response = self._handle_streaming_response(
params, callbacks, available_functions
)
else:
return self._handle_non_streaming_response(
response = self._handle_non_streaming_response(
params, callbacks, available_functions
)

if (self._response_cache_handler and
self._response_cache_handler.is_recording() and
isinstance(response, str)):
self._response_cache_handler.cache_response(
self.model, messages, response
)

return response

except LLMContextLengthExceededException:
# Re-raise LLMContextLengthExceededException as it should be handled
Expand Down Expand Up @@ -1107,3 +1126,18 @@ def set_env_callbacks(self):

litellm.success_callback = success_callbacks
litellm.failure_callback = failure_callbacks

def set_response_cache_handler(self, handler):
"""
Sets the response cache handler for record/replay functionality.

Args:
handler: An instance of LLMResponseCacheHandler.
"""
self._response_cache_handler = handler

def clear_response_cache_handler(self):
"""
Clears the response cache handler.
"""
self._response_cache_handler = None
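
Within `LLM.call`, replay short-circuits before any litellm request is made (falling back to a live call if no recording matches), and recording only happens when the final response is a plain string, so responses that resolve to tool-call results or other non-string values are not cached. A hedged sketch of driving a single LLM with a handler outside of a Crew; the model name and prompt are arbitrary, and the handler behaviour assumes the interface sketched above:

```python
# Hedged sketch: using the handler with a standalone LLM instance.
from crewai import LLM
from crewai.utilities.llm_response_cache_handler import LLMResponseCacheHandler

handler = LLMResponseCacheHandler()
llm = LLM(model="gpt-4o-mini")
llm.set_response_cache_handler(handler)

handler.start_recording()
first = llm.call("Summarize CrewAI in one sentence.")   # live call, response recorded

handler.start_replaying()
second = llm.call("Summarize CrewAI in one sentence.")  # answered from the recording
assert first == second

llm.clear_response_cache_handler()
```
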