diff --git a/crates/rally-core/src/lib.rs b/crates/rally-core/src/lib.rs index 7ab89bd..1029d1e 100644 --- a/crates/rally-core/src/lib.rs +++ b/crates/rally-core/src/lib.rs @@ -24,6 +24,7 @@ pub enum SessionPhase { Implement, Review, FinalReview, + Paused, Done, } diff --git a/crates/rally-workflow-build/src/lib.rs b/crates/rally-workflow-build/src/lib.rs index 28d8545..dfe3cb1 100644 --- a/crates/rally-workflow-build/src/lib.rs +++ b/crates/rally-workflow-build/src/lib.rs @@ -63,6 +63,8 @@ pub struct BuildWorkflowState { pub reviews: Vec, #[serde(default)] pub pending_feedback: Vec, + #[serde(default)] + pub pause_after_step: bool, } impl Default for BuildWorkflowState { @@ -75,6 +77,7 @@ impl Default for BuildWorkflowState { checkpoint: None, reviews: Vec::new(), pending_feedback: Vec::new(), + pause_after_step: false, } } } @@ -107,10 +110,15 @@ impl BuildWorkflowState { } } -pub fn initial_workflow_state(implementer_agent: String, steps: Vec) -> Result { +pub fn initial_workflow_state( + implementer_agent: String, + steps: Vec, + pause_after_step: bool, +) -> Result { let state = BuildWorkflowState { implementer_agent, steps, + pause_after_step, ..BuildWorkflowState::default() }; encode_workflow_state(&state) @@ -285,10 +293,35 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { None } } + SessionPhase::Paused => { + Some(format!( + "Session paused for user feedback after step {}. 
\ + Waiting for `rally build continue --session {}`.", + workflow_state.current_step_idx, state.name + )) + } _ => None, } } +pub fn continue_session(state: &mut SessionState, feedback: Option<String>) -> Result<String> { + if state.phase != SessionPhase::Paused { + bail!("session is not paused (current phase: {:?})", state.phase); + } + let mut workflow_state = read_workflow_state(state)?; + if let Some(fb) = feedback { + workflow_state.pending_feedback.push(fb); + } + state.phase = SessionPhase::Implement; + write_workflow_state(state, &workflow_state)?; + let step_label = if let Some(step) = workflow_state.current_step() { + format!("step {}: {}", step.number, step.title) + } else { + "next step".to_string() + }; + Ok(format!("resumed; advancing to {step_label}")) +} + pub fn mark_done(state: &mut SessionState, _agent: &str) -> Result<String> { match state.phase { SessionPhase::Done => Ok("session already complete".to_string()), @@ -580,7 +613,11 @@ fn complete_step( } workflow_state.current_step_idx += 1; - state.phase = SessionPhase::Implement; + state.phase = if workflow_state.pause_after_step { + SessionPhase::Paused + } else { + SessionPhase::Implement + }; Ok(()) } diff --git a/src/cli.rs b/src/cli.rs index c653216..65e4fbc 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -126,6 +126,12 @@ pub struct CreateArgs { pub turn_timeout_secs: u64, #[arg(long, default_value_t = 1200)] pub review_timeout_secs: u64, + #[arg( + long, + default_value_t = false, + help = "Pause for user feedback after each step is approved" + )] + pub pause_after_step: bool, } #[derive(Args, Debug)] @@ -266,6 +272,16 @@ pub enum BuildSubcommand { Checkpoint(CheckpointArgs), #[command(about = "Submit a review verdict")] Review(ReviewArgs), + #[command(about = "Resume a paused session (advance to the next step)")] + Continue(ContinueArgs), +} + +#[derive(Args, Debug)] +pub struct ContinueArgs { + #[arg(long)] + pub session: String, + #[arg(long, help = "Optional feedback to pass to the implementer for the next 
step")] + pub feedback: Option<String>, } #[derive(Args, Debug)] diff --git a/src/commands.rs b/src/commands.rs index 1570dcf..3f554ef 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -7,9 +7,9 @@ use serde_json::{Value, json}; use crate::{ cli::{ - AgreeArgs, ChainArgs, ChallengeArgs, CheckpointArgs, CreateArgs, DoneArgs, FileIssueArgs, - JoinArgs, NextArgs, ReviewArgs, ReviewVerdictArg, SessionTypeArg, StatusArgs, - WorkflowActionArgs, + AgreeArgs, ChainArgs, ChallengeArgs, CheckpointArgs, ContinueArgs, CreateArgs, DoneArgs, + FileIssueArgs, JoinArgs, NextArgs, ReviewArgs, ReviewVerdictArg, SessionTypeArg, + StatusArgs, WorkflowActionArgs, }, state::{ AgentState, Config, SessionHandle, SessionPhase, SessionState, SessionType, @@ -84,6 +84,7 @@ pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> R Some(implement::initial_workflow_state( args.implementer.clone(), steps, + args.pause_after_step, )?), ) } @@ -411,6 +412,17 @@ pub fn review_with_registry(args: &ReviewArgs, registry: &WorkflowRegistry) -> R Ok(()) } +pub fn continue_session_with_registry(args: &ContinueArgs) -> Result<()> { + let handle = SessionHandle::open(&args.session)?; + let mut state = handle.load_state()?; + + let msg = implement::continue_session(&mut state, args.feedback.clone())?; + + handle.save_state(&state)?; + println!("{msg}"); + Ok(()) +} + pub fn chain_with_registry(args: &ChainArgs, registry: &WorkflowRegistry) -> Result<()> { let plan_session_dir; loop { @@ -454,6 +466,7 @@ pub fn chain_with_registry(args: &ChainArgs, registry: &WorkflowRegistry) -> Res max_rounds: 4, turn_timeout_secs: 300, review_timeout_secs: 1200, + pause_after_step: false, }; let session_name = create_with_registry(&create_args, registry)?; diff --git a/src/lib.rs b/src/lib.rs index a58e603..1f83b8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,7 +80,12 @@ pub fn run_cli_with_registry(cli: Cli, registry: &WorkflowRegistry) -> Result { commands::checkpoint_with_registry(&inner, 
registry)? } - BuildSubcommand::Review(inner) => commands::review_with_registry(&inner, registry)?, + BuildSubcommand::Review(inner) => { + commands::review_with_registry(&inner, registry)? + } + BuildSubcommand::Continue(inner) => { + commands::continue_session_with_registry(&inner)? + } } Ok(0) } diff --git a/src/workflow/builtin.rs b/src/workflow/builtin.rs index 4bf41df..bfc4647 100644 --- a/src/workflow/builtin.rs +++ b/src/workflow/builtin.rs @@ -44,7 +44,7 @@ impl RunCommand for ImplementRunCommand { } fn usage(&self) -> &str { - "implement [--reviewers N]" + "implement [--reviewers N] [--pause-after-step]" } fn resolve(&self, args: &[String], workspace: &Path) -> Result<RunCommandResolution> { @@ -59,6 +59,7 @@ impl RunCommand for ImplementRunCommand { let role = &args[1]; let mut reviewers = 1u32; + let mut pause_after_step = false; let mut i = 2; while i < args.len() { if args[i] == "--reviewers" { @@ -72,6 +73,11 @@ impl RunCommand for ImplementRunCommand { i += 1; continue; } + if args[i] == "--pause-after-step" { + pause_after_step = true; + i += 1; + continue; + } i += 1; } @@ -109,6 +115,7 @@ impl RunCommand for ImplementRunCommand { max_rounds: 4, turn_timeout_secs: 300, review_timeout_secs: 1200, + pause_after_step, }; Ok(RunCommandResolution::Session { @@ -185,6 +192,7 @@ impl RunCommand for NegotiateRunCommand { max_rounds: 4, turn_timeout_secs: 300, review_timeout_secs: 1200, + pause_after_step: false, }; Ok(RunCommandResolution::Session { @@ -429,6 +437,15 @@ impl Workflow for BuildWorkflow { }); } + if ctx.host.state.phase == SessionPhase::Paused { + return Ok(NextPollDispatch::Complete { + message: format!( + "session paused for user feedback; run `rally build continue --session {}` to advance", + ctx.host.state.name + ), + }); + } + Ok(NextPollDispatch::Wait { hint: implement::get_wait_hint(ctx.host.state, ctx.agent), }) diff --git a/tests/extensibility_mvp.rs b/tests/extensibility_mvp.rs index 456302e..348fcd1 100644 --- a/tests/extensibility_mvp.rs +++ 
b/tests/extensibility_mvp.rs @@ -12,7 +12,7 @@ use serde_json::{Value, json}; use rally::{ Workflow, - cli::{Cli, CreateArgs, DoneArgs, JoinArgs, NextArgs, SessionTypeArg}, + cli::{Cli, ContinueArgs, CreateArgs, DoneArgs, JoinArgs, NextArgs, SessionTypeArg}, commands, run_cli_with_registry, state::{self, SessionHandle, SessionPhase}, workflow::{ @@ -2009,6 +2009,7 @@ fn workflow_errors_do_not_partially_write_state() -> Result<()> { max_rounds: 4, turn_timeout_secs: 300, review_timeout_secs: 1200, + pause_after_step: false, }, &registry, )?; @@ -2039,3 +2040,280 @@ fn workflow_errors_do_not_partially_write_state() -> Result<()> { let _ = fs::remove_dir_all(state::session_dir(&session)); Ok(()) } + +fn build_multi_step_todo(name: &str) -> PathBuf { + let path = std::env::temp_dir().join(format!("{name}.md")); + write_file( + &path, + "## Spec\n\npause test\n\n## Plan\n\n\ + 1. First step\nAcceptance criteria:\n- done one\n\n\ + 2. Second step\nAcceptance criteria:\n- done two\n", + ); + path +} + +#[test] +fn pause_after_step_pauses_and_continue_resumes() -> Result<()> { + let registry = builtin_registry()?; + let workspace = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let session = unique_name("rally-pause"); + let todo = build_multi_step_todo(&session); + let _ = fs::remove_dir_all(state::session_dir(&session)); + + // Create session with --pause-after-step + run_cli( + &[ + "rally", + "create", + "implement", + "--name", + &session, + "--todo", + todo.to_str().unwrap(), + "--workspace", + workspace.to_str().unwrap(), + "--reviewers", + "1", + "--pause-after-step", + ], + &registry, + )?; + + // Join both agents + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: "implementer".to_string(), + timeout: None, + }, + &registry, + )?; + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: "reviewer-1".to_string(), + timeout: None, + }, + &registry, + )?; + + // Implementer gets step 1 instruction + assert_eq!( 
+ commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "implementer".to_string(), + timeout: Some(2), + }, + &registry + )?, + 0 + ); + + // Checkpoint step 1 + run_cli( + &[ + "rally", + "build", + "checkpoint", + "--session", + &session, + "--as", + "implementer", + ], + &registry, + )?; + + // Reviewer approves step 1 + run_cli( + &[ + "rally", + "build", + "review", + "--session", + &session, + "--as", + "reviewer-1", + "--verdict", + "approve", + ], + &registry, + )?; + + // Verify session is now paused (not advancing to step 2 automatically) + { + let handle = state::SessionHandle::open(&session)?; + let s = handle.load_state()?; + assert_eq!(s.phase, SessionPhase::Paused); + } + + // Implementer's next call returns exit code 2 (complete/paused) + assert_eq!( + commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "implementer".to_string(), + timeout: Some(2), + }, + &registry + )?, + 2 + ); + + // Continue the session with feedback + commands::continue_session_with_registry(&ContinueArgs { + session: session.clone(), + feedback: Some("please also handle edge cases".to_string()), + })?; + + // Verify phase is now Implement again + { + let handle = state::SessionHandle::open(&session)?; + let s = handle.load_state()?; + assert_eq!(s.phase, SessionPhase::Implement); + } + + // Implementer gets step 2 instruction (with feedback) + assert_eq!( + commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "implementer".to_string(), + timeout: Some(2), + }, + &registry + )?, + 0 + ); + + // Checkpoint step 2 + run_cli( + &[ + "rally", + "build", + "checkpoint", + "--session", + &session, + "--as", + "implementer", + ], + &registry, + )?; + + // Reviewer approves step 2 → should go to FinalReview (not paused) + // because pause-after-step only applies between steps, not at final review + run_cli( + &[ + "rally", + "build", + "review", + "--session", + &session, + "--as", + "reviewer-1", + "--verdict", + 
"approve", + ], + &registry, + )?; + + // Verify it went to FinalReview (last step triggers final review, not pause) + { + let handle = state::SessionHandle::open(&session)?; + let s = handle.load_state()?; + assert_eq!(s.phase, SessionPhase::FinalReview); + } + + // Reviewer approves final review + run_cli( + &[ + "rally", + "build", + "review", + "--session", + &session, + "--as", + "reviewer-1", + "--verdict", + "approve", + ], + &registry, + )?; + + // Session complete + assert_eq!( + commands::next_with_registry( + &NextArgs { + session: session.clone(), + agent: "implementer".to_string(), + timeout: Some(2), + }, + &registry + )?, + 2 + ); + + // Verify continue fails on non-paused session + let err = commands::continue_session_with_registry(&ContinueArgs { + session: session.clone(), + feedback: None, + }) + .expect_err("continue should fail on done session"); + assert!(err.to_string().contains("not paused")); + + let _ = fs::remove_file(todo); + let _ = fs::remove_dir_all(state::session_dir(&session)); + Ok(()) +} + +#[test] +fn pause_after_step_flag_parsed_by_run_command() -> Result<()> { + let workspace = std::env::temp_dir().join(format!( + "rally-pause-flag-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + fs::create_dir_all(&workspace)?; + let todo = workspace.join("task.md"); + fs::write( + &todo, + "## Spec\n\ntest\n\n## Plan\n\n1. Step one\nAcceptance criteria:\n- done\n", + )?; + + let cmd = rally::workflow::builtin::ImplementRunCommand; + use rally::workflow::RunCommand; + + // Without flag + let res = cmd.resolve( + &["task.md".to_string(), "implement".to_string()], + &workspace, + )?; + match res { + rally::workflow::RunCommandResolution::Session { create_args, .. 
} => { + assert!(!create_args.pause_after_step); + } + _ => panic!("expected Session"), + } + + // With flag + let res = cmd.resolve( + &[ + "task.md".to_string(), + "implement".to_string(), + "--pause-after-step".to_string(), + ], + &workspace, + )?; + match res { + rally::workflow::RunCommandResolution::Session { create_args, .. } => { + assert!(create_args.pause_after_step); + } + _ => panic!("expected Session"), + } + + let _ = fs::remove_dir_all(&workspace); + Ok(()) +}