Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/rally-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum SessionPhase {
FinalizationReview,
Implement,
Review,
FinalReview,
Done,
}

Expand Down
249 changes: 187 additions & 62 deletions crates/rally-workflow-build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,22 @@ pub fn get_instruction(state: &SessionState, agent: &str) -> Option<String> {
if !is_implementer_with_state(&workflow_state, agent) {
return None;
}
let step = workflow_state.current_step()?;

let mut lines = vec![format!("Implement step {}: {}", step.number, step.title)];
lines.push("Acceptance criteria:".to_string());
lines.push(format_criteria(&step.acceptance_criteria));
let mut lines = if let Some(step) = workflow_state.current_step() {
vec![
format!("Implement step {}: {}", step.number, step.title),
"Acceptance criteria:".to_string(),
format_criteria(&step.acceptance_criteria),
]
} else {
vec![
"Implement final holistic fixes requested by reviewers.".to_string(),
"Acceptance criteria:".to_string(),
"- Address all final review feedback and verify integrated behavior across the full change."
.to_string(),
"- Ensure docs/tests are coherent for the entire workflow, not just a single step."
.to_string(),
]
};

if !workflow_state.pending_feedback.is_empty() {
lines.push("Reviewer feedback to address:".to_string());
Expand All @@ -176,26 +187,33 @@ pub fn get_instruction(state: &SessionState, agent: &str) -> Option<String> {
));
Some(lines.join("\n"))
}
SessionPhase::Review => {
SessionPhase::Review | SessionPhase::FinalReview => {
if is_implementer_with_state(&workflow_state, agent) {
return None;
}
let step = workflow_state.current_step()?;
let checkpoint = workflow_state.checkpoint.as_ref()?;

if workflow_state.reviews.iter().any(|r| r.reviewer == agent) {
return None;
}

Some(format!(
"Review step {}: {}\nCommit: {}\nAcceptance criteria:\n{}\nSubmit verdict with rally build review --session {} --as {} --verdict approve|changes-requested|blocked",
step.number,
step.title,
checkpoint.commit_sha,
format_criteria(&step.acceptance_criteria),
state.name,
agent
))
if state.phase == SessionPhase::FinalReview {
Some(format!(
"Final holistic review of full implementation\nCommit: {}\nReview the complete change for cross-step integration, architecture consistency, and missing risks.\nSubmit verdict with rally build review --session {} --as {} --verdict APPROVE|CHANGES_REQUESTED|BLOCKED",
checkpoint.commit_sha, state.name, agent
))
} else {
let step = workflow_state.current_step()?;
Some(format!(
"Review step {}: {}\nCommit: {}\nAcceptance criteria:\n{}\nSubmit verdict with rally build review --session {} --as {} --verdict APPROVE|CHANGES_REQUESTED|BLOCKED",
step.number,
step.title,
checkpoint.commit_sha,
format_criteria(&step.acceptance_criteria),
state.name,
agent
))
}
}
_ => None,
}
Expand All @@ -216,14 +234,16 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option<String> {
))
} else {
Some(format!(
"Waiting for implementer '{}' to submit a checkpoint.",
"Waiting for implementer '{}' to submit a final holistic-fixes checkpoint.",
workflow_state.implementer_agent
))
}
}
SessionPhase::Review => {
SessionPhase::Review | SessionPhase::FinalReview => {
if is_implementer_with_state(&workflow_state, agent) {
if let Some(step) = workflow_state.current_step() {
if state.phase == SessionPhase::FinalReview {
Some("Waiting for reviewer verdicts on final holistic review.".to_string())
} else if let Some(step) = workflow_state.current_step() {
Some(format!(
"Waiting for reviewer verdicts on step {}: {}.",
step.number, step.title
Expand All @@ -232,10 +252,15 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option<String> {
Some("Waiting for reviewer verdicts.".to_string())
}
} else if workflow_state.reviews.iter().any(|r| r.reviewer == agent) {
Some(
"Your review was submitted. Waiting for remaining reviewer verdicts or decision."
.to_string(),
)
if state.phase == SessionPhase::FinalReview {
Some("Your final review was submitted. Waiting for remaining reviewer verdicts."
.to_string())
} else {
Some(
"Your review was submitted. Waiting for remaining reviewer verdicts or decision."
.to_string(),
)
}
} else {
None
}
Expand All @@ -260,51 +285,46 @@ pub fn checkpoint(state: &mut SessionState, agent: &str) -> Result<String> {
bail!("checkpoint is only valid during implement phase");
}

let step = workflow_state
let step_number = if let Some(step) = workflow_state
.steps
.get_mut(workflow_state.current_step_idx)
.ok_or_else(|| anyhow!("no current step available"))?;
step.status = StepStatus::InProgress;
{
step.status = StepStatus::InProgress;
step.number
} else {
0
};

let workspace = state
.workspace
.as_ref()
.ok_or_else(|| anyhow!("implement session missing workspace path"))?;

let output = Command::new("git")
.arg("-C")
.arg(workspace)
.arg("rev-parse")
.arg("HEAD")
.output()
.map_err(|e| anyhow!("failed to run git rev-parse HEAD in {}: {e}", workspace))?;

if !output.status.success() {
bail!(
"git rev-parse HEAD failed in {}: {}",
workspace,
String::from_utf8_lossy(&output.stderr).trim()
);
}

let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
if sha.is_empty() {
bail!("git rev-parse HEAD returned an empty commit sha");
}
let sha = resolve_head_sha(workspace)?;

workflow_state.checkpoint = Some(CheckpointState {
step_number: step.number,
step_number,
commit_sha: sha.clone(),
created_at: now(),
});
workflow_state.reviews.clear();
workflow_state.pending_feedback.clear();
state.phase = SessionPhase::Review;
state.phase = if step_number == 0 {
SessionPhase::FinalReview
} else {
SessionPhase::Review
};
write_workflow_state(state, &workflow_state)?;

Ok(format!(
"checkpoint captured at commit {sha}; moved to review"
))
if step_number == 0 {
Ok(format!(
"final holistic checkpoint captured at commit {sha}; moved to final review"
))
} else {
Ok(format!(
"checkpoint captured at commit {sha}; moved to review"
))
}
}

pub fn submit_review(
Expand All @@ -316,8 +336,8 @@ pub fn submit_review(
) -> Result<String> {
let mut workflow_state = read_workflow_state(state)?;

if state.phase != SessionPhase::Review {
bail!("review is only valid during review phase");
if state.phase != SessionPhase::Review && state.phase != SessionPhase::FinalReview {
bail!("review is only valid during review or final review phase");
}
if is_implementer_with_state(&workflow_state, reviewer) {
bail!("implementer cannot submit review verdicts");
Expand All @@ -335,16 +355,19 @@ pub fn submit_review(
.as_ref()
.ok_or_else(|| anyhow!("cannot review without an active checkpoint"))?;

let step_dir = session_dir
.join("reviews")
.join(format!("step-{:02}", checkpoint.step_number));
let step_dir = session_dir.join("reviews").join(review_bucket_name(checkpoint));
fs::create_dir_all(&step_dir)?;
let review_file = step_dir.join(format!("{reviewer}.md"));
let step_label = if checkpoint.step_number == 0 {
"final".to_string()
} else {
checkpoint.step_number.to_string()
};
let file_content = format!(
"verdict: {:?}\nreviewer: {}\nstep: {}\ncommit: {}\ntime: {}\n\n{}\n",
verdict,
reviewer,
checkpoint.step_number,
step_label,
checkpoint.commit_sha,
now().to_rfc3339(),
message.clone().unwrap_or_default()
Expand All @@ -368,9 +391,10 @@ pub fn submit_review(
}

pub fn process_review_state(state: &mut SessionState) -> Result<Option<String>> {
if state.phase != SessionPhase::Review {
if state.phase != SessionPhase::Review && state.phase != SessionPhase::FinalReview {
return Ok(None);
}
let is_final_review = state.phase == SessionPhase::FinalReview;

let mut workflow_state = read_workflow_state(state)?;
let checkpoint = match workflow_state.checkpoint.clone() {
Expand Down Expand Up @@ -399,8 +423,32 @@ pub fn process_review_state(state: &mut SessionState) -> Result<Option<String>>
&& workflow_state.reviews.len() >= expected_reviewers;

if has_blocked {
if is_final_review {
workflow_state.pending_feedback = workflow_state
.reviews
.iter()
.filter(|r| r.verdict == ReviewVerdict::Blocked)
.map(|r| {
r.message
.clone()
.unwrap_or_else(|| format!("{} marked final review BLOCKED", r.reviewer))
})
.collect();
workflow_state.reviews.clear();
workflow_state.checkpoint = None;
state.phase = SessionPhase::Implement;
write_workflow_state(state, &workflow_state)?;
return Ok(Some(
"final review marked BLOCKED; returned to implement phase".to_string(),
));
}
complete_step(state, &mut workflow_state, StepStatus::Escalated, true)?;
write_workflow_state(state, &workflow_state)?;
if state.phase == SessionPhase::FinalReview {
return Ok(Some(
"review marked BLOCKED; entered final holistic review".to_string(),
));
}
return Ok(Some("review marked BLOCKED; step escalated".to_string()));
}

Expand All @@ -419,17 +467,40 @@ pub fn process_review_state(state: &mut SessionState) -> Result<Option<String>>
workflow_state.checkpoint = None;
state.phase = SessionPhase::Implement;
write_workflow_state(state, &workflow_state)?;
return Ok(Some(
"changes requested; returned to implement phase".to_string(),
));
if is_final_review {
return Ok(Some(
"final review requested changes; returned to implement phase".to_string(),
));
}
return Ok(Some("changes requested; returned to implement phase".to_string()));
}

if all_approve {
if is_final_review {
workflow_state.pending_feedback.clear();
workflow_state.reviews.clear();
workflow_state.checkpoint = None;
state.phase = SessionPhase::Done;
write_workflow_state(state, &workflow_state)?;
return Ok(Some(
"all reviewers approved final holistic review; session complete".to_string(),
));
}

complete_step(state, &mut workflow_state, StepStatus::Approved, false)?;
write_workflow_state(state, &workflow_state)?;
if state.phase == SessionPhase::FinalReview {
return Ok(Some(
"all reviewers approved; moved to final holistic review".to_string(),
));
}
return Ok(Some("all reviewers approved; advanced".to_string()));
}

if is_final_review {
return Ok(None);
}

let elapsed = now()
.signed_duration_since(checkpoint.created_at)
.num_seconds();
Expand Down Expand Up @@ -479,7 +550,8 @@ fn complete_step(
workflow_state.checkpoint = None;

if idx + 1 >= workflow_state.steps.len() {
state.phase = SessionPhase::Done;
workflow_state.current_step_idx = workflow_state.steps.len();
begin_final_review(state, workflow_state)?;
return Ok(());
}

Expand All @@ -493,6 +565,59 @@ fn expected_reviewer_count(state: &SessionState) -> usize {
expected.max(1)
}

fn review_bucket_name(checkpoint: &CheckpointState) -> String {
if checkpoint.step_number == 0 {
"final".to_string()
} else {
format!("step-{:02}", checkpoint.step_number)
}
}

fn begin_final_review(
state: &mut SessionState,
workflow_state: &mut BuildWorkflowState,
) -> Result<()> {
let workspace = state
.workspace
.as_ref()
.ok_or_else(|| anyhow!("implement session missing workspace path"))?;
let sha = resolve_head_sha(workspace)?;

workflow_state.checkpoint = Some(CheckpointState {
step_number: 0,
commit_sha: sha,
created_at: now(),
});
workflow_state.reviews.clear();
workflow_state.pending_feedback.clear();
state.phase = SessionPhase::FinalReview;
Ok(())
}

fn resolve_head_sha(workspace: &str) -> Result<String> {
let output = Command::new("git")
.arg("-C")
.arg(workspace)
.arg("rev-parse")
.arg("HEAD")
.output()
.map_err(|e| anyhow!("failed to run git rev-parse HEAD in {}: {e}", workspace))?;

if !output.status.success() {
bail!(
"git rev-parse HEAD failed in {}: {}",
workspace,
String::from_utf8_lossy(&output.stderr).trim()
);
}

let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
if sha.is_empty() {
bail!("git rev-parse HEAD returned an empty commit sha");
}
Ok(sha)
}

fn format_criteria(criteria: &[String]) -> String {
if criteria.is_empty() {
return "- (none listed)".to_string();
Expand Down
Loading