diff --git a/.github/workflows/pre-merge.yml b/.github/workflows/pre-merge.yml new file mode 100644 index 0000000..e8fd9b6 --- /dev/null +++ b/.github/workflows/pre-merge.yml @@ -0,0 +1,35 @@ +name: pre-merge + +on: + pull_request: + branches: [main, master] + workflow_dispatch: + +jobs: + pre-merge: + if: github.actor == 'justinmoon' + runs-on: blacksmith-16vcpu-ubuntu-2404 + steps: + - uses: actions/checkout@v4 + + - uses: useblacksmith/stickydisk@v1 + with: + key: ${{ github.repository }}-nix-v1-${{ runner.os }} + path: /nix + - name: Fix /nix ownership + run: | + if [ -d /nix ] && [ "$(stat -c %u /nix)" != "$(id -u)" ]; then + sudo chown -R "$(id -u):$(id -g)" /nix + fi + - uses: nixbuild/nix-quick-install-action@v30 + + - uses: useblacksmith/stickydisk@v1 + with: + key: ${{ github.repository }}-cargo-home-v1-${{ runner.os }} + path: ~/.cargo + - uses: useblacksmith/stickydisk@v1 + with: + key: ${{ github.repository }}-cargo-target-v1-${{ runner.os }} + path: target + + - run: nix develop .#default -c just pre-merge diff --git a/Cargo.lock b/Cargo.lock index fd06ab2..90a1c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,6 +342,41 @@ dependencies = [ "clap", "dirs", "fs2", + "rally-core", + "rally-workflow-build", + "rally-workflow-plan", + "serde", + "serde_json", +] + +[[package]] +name = "rally-core" +version = "0.1.0" +dependencies = [ + "chrono", + "serde", + "serde_json", +] + +[[package]] +name = "rally-workflow-build" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "rally-core", + "serde", + "serde_json", +] + +[[package]] +name = "rally-workflow-plan" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "rally-core", + "rally-workflow-build", "serde", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index 0354e48..0fdcd25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,13 @@ name = "rally" version = "0.1.0" edition = "2024" +[workspace] +members = [ + "crates/rally-core", + "crates/rally-workflow-plan", + "crates/rally-workflow-build", +] + [dependencies] anyhow = "1.0" chrono = { version = "0.4", features = ["serde", "clock"] } @@ -11,3 +18,6 @@ dirs = "6.0" fs2 = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +rally-core = { path = "crates/rally-core" } +rally-workflow-build = { path = "crates/rally-workflow-build" } +rally-workflow-plan = { path = "crates/rally-workflow-plan" } diff --git a/crates/rally-core/Cargo.toml b/crates/rally-core/Cargo.toml new file mode 100644 index 0000000..53d6eee --- /dev/null +++ b/crates/rally-core/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "rally-core" +version = "0.1.0" +edition = "2024" + +[dependencies] +chrono = { version = "0.4", features = ["serde", "clock"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" diff --git a/crates/rally-core/src/lib.rs b/crates/rally-core/src/lib.rs new file mode 100644 index 0000000..035f8d0 --- /dev/null +++ b/crates/rally-core/src/lib.rs @@ -0,0 +1,69 @@ +use std::collections::BTreeMap; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SessionType { + Plan, + Implement, + Workflow, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SessionPhase { + Registration, + Proposal, + Analysis, + Negotiation, + FinalizationWrite, + FinalizationReview, + Implement, + Review, + Done, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + pub expected_agents: u32, + pub max_rounds: u32, + pub turn_timeout_secs: u64, + pub review_timeout_secs: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AgentState { + pub name: String, + pub joined_at: DateTime, + pub last_seen: DateTime, + pub phase_status: String, + pub rounds: u32, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SessionState { + pub name: String, + pub session_type: SessionType, + pub phase: SessionPhase, + pub created_at: DateTime, + pub config: Config, + pub agents: BTreeMap, + pub topic: Option, + pub todo_path: Option, + pub workspace: Option, + #[serde(default)] + pub workflow_id: Option, + #[serde(default)] + pub workflow_source: Option, + #[serde(default)] + pub workflow_version: Option, + #[serde(default)] + pub workflow_state: Option, +} + +pub fn now() -> DateTime { + Utc::now() +} diff --git a/crates/rally-workflow-build/Cargo.toml b/crates/rally-workflow-build/Cargo.toml new file mode 100644 index 0000000..de52d6a --- /dev/null +++ b/crates/rally-workflow-build/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "rally-workflow-build" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0" +chrono = { version = "0.4", features = ["serde", "clock"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +rally-core = { path = "../rally-core" } diff --git a/src/implement.rs b/crates/rally-workflow-build/src/lib.rs similarity index 56% rename from src/implement.rs rename to crates/rally-workflow-build/src/lib.rs index 2644104..38bfc6c 100644 --- a/src/implement.rs +++ b/crates/rally-workflow-build/src/lib.rs @@ -1,11 +1,147 @@ use std::{fs, path::Path, process::Command}; -use anyhow::{Result, anyhow, bail}; +use anyhow::{Context, Result, anyhow, bail}; +use chrono::{DateTime, Utc}; +use rally_core::{SessionPhase, SessionState, now}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum StepStatus { + Pending, + InProgress, + Approved, + Escalated, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ReviewVerdict { + Approve, + ChangesRequested, + Blocked, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StepState { + pub number: u32, + pub title: String, + pub acceptance_criteria: Vec, + pub status: StepStatus, + pub escalated: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CheckpointState { + pub step_number: u32, + pub commit_sha: String, + pub created_at: DateTime, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ReviewState { + pub reviewer: String, + pub verdict: ReviewVerdict, + pub message: Option, + pub created_at: DateTime, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BuildWorkflowState { + #[serde(default = "default_state_version")] + pub state_version: u32, + #[serde(default = "default_implementer")] + pub implementer_agent: String, + #[serde(default)] + pub steps: Vec, + #[serde(default)] + pub current_step_idx: usize, + #[serde(default)] + pub checkpoint: Option, + #[serde(default)] + pub reviews: Vec, + #[serde(default)] + pub pending_feedback: Vec, +} + +impl Default for BuildWorkflowState { + fn default() -> Self { + Self { + state_version: default_state_version(), + implementer_agent: default_implementer(), + steps: Vec::new(), + current_step_idx: 0, + checkpoint: None, + reviews: Vec::new(), + pending_feedback: Vec::new(), + } + } +} + +fn default_state_version() -> u32 { + 1 +} + +fn default_implementer() -> String { + "implementer".to_string() +} + +impl BuildWorkflowState { + pub fn validate(&self) -> Result<()> { + if self.implementer_agent.trim().is_empty() { + bail!("workflow_state implementer_agent cannot be empty"); + } + if self.current_step_idx > self.steps.len() { + bail!( + "workflow_state current_step_idx {} exceeds steps length {}", + self.current_step_idx, + self.steps.len() + ); + } + Ok(()) + } + + pub fn current_step(&self) -> Option<&StepState> { + self.steps.get(self.current_step_idx) + } +} + +pub fn initial_workflow_state(implementer_agent: String, steps: Vec) -> Result { + let state = BuildWorkflowState { + implementer_agent, + steps, + ..BuildWorkflowState::default() + }; + encode_workflow_state(&state) +} + +pub fn parse_workflow_state(raw: Option) -> Result { + let parsed = match raw { + Some(raw) => serde_json::from_value::(raw) + .context("invalid build workflow_state payload")?, + None => BuildWorkflowState::default(), + }; + parsed.validate()?; + Ok(parsed) +} + +pub fn encode_workflow_state(workflow_state: &BuildWorkflowState) -> Result { + workflow_state.validate()?; + serde_json::to_value(workflow_state).context("failed to encode build workflow_state") +} + +pub fn read_workflow_state(state: &SessionState) -> Result { + parse_workflow_state(state.workflow_state.clone()) +} -use crate::state::{ - CheckpointState, ReviewState, ReviewVerdict, SessionPhase, SessionState, StepState, StepStatus, - now, -}; +fn write_workflow_state( + state: &mut SessionState, + workflow_state: &BuildWorkflowState, +) -> Result<()> { + state.workflow_state = Some(encode_workflow_state(workflow_state)?); + Ok(()) +} pub fn parse_steps_from_todo(path: &Path) -> Result> { let raw = fs::read_to_string(path) @@ -14,20 +150,22 @@ pub fn parse_steps_from_todo(path: &Path) -> Result> { } pub fn get_instruction(state: &SessionState, agent: &str) -> Option { + let workflow_state = read_workflow_state(state).ok()?; + match state.phase { SessionPhase::Implement => { - if !is_implementer(state, agent) { + if !is_implementer_with_state(&workflow_state, agent) { return None; } - let step = state.steps.get(state.current_step_idx)?; + let step = workflow_state.current_step()?; let mut lines = vec![format!("Implement step {}: {}", step.number, step.title)]; lines.push("Acceptance criteria:".to_string()); lines.push(format_criteria(&step.acceptance_criteria)); - if !state.pending_feedback.is_empty() { + if !workflow_state.pending_feedback.is_empty() { lines.push("Reviewer feedback to address:".to_string()); - for feedback in &state.pending_feedback { + for feedback in &workflow_state.pending_feedback { lines.push(format!("- {feedback}")); } } @@ -39,18 +177,18 @@ pub fn get_instruction(state: &SessionState, agent: &str) -> Option { Some(lines.join("\n")) } SessionPhase::Review => { - if is_implementer(state, agent) { + if is_implementer_with_state(&workflow_state, agent) { return None; } - let step = state.steps.get(state.current_step_idx)?; - let checkpoint = state.checkpoint.as_ref()?; + let step = workflow_state.current_step()?; + let checkpoint = workflow_state.checkpoint.as_ref()?; - if state.reviews.iter().any(|r| r.reviewer == agent) { + if workflow_state.reviews.iter().any(|r| r.reviewer == agent) { return None; } Some(format!( - "Review step {}: {}\nCommit: {}\nAcceptance criteria:\n{}\nSubmit verdict with rally build review --session {} --as {} --verdict APPROVE|CHANGES_REQUESTED|BLOCKED", + "Review step {}: {}\nCommit: {}\nAcceptance criteria:\n{}\nSubmit verdict with rally build review --session {} --as {} --verdict approve|changes-requested|blocked", step.number, step.title, checkpoint.commit_sha, @@ -64,29 +202,28 @@ pub fn get_instruction(state: &SessionState, agent: &str) -> Option { } pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { + let workflow_state = read_workflow_state(state).ok()?; + match state.phase { SessionPhase::Implement => { - if is_implementer(state, agent) { + if is_implementer_with_state(&workflow_state, agent) { return None; } - let implementer = state - .implementer_agent - .clone() - .unwrap_or_else(|| "implementer".to_string()); - if let Some(step) = state.steps.get(state.current_step_idx) { + if let Some(step) = workflow_state.current_step() { Some(format!( - "Waiting for implementer '{implementer}' to checkpoint step {}: {}.", - step.number, step.title + "Waiting for implementer '{}' to checkpoint step {}: {}.", + workflow_state.implementer_agent, step.number, step.title )) } else { Some(format!( - "Waiting for implementer '{implementer}' to submit a checkpoint." + "Waiting for implementer '{}' to submit a checkpoint.", + workflow_state.implementer_agent )) } } SessionPhase::Review => { - if is_implementer(state, agent) { - if let Some(step) = state.steps.get(state.current_step_idx) { + if is_implementer_with_state(&workflow_state, agent) { + if let Some(step) = workflow_state.current_step() { Some(format!( "Waiting for reviewer verdicts on step {}: {}.", step.number, step.title @@ -94,9 +231,11 @@ pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { } else { Some("Waiting for reviewer verdicts.".to_string()) } - } else if state.reviews.iter().any(|r| r.reviewer == agent) { - Some("Your review was submitted. Waiting for remaining reviewer verdicts or decision." - .to_string()) + } else if workflow_state.reviews.iter().any(|r| r.reviewer == agent) { + Some( + "Your review was submitted. Waiting for remaining reviewer verdicts or decision." + .to_string(), + ) } else { None } @@ -113,16 +252,17 @@ pub fn mark_done(state: &mut SessionState, _agent: &str) -> Result { } pub fn checkpoint(state: &mut SessionState, agent: &str) -> Result { - if !is_implementer(state, agent) { + let mut workflow_state = read_workflow_state(state)?; + if !is_implementer_with_state(&workflow_state, agent) { bail!("only implementer can create a checkpoint"); } if state.phase != SessionPhase::Implement { bail!("checkpoint is only valid during implement phase"); } - let step = state + let step = workflow_state .steps - .get_mut(state.current_step_idx) + .get_mut(workflow_state.current_step_idx) .ok_or_else(|| anyhow!("no current step available"))?; step.status = StepStatus::InProgress; @@ -152,14 +292,15 @@ pub fn checkpoint(state: &mut SessionState, agent: &str) -> Result { bail!("git rev-parse HEAD returned an empty commit sha"); } - state.checkpoint = Some(CheckpointState { + workflow_state.checkpoint = Some(CheckpointState { step_number: step.number, commit_sha: sha.clone(), created_at: now(), }); - state.reviews.clear(); - state.pending_feedback.clear(); + workflow_state.reviews.clear(); + workflow_state.pending_feedback.clear(); state.phase = SessionPhase::Review; + write_workflow_state(state, &workflow_state)?; Ok(format!( "checkpoint captured at commit {sha}; moved to review" @@ -173,17 +314,23 @@ pub fn submit_review( verdict: ReviewVerdict, message: Option, ) -> Result { + let mut workflow_state = read_workflow_state(state)?; + if state.phase != SessionPhase::Review { bail!("review is only valid during review phase"); } - if is_implementer(state, reviewer) { + if is_implementer_with_state(&workflow_state, reviewer) { bail!("implementer cannot submit review verdicts"); } - if state.reviews.iter().any(|r| r.reviewer == reviewer) { + if workflow_state + .reviews + .iter() + .any(|r| r.reviewer == reviewer) + { bail!("reviewer '{}' already submitted a verdict", reviewer); } - let checkpoint = state + let checkpoint = workflow_state .checkpoint .as_ref() .ok_or_else(|| anyhow!("cannot review without an active checkpoint"))?; @@ -204,12 +351,13 @@ pub fn submit_review( ); fs::write(&review_file, file_content)?; - state.reviews.push(ReviewState { + workflow_state.reviews.push(ReviewState { reviewer: reviewer.to_string(), verdict, message, created_at: now(), }); + write_workflow_state(state, &workflow_state)?; let mut out = format!("recorded review at {}", review_file.display()); if let Some(decision) = process_review_state(state)? { @@ -224,38 +372,40 @@ pub fn process_review_state(state: &mut SessionState) -> Result> return Ok(None); } - let checkpoint = match state.checkpoint.clone() { + let mut workflow_state = read_workflow_state(state)?; + let checkpoint = match workflow_state.checkpoint.clone() { Some(cp) => cp, None => return Ok(None), }; let expected_reviewers = expected_reviewer_count(state); - let has_blocked = state + let has_blocked = workflow_state .reviews .iter() .any(|r| r.verdict == ReviewVerdict::Blocked); - let has_changes = state + let has_changes = workflow_state .reviews .iter() .any(|r| r.verdict == ReviewVerdict::ChangesRequested); - let has_approve = state + let has_approve = workflow_state .reviews .iter() .any(|r| r.verdict == ReviewVerdict::Approve); - let all_approve = !state.reviews.is_empty() - && state + let all_approve = !workflow_state.reviews.is_empty() + && workflow_state .reviews .iter() .all(|r| r.verdict == ReviewVerdict::Approve) - && state.reviews.len() >= expected_reviewers; + && workflow_state.reviews.len() >= expected_reviewers; if has_blocked { - complete_step(state, StepStatus::Escalated, true)?; + complete_step(state, &mut workflow_state, StepStatus::Escalated, true)?; + write_workflow_state(state, &workflow_state)?; return Ok(Some("review marked BLOCKED; step escalated".to_string())); } if has_changes { - state.pending_feedback = state + workflow_state.pending_feedback = workflow_state .reviews .iter() .filter(|r| r.verdict == ReviewVerdict::ChangesRequested) @@ -265,16 +415,18 @@ pub fn process_review_state(state: &mut SessionState) -> Result> .unwrap_or_else(|| format!("{} requested changes", r.reviewer)) }) .collect(); - state.reviews.clear(); - state.checkpoint = None; + workflow_state.reviews.clear(); + workflow_state.checkpoint = None; state.phase = SessionPhase::Implement; + write_workflow_state(state, &workflow_state)?; return Ok(Some( "changes requested; returned to implement phase".to_string(), )); } if all_approve { - complete_step(state, StepStatus::Approved, false)?; + complete_step(state, &mut workflow_state, StepStatus::Approved, false)?; + write_workflow_state(state, &workflow_state)?; return Ok(Some("all reviewers approved; advanced".to_string())); } @@ -286,41 +438,52 @@ pub fn process_review_state(state: &mut SessionState) -> Result> && !has_changes && !has_blocked { - complete_step(state, StepStatus::Approved, false)?; + complete_step(state, &mut workflow_state, StepStatus::Approved, false)?; + write_workflow_state(state, &workflow_state)?; return Ok(Some( "review timeout with partial approval; advanced by timeout policy".to_string(), )); } + write_workflow_state(state, &workflow_state)?; Ok(None) } pub fn is_implementer(state: &SessionState, agent: &str) -> bool { - match state.implementer_agent.as_deref() { - Some(name) => name == agent, - None => agent == "implementer", + match read_workflow_state(state) { + Ok(workflow_state) => is_implementer_with_state(&workflow_state, agent), + Err(_) => agent == "implementer", } } -fn complete_step(state: &mut SessionState, status: StepStatus, escalated: bool) -> Result<()> { - let idx = state.current_step_idx; - let step = state +fn is_implementer_with_state(workflow_state: &BuildWorkflowState, agent: &str) -> bool { + workflow_state.implementer_agent == agent +} + +fn complete_step( + state: &mut SessionState, + workflow_state: &mut BuildWorkflowState, + status: StepStatus, + escalated: bool, +) -> Result<()> { + let idx = workflow_state.current_step_idx; + let step = workflow_state .steps .get_mut(idx) .ok_or_else(|| anyhow!("no current step available"))?; step.status = status; step.escalated = escalated; - state.pending_feedback.clear(); - state.reviews.clear(); - state.checkpoint = None; + workflow_state.pending_feedback.clear(); + workflow_state.reviews.clear(); + workflow_state.checkpoint = None; - if idx + 1 >= state.steps.len() { + if idx + 1 >= workflow_state.steps.len() { state.phase = SessionPhase::Done; return Ok(()); } - state.current_step_idx += 1; + workflow_state.current_step_idx += 1; state.phase = SessionPhase::Implement; Ok(()) } diff --git a/crates/rally-workflow-plan/Cargo.toml b/crates/rally-workflow-plan/Cargo.toml new file mode 100644 index 0000000..efa1b42 --- /dev/null +++ b/crates/rally-workflow-plan/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "rally-workflow-plan" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0" +chrono = { version = "0.4", features = ["serde", "clock"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +rally-core = { path = "../rally-core" } +rally-workflow-build = { path = "../rally-workflow-build" } diff --git a/crates/rally-workflow-plan/src/lib.rs b/crates/rally-workflow-plan/src/lib.rs new file mode 100644 index 0000000..101b98e --- /dev/null +++ b/crates/rally-workflow-plan/src/lib.rs @@ -0,0 +1,1287 @@ +use std::{ + collections::{BTreeMap, BTreeSet, VecDeque}, + fs, + path::{Path, PathBuf}, +}; + +use anyhow::{Context, Result, anyhow, bail}; +use chrono::{DateTime, Duration, Utc}; +use rally_core::{SessionPhase, SessionState, now}; +use rally_workflow_build::parse_steps_from_todo; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum IssueStatus { + Open, + Agreed, + Escalated, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TurnState { + pub holder: String, + pub started_at: DateTime, + pub round: u32, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct IssueState { + pub id: u32, + pub slug: String, + pub file: String, + pub title: String, + pub author: String, + pub status: IssueStatus, + pub positions: BTreeSet, + pub challenges: BTreeSet, + pub agreed_by: Option, + pub created_at: DateTime, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct FinalizationState { + pub writer: Option, + pub reviewer: Option, + pub todo_ready: bool, + pub reopened_for_issues: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PlanWorkflowState { + #[serde(default = "default_state_version")] + pub state_version: u32, + #[serde(default)] + pub turn: Option, + #[serde(default)] + pub issues: Vec, + #[serde(default)] + pub finalization: FinalizationState, +} + +impl Default for PlanWorkflowState { + fn default() -> Self { + Self { + state_version: default_state_version(), + turn: None, + issues: Vec::new(), + finalization: FinalizationState::default(), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum PlanPollDispatch { + Instruction { body: String }, + Wait { hint: Option }, + Complete { message: String }, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct PlanDoneDispatch { + pub message: String, + pub next_phase: Option, +} + +fn default_state_version() -> u32 { + 1 +} + +impl PlanWorkflowState { + pub fn validate(&self) -> Result<()> { + if let Some(turn) = &self.turn + && turn.holder.trim().is_empty() + { + bail!("workflow_state turn holder cannot be empty"); + } + Ok(()) + } +} + +pub fn initial_workflow_state() -> Result { + encode_workflow_state(&PlanWorkflowState::default()) +} + +pub fn parse_workflow_state(raw: Option) -> Result { + let parsed = match raw { + Some(raw) => serde_json::from_value::(raw) + .context("invalid plan workflow_state payload")?, + None => PlanWorkflowState::default(), + }; + parsed.validate()?; + Ok(parsed) +} + +pub fn encode_workflow_state(workflow_state: &PlanWorkflowState) -> Result { + workflow_state.validate()?; + serde_json::to_value(workflow_state).context("failed to encode plan workflow_state") +} + +pub fn read_workflow_state(state: &SessionState) -> Result { + parse_workflow_state(state.workflow_state.clone()) +} + +fn write_workflow_state( + state: &mut SessionState, + workflow_state: &PlanWorkflowState, +) -> Result<()> { + state.workflow_state = Some(encode_workflow_state(workflow_state)?); + Ok(()) +} + +pub fn get_instruction(state: &SessionState, agent: &str, session_dir: &Path) -> Option { + let workflow_state = read_workflow_state(state).ok()?; + + match state.phase { + SessionPhase::Proposal => { + let done_key = "proposal_done"; + let agent_state = state.agents.get(agent)?; + if agent_state.phase_status == done_key { + None + } else { + Some(proposal_instruction( + &session_dir + .join("sources") + .join(format!("{agent}-proposal.md")) + .display() + .to_string(), + &state + .topic + .clone() + .unwrap_or_else(|| "(no topic provided)".to_string()), + &state.name, + agent, + )) + } + } + SessionPhase::Analysis => { + let done_key = "analysis_done"; + let agent_state = state.agents.get(agent)?; + if agent_state.phase_status == done_key { + None + } else { + let proposal_files = list_source_proposals(session_dir); + let proposal_lines = if proposal_files.is_empty() { + format!( + "- (none found yet) in {}", + session_dir.join("sources").display() + ) + } else { + proposal_files + .iter() + .map(|p| format!("- {}", p.display())) + .collect::>() + .join("\n") + }; + Some(analysis_instruction( + &proposal_lines, + &session_dir + .join("analysis") + .join(format!("{agent}.md")) + .display() + .to_string(), + &state.name, + agent, + )) + } + } + SessionPhase::Negotiation => { + let turn = workflow_state.turn.as_ref()?; + if turn.holder != agent { + return None; + } + + let open_issues = workflow_state + .issues + .iter() + .filter(|i| i.status == IssueStatus::Open) + .collect::>(); + if open_issues.is_empty() { + return Some("No open issues remain; waiting for phase transition.".to_string()); + } + + let mut lines = Vec::new(); + lines.push(format!( + "It is your turn in negotiation (round {}).", + turn.round + )); + lines.push("Open issues:".to_string()); + + for issue in open_issues { + let positions = join_or_none(issue.positions.iter().cloned().collect()); + let challenges = join_or_none(issue.challenges.iter().cloned().collect()); + let can_agree = issue.challenges.iter().any(|a| a != agent) + && issue.positions.iter().any(|a| a != agent); + + let hint = if !issue.positions.contains(agent) { + "You haven't written a position yet" + } else if can_agree { + "You may agree - challenge exists from another agent" + } else if !issue.challenges.contains(agent) { + "Consider challenging another position if needed" + } else { + "Update or refine your position/challenge" + }; + + lines.push(format!( + "- #{:02} {} ({}) | positions: [{}] | challenges: [{}] | hint: {}", + issue.id, + issue.file, + format_status(issue), + positions, + challenges, + hint + )); + lines.push(format!( + " Position file: {}", + position_path(session_dir, issue, agent).display() + )); + } + + lines.push(format!( + "When done for this turn, run: rally done --session {} --as {}", + state.name, agent + )); + Some(lines.join("\n")) + } + SessionPhase::FinalizationWrite => { + if workflow_state.finalization.writer.as_deref() == Some(agent) + || workflow_state.finalization.writer.is_none() + { + Some(finalization_write_instruction( + &session_dir.join("coverage-audit.md").display().to_string(), + &session_dir.join("final.md").display().to_string(), + &state.name, + agent, + )) + } else { + None + } + } + SessionPhase::FinalizationReview => { + if workflow_state.finalization.writer.as_deref() == Some(agent) { + None + } else { + Some(finalization_review_instruction( + &session_dir.join("coverage-audit.md").display().to_string(), + &session_dir.join("final.md").display().to_string(), + &state.name, + agent, + )) + } + } + _ => None, + } +} + +pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { + let workflow_state = read_workflow_state(state).ok()?; + match state.phase { + SessionPhase::FinalizationReview => { + if workflow_state.finalization.writer.as_deref() == Some(agent) { + Some(format!( + "Waiting for another agent to review final.md in session '{}'.", + state.name + )) + } else { + None + } + } + _ => None, + } +} + +pub fn prepare_instruction(state: &mut SessionState, agent: &str) -> Result { + let mut workflow_state = read_workflow_state(state)?; + if state.phase == SessionPhase::FinalizationWrite + && workflow_state.finalization.writer.is_none() + { + workflow_state.finalization.writer = Some(agent.to_string()); + write_workflow_state(state, &workflow_state)?; + return Ok(true); + } + Ok(false) +} + +pub fn file_issue( + state: &mut SessionState, + session_dir: &Path, + agent: &str, + title: &str, + question: Option<&str>, + context: Option<&str>, +) -> Result { + let mut workflow_state = read_workflow_state(state)?; + + if state.phase != SessionPhase::Analysis && state.phase != SessionPhase::FinalizationReview { + bail!("file-issue is only valid during analysis or finalization review"); + } + + let next_id = workflow_state + .issues + .iter() + .map(|i| i.id) + .max() + .unwrap_or(0) + + 1; + let slug = slugify(title); + let file_name = format!("{:02}-{slug}.md", next_id); + let issue_path = session_dir.join("issues").join(&file_name); + + let question = question.unwrap_or(""); + let context = context.unwrap_or(""); + let template = format!( + "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", + next_id, title, title, question, context + ); + + fs::write(&issue_path, template)?; + + workflow_state.issues.push(IssueState { + id: next_id, + slug, + file: file_name, + title: title.to_string(), + author: agent.to_string(), + status: IssueStatus::Open, + positions: Default::default(), + challenges: Default::default(), + agreed_by: None, + created_at: now(), + }); + + if state.phase == SessionPhase::FinalizationReview { + workflow_state.finalization.reopened_for_issues = true; + state.phase = SessionPhase::Negotiation; + for agent_state in state.agents.values_mut() { + agent_state.rounds = 0; + } + initialize_turn(state, &mut workflow_state)?; + } + + write_workflow_state(state, &workflow_state)?; + Ok(issue_path.display().to_string()) +} + +pub fn challenge_issue( + state: &mut SessionState, + session_dir: &Path, + agent: &str, + issue_id: u32, +) -> Result { + let mut workflow_state = read_workflow_state(state)?; + ensure_turn_holder(state, &workflow_state, agent)?; + refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } + + let current_issue_id = issue.id; + let position = position_path(session_dir, issue, agent); + if let Some(parent) = position.parent() { + fs::create_dir_all(parent)?; + } + + if !position.exists() { + fs::write( + &position, + format!("# Position for issue {:02}\n\n", current_issue_id), + )?; + } + + let marker = format!( + "\n## Challenge {}\n- Add your challenge details here.\n", + now().to_rfc3339() + ); + append_text(&position, &marker)?; + + issue.positions.insert(agent.to_string()); + issue.challenges.insert(agent.to_string()); + + let position_display = position.display().to_string(); + write_workflow_state(state, &workflow_state)?; + Ok(format!( + "challenge recorded for issue {:02} at {}", + current_issue_id, position_display + )) +} + +pub fn agree_issue( + state: &mut SessionState, + session_dir: &Path, + agent: &str, + issue_id: u32, +) -> Result { + let mut workflow_state = read_workflow_state(state)?; + ensure_turn_holder(state, &workflow_state, agent)?; + refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + + let issue = workflow_state + .issues + .iter_mut() + .find(|i| i.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + + if issue.status != IssueStatus::Open { + bail!("issue {:02} is not open", issue_id); + } + + if !issue.positions.iter().any(|a| a != agent) { + bail!( + "cannot agree issue {:02}: only your own positions exist", + issue_id + ); + } + + if !issue.challenges.iter().any(|a| a != agent) { + bail!( + "cannot agree issue {:02}: no challenge from another agent exists", + issue_id + ); + } + + issue.status = IssueStatus::Agreed; + issue.agreed_by = Some(agent.to_string()); + + let mut msg = format!("issue {:02} marked AGREED by {}", issue_id, agent); + if maybe_resolve_negotiation(state, &mut workflow_state) { + msg.push_str("; all issues resolved, advanced to finalization"); + } + + write_workflow_state(state, &workflow_state)?; + Ok(msg) +} + +pub fn mark_done(state: &mut SessionState, agent: &str, session_dir: &Path) -> Result { + if !state.agents.contains_key(agent) { + bail!("agent '{}' is not joined", agent); + } + + let mut workflow_state = read_workflow_state(state)?; + let result = match state.phase { + SessionPhase::Proposal => { + let proposal_path = session_dir + .join("sources") + .join(format!("{agent}-proposal.md")); + if !proposal_path.exists() { + bail!( + "missing proposal file {}; write it before running done", + proposal_path.display() + ); + } + + if let Some(agent_state) = state.agents.get_mut(agent) { + agent_state.phase_status = "proposal_done".to_string(); + } + if state + .agents + .values() + .all(|a| a.phase_status == "proposal_done") + { + for a in state.agents.values_mut() { + a.phase_status = "analysis_pending".to_string(); + } + state.phase = SessionPhase::Analysis; + "proposal phase complete; advanced to analysis".to_string() + } else { + "marked proposal done".to_string() + } + } + SessionPhase::Analysis => { + let analysis_path = session_dir.join("analysis").join(format!("{agent}.md")); + if !analysis_path.exists() { + bail!( + "missing analysis file {}; write it before running done", + analysis_path.display() + ); + } + + if let Some(agent_state) = state.agents.get_mut(agent) { + agent_state.phase_status = "analysis_done".to_string(); + } + if state + .agents + .values() + .all(|a| a.phase_status == "analysis_done") + { + renumber_issues_interleaved(&mut workflow_state, session_dir)?; + state.phase = SessionPhase::Negotiation; + initialize_turn(state, &mut workflow_state)?; + "analysis phase complete; issues normalized; advanced to negotiation".to_string() + } else { + "marked analysis done".to_string() + } + } + SessionPhase::Negotiation => { + ensure_turn_holder(state, &workflow_state, agent)?; + refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + if state.phase != SessionPhase::Negotiation { + "issues already resolved; advanced phase".to_string() + } else { + advance_turn(state, &mut workflow_state)?; + if state.phase == SessionPhase::FinalizationWrite { + "max rounds reached or all issues resolved; advanced to finalization" + .to_string() + } else { + "advanced negotiation turn".to_string() + } + } + } + SessionPhase::FinalizationWrite => { + if workflow_state.finalization.writer.as_deref() != Some(agent) { + bail!( + "only '{}' can complete finalization write", + workflow_state + .finalization + .writer + .as_deref() + .unwrap_or("") + ); + } + let coverage = session_dir.join("coverage-audit.md"); + let final_doc = session_dir.join("final.md"); + if !coverage.exists() { + bail!("missing {}", coverage.display()); + } + if !final_doc.exists() { + bail!("missing {}", final_doc.display()); + } + parse_steps_from_todo(&final_doc).map_err(|err| { + anyhow!( + "final.md must be write-todo format (## Spec + ## Plan with numbered steps): {err}" + ) + })?; + if state.config.expected_agents <= 1 || state.agents.len() <= 1 { + finalize_and_complete( + state, + &mut workflow_state, + session_dir, + agent, + "single-agent", + )? + } else { + state.phase = SessionPhase::FinalizationReview; + "finalization write completed; advanced to review".to_string() + } + } + SessionPhase::FinalizationReview => { + if workflow_state.finalization.writer.as_deref() == Some(agent) { + if state.config.expected_agents <= 1 || state.agents.len() <= 1 { + finalize_and_complete( + state, + &mut workflow_state, + session_dir, + agent, + "single-agent", + )? + } else { + bail!("the final writer cannot self-approve"); + } + } else { + finalize_and_complete( + state, + &mut workflow_state, + session_dir, + agent, + "reviewer-approved", + )? + } + } + SessionPhase::Done => "session already complete".to_string(), + _ => bail!("done is not valid during {:?}", state.phase), + }; + + write_workflow_state(state, &workflow_state)?; + Ok(result) +} + +fn finalize_and_complete( + state: &mut SessionState, + workflow_state: &mut PlanWorkflowState, + session_dir: &Path, + approver: &str, + mode: &str, +) -> Result { + let final_doc = session_dir.join("final.md"); + if !final_doc.exists() { + bail!("missing {}", final_doc.display()); + } + let todo_doc = session_dir.join("todo.md"); + fs::copy(&final_doc, &todo_doc)?; + workflow_state.finalization.reviewer = Some(approver.to_string()); + workflow_state.finalization.todo_ready = true; + state.phase = SessionPhase::Done; + Ok(format!( + "finalization ({mode}); wrote {} and completed session", + todo_doc.display() + )) +} + +pub fn enforce_turn_timeout(state: &mut SessionState) -> Result { + let mut workflow_state = read_workflow_state(state)?; + if state.phase != SessionPhase::Negotiation { + return Ok(false); + } + + let Some(turn) = workflow_state.turn.as_ref() else { + initialize_turn(state, &mut workflow_state)?; + write_workflow_state(state, &workflow_state)?; + return Ok(true); + }; + + let elapsed = now().signed_duration_since(turn.started_at); + if elapsed >= Duration::seconds(state.config.turn_timeout_secs as i64) { + advance_turn(state, &mut workflow_state)?; + write_workflow_state(state, &workflow_state)?; + return Ok(true); + } + + Ok(false) +} + +pub fn refresh_negotiation_state(state: &mut SessionState, session_dir: &Path) -> Result { + let mut workflow_state = read_workflow_state(state)?; + let changed = refresh_negotiation_state_in_place(state, &mut workflow_state, session_dir)?; + if changed { + write_workflow_state(state, &workflow_state)?; + } + Ok(changed) +} + +fn refresh_negotiation_state_in_place( + state: &mut SessionState, + workflow_state: &mut PlanWorkflowState, + session_dir: &Path, +) -> Result { + if state.phase != SessionPhase::Negotiation { + return Ok(false); + } + + let mut changed = false; + for issue in &mut workflow_state.issues { + let dir = issue_dir_path(session_dir, issue.id, &issue.slug); + if !dir.exists() { + continue; + } + + for entry in fs::read_dir(&dir)? { + let entry = entry?; + if !entry.file_type()?.is_file() { + continue; + } + + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) != Some("md") { + continue; + } + + let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else { + continue; + }; + + if issue.positions.insert(stem.to_string()) { + changed = true; + } + + let content = fs::read_to_string(&path).unwrap_or_default(); + if content.contains("## Challenge") && issue.challenges.insert(stem.to_string()) { + changed = true; + } + } + } + + if maybe_resolve_negotiation(state, workflow_state) { + changed = true; + } + + Ok(changed) +} + +pub fn initialize_turn(state: &SessionState, workflow_state: &mut PlanWorkflowState) -> Result<()> { + let mut agents = state.agents.keys().cloned().collect::>(); + agents.sort(); + + let first = agents + .first() + .ok_or_else(|| anyhow!("cannot initialize turn with no agents"))? + .clone(); + + workflow_state.turn = Some(TurnState { + holder: first, + started_at: now(), + round: 1, + }); + + Ok(()) +} + +pub fn advance_turn( + state: &mut SessionState, + workflow_state: &mut PlanWorkflowState, +) -> Result<()> { + let mut agents = state.agents.keys().cloned().collect::>(); + agents.sort(); + + if agents.is_empty() { + bail!("cannot advance turn with no agents"); + } + + let current_holder = workflow_state + .turn + .as_ref() + .map(|t| t.holder.clone()) + .unwrap_or_else(|| agents[0].clone()); + + if let Some(agent_state) = state.agents.get_mut(¤t_holder) { + agent_state.rounds += 1; + } + + if maybe_resolve_negotiation(state, workflow_state) { + workflow_state.turn = None; + return Ok(()); + } + + let idx = agents + .iter() + .position(|a| *a == current_holder) + .unwrap_or(0usize); + let next = agents[(idx + 1) % agents.len()].clone(); + + let round = workflow_state + .turn + .as_ref() + .map(|t| t.round + 1) + .unwrap_or(1); + workflow_state.turn = Some(TurnState { + holder: next, + started_at: now(), + round, + }); + + Ok(()) +} + +fn maybe_resolve_negotiation( + state: &mut SessionState, + workflow_state: &mut PlanWorkflowState, +) -> bool { + if state.phase != SessionPhase::Negotiation { + return false; + } + + let mut changed = false; + let open_count = workflow_state + .issues + .iter() + .filter(|i| i.status == IssueStatus::Open) + .count(); + + if open_count > 0 + && !state.agents.is_empty() + && state + .agents + .values() + .all(|a| a.rounds >= state.config.max_rounds) + { + for issue in &mut workflow_state.issues { + if issue.status == IssueStatus::Open { + issue.status = IssueStatus::Escalated; + changed = true; + } + } + } + + if workflow_state + .issues + .iter() + .all(|i| i.status != IssueStatus::Open) + { + state.phase = SessionPhase::FinalizationWrite; + workflow_state.turn = None; + changed = true; + } + + changed +} + +fn renumber_issues_interleaved( + workflow_state: &mut PlanWorkflowState, + session_dir: &Path, +) -> Result<()> { + let mut by_author: BTreeMap> = BTreeMap::new(); + for issue in workflow_state.issues.drain(..) { + by_author + .entry(issue.author.clone()) + .or_default() + .push_back(issue); + } + + let mut interleaved = Vec::new(); + loop { + let mut progressed = false; + let authors = by_author.keys().cloned().collect::>(); + for author in authors { + let queue = by_author.get_mut(&author).expect("author key must exist"); + if let Some(issue) = queue.pop_front() { + interleaved.push(issue); + progressed = true; + } + } + if !progressed { + break; + } + } + + for (idx, issue) in interleaved.iter_mut().enumerate() { + let new_id = (idx + 1) as u32; + let new_file = format!("{:02}-{}.md", new_id, issue.slug); + + let old_path = session_dir.join("issues").join(&issue.file); + let new_path = session_dir.join("issues").join(&new_file); + if old_path != new_path && old_path.exists() { + fs::rename(old_path, &new_path)?; + } + + issue.id = new_id; + issue.file = new_file; + } + + workflow_state.issues = interleaved; + Ok(()) +} + +fn ensure_turn_holder( + state: &SessionState, + workflow_state: &PlanWorkflowState, + agent: &str, +) -> Result<()> { + if state.phase != SessionPhase::Negotiation { + bail!("command is only valid during negotiation phase") + } + + let turn = workflow_state + .turn + .as_ref() + .ok_or_else(|| anyhow!("turn state is not initialized"))?; + if turn.holder != agent { + bail!("it is currently '{}' turn", turn.holder) + } + + Ok(()) +} + +fn position_path(session_dir: &Path, issue: &IssueState, agent: &str) -> PathBuf { + issue_dir_path(session_dir, issue.id, &issue.slug).join(format!("{agent}.md")) +} + +fn issue_dir_path(session_dir: &Path, issue_id: u32, slug: &str) -> PathBuf { + session_dir + .join("positions") + .join(format!("{:02}-{slug}", issue_id)) +} + +fn append_text(path: &Path, text: &str) -> Result<()> { + let existing = fs::read_to_string(path).unwrap_or_default(); + fs::write(path, format!("{existing}{text}"))?; + Ok(()) +} + +fn format_status(issue: &IssueState) -> &'static str { + match issue.status { + IssueStatus::Open => "OPEN", + IssueStatus::Agreed => "AGREED", + IssueStatus::Escalated => "ESCALATED", + } +} + +fn join_or_none(mut items: Vec) -> String { + if items.is_empty() { + return "none".to_string(); + } + items.sort(); + items.join(", ") +} + +fn list_source_proposals(session_dir: &Path) -> Vec { + let sources_dir = session_dir.join("sources"); + let Ok(entries) = fs::read_dir(&sources_dir) else { + return Vec::new(); + }; + + let mut files = entries + .filter_map(|entry| entry.ok()) + .map(|entry| entry.path()) + .filter(|path| path.is_file()) + .collect::>(); + files.sort(); + files +} + +fn slugify(input: &str) -> String { + let mut out = String::with_capacity(input.len()); + let mut last_dash = false; + + for ch in input.chars() { + let c = ch.to_ascii_lowercase(); + if c.is_ascii_alphanumeric() { + out.push(c); + last_dash = false; + } else if !last_dash { + out.push('-'); + last_dash = true; + } + } + + let trimmed = out.trim_matches('-').to_string(); + if trimmed.is_empty() { + "issue".to_string() + } else { + trimmed + } +} + +pub trait PlanPolicy { + fn id(&self) -> &'static str; + + fn allow_registration(&self, _state: &SessionState) -> Result<()> { + Ok(()) + } + + fn merge_issue_title(&self, _state: &SessionState, title: &str) -> Result { + Ok(title.to_string()) + } + + fn allow_agree(&self, _state: &SessionState, _issue_id: u32, _agent: &str) -> Result<()> { + Ok(()) + } + + fn allow_finalize(&self, _state: &SessionState, _agent: &str) -> Result<()> { + Ok(()) + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct DefaultPlanPolicy; + +impl PlanPolicy for DefaultPlanPolicy { + fn id(&self) -> &'static str { + "default" + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct StrictNegotiationPolicy; + +impl PlanPolicy for StrictNegotiationPolicy { + fn id(&self) -> &'static str { + "strict-negotiate" + } + + fn allow_registration(&self, state: &SessionState) -> Result<()> { + if state.config.expected_agents < 3 { + bail!( + "strict negotiate policy requires at least 3 agents (got {})", + state.config.expected_agents + ); + } + Ok(()) + } + + fn merge_issue_title(&self, state: &SessionState, title: &str) -> Result { + let normalized = normalize_issue_key(title); + let workflow_state = read_workflow_state(state)?; + if workflow_state + .issues + .iter() + .any(|issue| normalize_issue_key(&issue.title) == normalized) + { + bail!( + "strict negotiate policy rejected duplicate issue title '{}'", + title + ); + } + Ok(title.to_string()) + } + + fn allow_agree(&self, state: &SessionState, issue_id: u32, agent: &str) -> Result<()> { + let workflow_state = read_workflow_state(state)?; + let issue = workflow_state + .issues + .iter() + .find(|issue| issue.id == issue_id) + .ok_or_else(|| anyhow!("issue {:02} not found", issue_id))?; + let cross_agent_challenges = issue + .challenges + .iter() + .filter(|name| *name != agent) + .count(); + if cross_agent_challenges < 2 { + bail!( + "strict negotiate policy requires at least two cross-agent challenges before agree" + ); + } + Ok(()) + } + + fn allow_finalize(&self, state: &SessionState, _agent: &str) -> Result<()> { + let workflow_state = read_workflow_state(state)?; + let agreed = workflow_state + .issues + .iter() + .filter(|issue| issue.status == IssueStatus::Agreed) + .count(); + if agreed == 0 { + bail!("strict negotiate policy requires at least one AGREED issue before finalization"); + } + Ok(()) + } +} + +pub struct PlanEngine { + policy: P, +} + +impl PlanEngine

{ + pub fn new(policy: P) -> Self { + Self { policy } + } + + pub fn join(&self, state: &mut SessionState) -> Result<()> { + self.policy.allow_registration(state)?; + if state.phase == SessionPhase::Registration + && state.agents.len() as u32 >= state.config.expected_agents + { + state.phase = SessionPhase::Proposal; + } + Ok(()) + } + + pub fn poll_next( + &self, + state: &mut SessionState, + session_dir: &Path, + agent: &str, + ) -> Result { + refresh_negotiation_state(state, session_dir)?; + enforce_turn_timeout(state)?; + let _ = prepare_instruction(state, agent)?; + + if let Some(instruction) = get_instruction(state, agent, session_dir) { + return Ok(PlanPollDispatch::Instruction { body: instruction }); + } + + if state.phase == SessionPhase::Done { + return Ok(PlanPollDispatch::Complete { + message: "session already complete".to_string(), + }); + } + + Ok(PlanPollDispatch::Wait { + hint: get_wait_hint(state, agent), + }) + } + + pub fn done( + &self, + state: &mut SessionState, + session_dir: &Path, + agent: &str, + ) -> Result { + if matches!( + state.phase, + SessionPhase::FinalizationWrite | SessionPhase::FinalizationReview + ) { + self.policy.allow_finalize(state, agent)?; + } + let message = mark_done(state, agent, session_dir)?; + Ok(PlanDoneDispatch { + message, + next_phase: Some(state.phase.clone()), + }) + } + + pub fn action( + &self, + state: &mut SessionState, + session_dir: &Path, + agent: &str, + name: &str, + args: &Value, + ) -> Result { + let message = match name { + "file-issue" => { + let title = required_string(args, "title", name)?; + let title = self.policy.merge_issue_title(state, &title)?; + let question = optional_string(args, "question"); + let context = optional_string(args, "context"); + let path = file_issue( + state, + session_dir, + agent, + &title, + question.as_deref(), + context.as_deref(), + )?; + format!("filed issue at {path}") + } + "challenge" => { + let issue = required_u32(args, "issue", name)?; + challenge_issue(state, session_dir, agent, issue)? + } + "agree" => { + let issue = required_u32(args, "issue", name)?; + self.policy.allow_agree(state, issue, agent)?; + agree_issue(state, session_dir, agent, issue)? + } + _ => { + bail!( + "unknown action '{}' for plan engine policy '{}'; supported actions: file-issue, challenge, agree", + name, + self.policy.id() + ) + } + }; + + Ok(message) + } + + pub fn status(&self, state: &SessionState) -> Result> { + let workflow_state = read_workflow_state(state)?; + let mut lines = Vec::new(); + + if let Some(turn) = &workflow_state.turn { + lines.push(format!( + "Turn: holder={} round={} started={}", + turn.holder, + turn.round, + turn.started_at.to_rfc3339() + )); + } else { + lines.push("Turn: none".to_string()); + } + + let open = workflow_state + .issues + .iter() + .filter(|i| i.status == IssueStatus::Open) + .count(); + let agreed = workflow_state + .issues + .iter() + .filter(|i| i.status == IssueStatus::Agreed) + .count(); + let escalated = workflow_state + .issues + .iter() + .filter(|i| i.status == IssueStatus::Escalated) + .count(); + lines.push(format!( + "Issue summary: {} open, {} agreed, {} escalated", + open, agreed, escalated + )); + + Ok(lines) + } +} + +pub fn proposal_instruction( + proposal_path: &str, + topic: &str, + session: &str, + agent: &str, +) -> String { + format!( + "Write your proposal to: {proposal_path}\nSession topic: {topic}\nAfter writing it, run: rally done --session {session} --as {agent}" + ) +} + +pub fn analysis_instruction( + proposals: &str, + analysis_path: &str, + session: &str, + agent: &str, +) -> String { + format!( + "Read proposal files:\n{proposals}\nWrite divergence analysis to {analysis_path}.\nFile issues with: rally plan file-issue --session {session} --as {agent} --title \"...\"\nWhen complete, run: rally done --session {session} --as {agent}" + ) +} + +pub fn finalization_write_instruction( + coverage_path: &str, + final_path: &str, + session: &str, + agent: &str, +) -> String { + format!( + "Write final artifacts: {coverage_path} and {final_path}.\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nThen run: rally done --session {session} --as {agent}" + ) +} + +pub fn finalization_review_instruction( + coverage_path: &str, + final_path: &str, + session: &str, + agent: &str, +) -> String { + format!( + "Review {coverage_path} and {final_path}. If approved, run rally done --session {session} --as {agent}. If rejected, file issues with rally plan file-issue, which returns the session to negotiation." + ) +} + +fn required_string(args: &Value, key: &str, action: &str) -> Result { + args.get(key) + .and_then(Value::as_str) + .map(ToString::to_string) + .ok_or_else(|| anyhow!("action '{}' requires string field '{}'", action, key)) +} + +fn optional_string(args: &Value, key: &str) -> Option { + args.get(key) + .and_then(Value::as_str) + .map(ToString::to_string) +} + +fn required_u32(args: &Value, key: &str, action: &str) -> Result { + let value = args + .get(key) + .and_then(Value::as_u64) + .ok_or_else(|| anyhow!("action '{}' requires numeric field '{}'", action, key))?; + u32::try_from(value).map_err(|_| { + anyhow!( + "action '{}' field '{}' is out of range for u32", + action, + key + ) + }) +} + +fn normalize_issue_key(input: &str) -> String { + let mut out = String::with_capacity(input.len()); + let mut last_dash = false; + + for ch in input.chars() { + let c = ch.to_ascii_lowercase(); + if c.is_ascii_alphanumeric() { + out.push(c); + last_dash = false; + } else if !last_dash { + out.push('-'); + last_dash = true; + } + } + + let trimmed = out.trim_matches('-').to_string(); + if trimmed.is_empty() { + "issue".to_string() + } else { + trimmed + } +} diff --git a/docs/manual-qa-workflow-composition.md b/docs/manual-qa-workflow-composition.md new file mode 100644 index 0000000..856c8ef --- /dev/null +++ b/docs/manual-qa-workflow-composition.md @@ -0,0 +1,50 @@ +# Manual QA: Workflow Composition + +Run from repository root with the explicit wired binary: + +`/Users/justin/code/rally/target/debug/rally` + +## 1. Built-in Plan/Build Sanity + +1. Create and run a plan session: +`/Users/justin/code/rally/target/debug/rally create plan --name qa-plan --agents 2 --topic "qa workflow composition"` +`/Users/justin/code/rally/target/debug/rally join --session qa-plan --as planner-a` +`/Users/justin/code/rally/target/debug/rally join --session qa-plan --as planner-b` + +2. Create proposals/analysis files per `next`, file at least one issue, and drive to completion. +3. Confirm `qa-plan` reaches `Done` and writes `~/.rally/sessions/qa-plan/todo.md`. + +4. Chain into implement flow (or create directly): +`/Users/justin/code/rally/target/debug/rally chain --plan qa-plan --implement qa-build` + +5. Run implementer/reviewer loop and confirm checkpoint/review transitions to `Done`. + +## 2. Composed Workflow Policy Reuse + +1. Create composed negotiate-like workflow session: +`/Users/justin/code/rally/target/debug/rally create workflow --name qa-negotiate --workflow demo/negotiate --agents 3 --topic "qa composed"` + +2. Join agents: +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate --as agent-a` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate --as agent-b` +`/Users/justin/code/rally/target/debug/rally join --session qa-negotiate --as agent-c` + +3. Validate strict policy behavior: +- Duplicate issue title is rejected. +- Agree is rejected until cross-agent challenge threshold is met. +- Finalization is rejected unless there is at least one AGREED issue. + +4. Validate prompt reuse: +- Proposal/analysis/finalization prompts match shared plan templates in wording. + +## 3. Delegation and State Integrity + +1. Run actions that exercise interop delegation paths. +2. Trigger a delegated failure case and confirm persisted parent state does not partially mutate. +3. Confirm child state remains namespaced under: +`workflow_state._children.` + +## 4. Cleanup + +Remove QA sessions when done: +`rm -rf ~/.rally/sessions/qa-plan ~/.rally/sessions/qa-build ~/.rally/sessions/qa-negotiate` diff --git a/docs/workflow-composition-api.md b/docs/workflow-composition-api.md new file mode 100644 index 0000000..fadaa52 --- /dev/null +++ b/docs/workflow-composition-api.md @@ -0,0 +1,112 @@ +# Workflow Composition and Interop API + +This document explains how to build workflow crates that keep private typed state while interoperating through Rally core. + +## Ownership Boundaries + +Core owns: +- Session envelope and host metadata (`name`, `session_type`, `phase`, `config`, agents, timing) +- Locking, persistence, and write atomicity (`SessionHandle` save/load) +- Workflow dispatch contract (`on_create`, `on_join`, `poll_next`, `on_done`, `on_action`, `format_status`) + +Workflow crates own: +- Typed workflow state schema and validation +- Workflow behavior and transitions +- Prompt/instruction rendering and policy rules +- Composition/delegation semantics + +Core never interprets plan/build/negotiate-specific fields directly. Workflow-specific state is serialized as opaque JSON in `SessionState.workflow_state`. + +## Authoring a Workflow Crate with Private Typed State + +Create a workflow crate that defines its own state model and codec: + +```rust +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MyWorkflowState { + pub version: u32, + pub task_count: u32, +} + +pub fn parse_workflow_state(raw: Option) -> Result { + match raw { + Some(value) => serde_json::from_value(value).context("invalid my workflow_state"), + None => Ok(MyWorkflowState { + version: 1, + task_count: 0, + }), + } +} + +pub fn encode_workflow_state(state: &MyWorkflowState) -> Result { + Ok(serde_json::to_value(state)?) +} +``` + +In the workflow implementation, read/write `SessionState.workflow_state` through your crate codec instead of adding fields to core state structs. + +## Composition Example (Shared Engine + Policy Override) + +`PlanEngine` centralizes plan behavior and accepts policy overrides: + +- `DefaultPlanPolicy` keeps built-in plan behavior. +- `StrictNegotiationPolicy` overrides: + - registration constraints + - issue merge rules (duplicate title rejection) + - agreement gates (minimum cross-agent challenges) + - finalization checks (requires at least one agreed issue) + +The composed demo workflow `demo/negotiate` reuses the shared plan engine: + +```rust +let engine = rally::plan_engine::PlanEngine::new(rally::plan_engine::StrictNegotiationPolicy); +engine.poll_next(state, session_dir, agent)?; +engine.action(state, session_dir, agent, "file-issue", args)?; +engine.done(state, session_dir, agent)?; +``` + +Prompt text is reused from shared builders in `plan_engine`: +- `proposal_instruction(...)` +- `analysis_instruction(...)` +- `finalization_write_instruction(...)` +- `finalization_review_instruction(...)` + +## Delegation Example (Parent -> Child with Isolated Namespace) + +Use `workflow::interop` to delegate to child workflow logic with isolated child state: + +```rust +use rally::workflow::interop::{delegate_action, DelegationTarget}; + +let target = DelegationTarget { + workflow_id: "builtin/plan", + namespace: "child_plan", +}; + +let dispatch = delegate_action( + parent_state, + session_dir, + registry, + target, + agent, + "file-issue", + &serde_json::json!({"title":"Delegated issue"}), +)?; +``` + +Behavioral guarantees: +- Child state is read/written under `workflow_state._children.`. +- Parent state is only updated after successful child dispatch. +- On delegation failure, parent persisted state remains unchanged. + +## Recommended Pattern + +1. Keep all workflow-specific schema in a workflow crate. +2. Expose parse/encode helpers over `Option` / `Value`. +3. Route workflow behavior through a shared engine with policy hooks. +4. Use `interop` delegation only for explicit parent/child contracts. +5. Add regression tests for prompt stability and rollback safety. diff --git a/justfile b/justfile new file mode 100644 index 0000000..6316c45 --- /dev/null +++ b/justfile @@ -0,0 +1,9 @@ +set shell := ["bash", "-eu", "-o", "pipefail", "-c"] + +[private] +nix-check: + @test -n "${IN_NIX_SHELL:-}" || (echo "Run 'nix develop' first" && exit 1) + +pre-merge: nix-check + cargo test --workspace --all-targets + @echo "pre-merge complete" diff --git a/src/bin/custom-workflow-example.rs b/src/bin/custom-workflow-example.rs deleted file mode 100644 index c43ed30..0000000 --- a/src/bin/custom-workflow-example.rs +++ /dev/null @@ -1,105 +0,0 @@ -use anyhow::Result; -use rally::{ - WorkflowRegistry, run_with_registry, - workflow::{ - self, DoneContext, DoneDispatch, JoinContext, JoinDispatch, NextPollContext, - NextPollDispatch, StatusContext, StatusDispatch, - }, -}; -use serde_json::json; - -struct ExampleWorkflow; - -impl rally::Workflow for ExampleWorkflow { - fn id(&self) -> &str { - "example/mock-buildtime" - } - - fn supported_actions(&self) -> &'static [&'static str] { - &["ack"] - } - - fn on_join(&self, _ctx: &mut JoinContext<'_>) -> Result { - Ok(JoinDispatch::default()) - } - - fn poll_next(&self, ctx: &mut NextPollContext<'_>) -> Result { - let acked = ctx - .host - .state - .workflow_state - .as_ref() - .and_then(|v| v.get("acked")) - .and_then(|v| v.as_bool()) - .unwrap_or(false); - let body = if acked { - "Acknowledged. Run rally done --session --as ." - } else { - "Run rally workflow action --session --as --name ack." - }; - Ok(NextPollDispatch::Instruction { - body: body.to_string(), - }) - } - - fn on_done(&self, ctx: &mut DoneContext<'_>) -> Result { - let acked = ctx - .host - .state - .workflow_state - .as_ref() - .and_then(|v| v.get("acked")) - .and_then(|v| v.as_bool()) - .unwrap_or(false); - if !acked { - anyhow::bail!("run workflow action ack before done"); - } - ctx.host.state.phase = rally::state::SessionPhase::Done; - Ok(DoneDispatch { - message: "custom workflow completed".to_string(), - next_phase: Some(rally::state::SessionPhase::Done), - }) - } - - fn on_action(&self, ctx: &mut rally::workflow::ActionContext<'_>) -> Result { - match ctx.name { - "ack" => { - ctx.host.state.workflow_state = Some(json!({"acked": true})); - Ok(rally::workflow::ActionDispatch { - message: "ack recorded".to_string(), - }) - } - _ => anyhow::bail!( - "unknown action '{}' for workflow '{}'; supported actions: ack", - ctx.name, - self.id() - ), - } - } - - fn format_status(&self, _ctx: &StatusContext<'_>) -> Result { - Ok(StatusDispatch { - lines: vec!["Custom workflow example is active (action: ack).".to_string()], - }) - } -} - -fn build_registry() -> Result { - let mut registry = workflow::default_registry()?; - registry.register(ExampleWorkflow)?; - Ok(registry) -} - -fn main() { - let code = match (|| -> Result { - let registry = build_registry()?; - run_with_registry(registry) - })() { - Ok(code) => code, - Err(err) => { - eprintln!("error: {err:#}"); - 3 - } - }; - std::process::exit(code); -} diff --git a/src/commands/mod.rs b/src/commands.rs similarity index 78% rename from src/commands/mod.rs rename to src/commands.rs index 8a7ab88..8cc4606 100644 --- a/src/commands/mod.rs +++ b/src/commands.rs @@ -1,6 +1,8 @@ use std::{env, fs, path::Path, process::Command as ProcessCommand, thread, time::Duration}; use anyhow::{Context, Result, bail}; +use rally_workflow_build as implement; +use rally_workflow_plan as plan; use serde_json::{Value, json}; use crate::{ @@ -9,25 +11,17 @@ use crate::{ JoinArgs, NextArgs, ReviewArgs, ReviewVerdictArg, SessionTypeArg, StatusArgs, WorkflowActionArgs, }, - implement, state::{ - AgentState, Config, FinalizationState, SessionHandle, SessionPhase, SessionState, - SessionType, create_session_state, ensure_dirs_for_session, list_sessions, now, - session_dir, + AgentState, Config, SessionHandle, SessionPhase, SessionState, SessionType, + create_session_state, ensure_dirs_for_session, list_sessions, now, session_dir, }, + watch, workflow::{ self, ActionContext, CreateContext, HostContext, HostContextMut, InitContext, JoinContext, - StatusContext, WorkflowDispatch, - WorkflowRegistry, + StatusContext, WorkflowDispatch, WorkflowRegistry, }, - watch, }; -pub fn create(args: &CreateArgs) -> Result<()> { - let registry = workflow::default_registry()?; - create_with_registry(args, ®istry) -} - pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> Result<()> { let session_type = match args.session_type { SessionTypeArg::Plan => SessionType::Plan, @@ -45,93 +39,96 @@ pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> R bail!("expected_agents must be at least 1"); } - let (todo_path, workspace, workflow_id, workflow_source, workflow_version, workflow_state, steps) = match session_type { - SessionType::Plan => ( - args.todo.as_ref().map(|p| p.display().to_string()), - args.workspace.as_ref().map(|p| p.display().to_string()), - None, - None, - None, - None, - Vec::new(), - ), - SessionType::Implement => { - let todo = args - .todo - .as_ref() - .ok_or_else(|| anyhow::anyhow!("--todo is required for implement sessions"))?; - let workspace = args - .workspace - .as_ref() - .ok_or_else(|| anyhow::anyhow!("--workspace is required for implement sessions"))?; - if !workspace.exists() || !workspace.is_dir() { - bail!( - "--workspace must be an existing directory: {}", - workspace.display() - ); - } - let git_check = ProcessCommand::new("git") - .arg("-C") - .arg(workspace) - .arg("rev-parse") - .arg("--is-inside-work-tree") - .output()?; - if !git_check.status.success() { - bail!( - "--workspace must be a git workspace: {}", - workspace.display() - ); - } - let steps = implement::parse_steps_from_todo(todo)?; - ( - Some(todo.display().to_string()), - Some(workspace.display().to_string()), - None, - None, - None, - None, - steps, - ) - } - SessionType::Workflow => { - let workflow_id = args - .workflow - .clone() - .ok_or_else(|| anyhow::anyhow!("--workflow is required for create workflow"))?; - if workflow::builtin::is_builtin_workflow_id(&workflow_id) { - bail!( - "workflow id '{}' is built-in; use `rally create plan` or `rally create implement`", - workflow_id - ); - } - let _ = registry.resolve_str(&workflow_id)?; - - if let Some(workspace) = &args.workspace - && (!workspace.exists() || !workspace.is_dir()) - { - bail!( - "--workspace must be an existing directory: {}", - workspace.display() - ); - } - - ( + let (todo_path, workspace, workflow_id, workflow_source, workflow_version, workflow_state) = + match session_type { + SessionType::Plan => ( args.todo.as_ref().map(|p| p.display().to_string()), args.workspace.as_ref().map(|p| p.display().to_string()), - Some(workflow_id), - Some("buildtime".to_string()), + Some(workflow::builtin::BUILTIN_PLAN_WORKFLOW_ID.to_string()), + Some("builtin".to_string()), None, - Some(json!({})), - Vec::new(), - ) - } - }; - - let implementer_agent = if session_type == SessionType::Implement { - Some(args.implementer.clone()) - } else { - None - }; + Some(plan::initial_workflow_state()?), + ), + SessionType::Implement => { + let todo = args + .todo + .as_ref() + .ok_or_else(|| anyhow::anyhow!("--todo is required for implement sessions"))?; + let workspace = args.workspace.as_ref().ok_or_else(|| { + anyhow::anyhow!("--workspace is required for implement sessions") + })?; + if !workspace.exists() || !workspace.is_dir() { + bail!( + "--workspace must be an existing directory: {}", + workspace.display() + ); + } + let git_check = ProcessCommand::new("git") + .arg("-C") + .arg(workspace) + .arg("rev-parse") + .arg("--is-inside-work-tree") + .output()?; + if !git_check.status.success() { + bail!( + "--workspace must be a git workspace: {}", + workspace.display() + ); + } + let steps = implement::parse_steps_from_todo(todo)?; + ( + Some(todo.display().to_string()), + Some(workspace.display().to_string()), + Some(workflow::builtin::BUILTIN_BUILD_WORKFLOW_ID.to_string()), + Some("builtin".to_string()), + None, + Some(implement::initial_workflow_state( + args.implementer.clone(), + steps, + )?), + ) + } + SessionType::Workflow => { + let workflow_id = args + .workflow + .clone() + .ok_or_else(|| anyhow::anyhow!("--workflow is required for create workflow"))?; + if workflow::builtin::is_builtin_workflow_id(&workflow_id) { + bail!( + "workflow id '{}' is built-in; use `rally create plan` or `rally create implement`", + workflow_id + ); + } + let _ = registry.resolve_str(&workflow_id)?; + if workflow_id == workflow::builtin::COMPOSED_NEGOTIATE_WORKFLOW_ID + && expected_agents < 3 + { + bail!( + "workflow '{}' requires --agents >= 3 (got {})", + workflow_id, + expected_agents + ); + } + + if let Some(workspace) = &args.workspace + && (!workspace.exists() || !workspace.is_dir()) + { + bail!( + "--workspace must be an existing directory: {}", + workspace.display() + ); + } + + ( + args.todo.as_ref().map(|p| p.display().to_string()), + args.workspace.as_ref().map(|p| p.display().to_string()), + Some(workflow_id), + Some("buildtime".to_string()), + None, + Some(json!({})), + ) + } + }; let phase = SessionPhase::Registration; let state = SessionState { @@ -146,9 +143,6 @@ pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> R review_timeout_secs: args.review_timeout_secs, }, agents: Default::default(), - turn: None, - issues: Vec::new(), - finalization: FinalizationState::default(), topic: args.topic.clone(), todo_path, workspace, @@ -156,12 +150,6 @@ pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> R workflow_source, workflow_version, workflow_state, - implementer_agent, - steps, - current_step_idx: 0, - checkpoint: None, - reviews: Vec::new(), - pending_feedback: Vec::new(), }; let created = create_session_state(&args.name, state)?; @@ -191,11 +179,6 @@ pub fn create_with_registry(args: &CreateArgs, registry: &WorkflowRegistry) -> R Ok(()) } -pub fn join(args: &JoinArgs) -> Result<()> { - let registry = workflow::default_registry()?; - join_with_registry(args, ®istry) -} - pub fn join_with_registry(args: &JoinArgs, registry: &WorkflowRegistry) -> Result<()> { let wait_start = std::time::Instant::now(); let state_path = session_dir(&args.session).join("state.json"); @@ -217,8 +200,6 @@ pub fn join_with_registry(args: &JoinArgs, registry: &WorkflowRegistry) -> Resul let mut state = handle.load_state()?; let now_ts = now(); - let assign_implementer = - state.session_type == SessionType::Implement && state.implementer_agent.is_none(); state .agents @@ -234,11 +215,8 @@ pub fn join_with_registry(args: &JoinArgs, registry: &WorkflowRegistry) -> Resul rounds: 0, }); - if assign_implementer { - state.implementer_agent = Some(args.agent.to_string()); - } if state.session_type == SessionType::Implement { - let role_status = if state.implementer_agent.as_deref() == Some(args.agent.as_str()) { + let role_status = if implement::is_implementer(&state, &args.agent) { "implementer" } else { "reviewer" @@ -267,29 +245,14 @@ pub fn join_with_registry(args: &JoinArgs, registry: &WorkflowRegistry) -> Resul Ok(()) } -pub fn next(args: &NextArgs) -> Result { - let registry = workflow::default_registry()?; - next_with_registry(args, ®istry) -} - pub fn next_with_registry(args: &NextArgs, registry: &WorkflowRegistry) -> Result { watch::next_with_registry(&args.session, &args.agent, args.timeout, registry) } -pub fn done(args: &DoneArgs) -> Result<()> { - let registry = workflow::default_registry()?; - done_with_registry(args, ®istry) -} - pub fn done_with_registry(args: &DoneArgs, registry: &WorkflowRegistry) -> Result<()> { watch::done_with_registry(&args.session, &args.agent, registry) } -pub fn file_issue(args: &FileIssueArgs) -> Result<()> { - let registry = workflow::default_registry()?; - file_issue_with_registry(args, ®istry) -} - pub fn file_issue_with_registry(args: &FileIssueArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let mut state = handle.load_state()?; @@ -326,11 +289,6 @@ pub fn file_issue_with_registry(args: &FileIssueArgs, registry: &WorkflowRegistr Ok(()) } -pub fn challenge(args: &ChallengeArgs) -> Result<()> { - let registry = workflow::default_registry()?; - challenge_with_registry(args, ®istry) -} - pub fn challenge_with_registry(args: &ChallengeArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let mut state = handle.load_state()?; @@ -362,11 +320,6 @@ pub fn challenge_with_registry(args: &ChallengeArgs, registry: &WorkflowRegistry Ok(()) } -pub fn checkpoint(args: &CheckpointArgs) -> Result<()> { - let registry = workflow::default_registry()?; - checkpoint_with_registry(args, ®istry) -} - pub fn checkpoint_with_registry(args: &CheckpointArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let mut state = handle.load_state()?; @@ -397,11 +350,6 @@ pub fn checkpoint_with_registry(args: &CheckpointArgs, registry: &WorkflowRegist Ok(()) } -pub fn agree(args: &AgreeArgs) -> Result<()> { - let registry = workflow::default_registry()?; - agree_with_registry(args, ®istry) -} - pub fn agree_with_registry(args: &AgreeArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let mut state = handle.load_state()?; @@ -433,11 +381,6 @@ pub fn agree_with_registry(args: &AgreeArgs, registry: &WorkflowRegistry) -> Res Ok(()) } -pub fn review(args: &ReviewArgs) -> Result<()> { - let registry = workflow::default_registry()?; - review_with_registry(args, ®istry) -} - pub fn review_with_registry(args: &ReviewArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let mut state = handle.load_state()?; @@ -476,7 +419,7 @@ pub fn review_with_registry(args: &ReviewArgs, registry: &WorkflowRegistry) -> R Ok(()) } -pub fn chain(args: &ChainArgs) -> Result<()> { +pub fn chain_with_registry(args: &ChainArgs, registry: &WorkflowRegistry) -> Result<()> { let plan_session_dir; loop { let handle = SessionHandle::open(&args.plan)?; @@ -521,7 +464,7 @@ pub fn chain(args: &ChainArgs) -> Result<()> { review_timeout_secs: 1200, }; - create(&create_args)?; + create_with_registry(&create_args, registry)?; println!( "chained plan '{}' into implement session '{}'", args.plan, args.implement @@ -542,11 +485,6 @@ pub fn sessions() -> Result<()> { Ok(()) } -pub fn status(args: &StatusArgs) -> Result<()> { - let registry = workflow::default_registry()?; - status_with_registry(args, ®istry) -} - pub fn status_with_registry(args: &StatusArgs, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(&args.session)?; let state = handle.load_state()?; @@ -611,11 +549,6 @@ pub fn workflow_list(registry: &WorkflowRegistry) -> Result<()> { Ok(()) } -pub fn workflow_action(args: &WorkflowActionArgs) -> Result<()> { - let registry = workflow::default_registry()?; - workflow_action_with_registry(args, ®istry) -} - pub fn workflow_action_with_registry( args: &WorkflowActionArgs, registry: &WorkflowRegistry, diff --git a/src/lib.rs b/src/lib.rs index 2f0e51c..6b2d69b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,5 @@ pub mod cli; pub mod commands; -pub mod implement; -pub mod plan; pub mod state; pub mod watch; pub mod workflow; @@ -12,10 +10,6 @@ use cli::{BuildSubcommand, Cli, Command, PlanSubcommand, WorkflowSubcommand}; pub use workflow::{Workflow, WorkflowId, WorkflowRegistry}; -pub fn run_with_default_registry() -> Result { - run_with_registry(workflow::default_registry()?) -} - pub fn run_with_registry(registry: WorkflowRegistry) -> Result { let cli = Cli::parse(); run_cli_with_registry(cli, ®istry) @@ -65,12 +59,14 @@ pub fn run_cli_with_registry(cli: Cli, registry: &WorkflowRegistry) -> Result { - commands::chain(&args)?; + commands::chain_with_registry(&args, registry)?; Ok(0) } Command::Plan(args) => { match args.command { - PlanSubcommand::FileIssue(inner) => commands::file_issue_with_registry(&inner, registry)?, + PlanSubcommand::FileIssue(inner) => { + commands::file_issue_with_registry(&inner, registry)? + } PlanSubcommand::Challenge(inner) => { commands::challenge_with_registry(&inner, registry)? } diff --git a/src/main.rs b/src/main.rs index 9eb1945..76001af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,11 @@ fn main() { - let code = match rally::run_with_default_registry() { + let code = match (|| -> anyhow::Result { + let mut registry = rally::WorkflowRegistry::new(); + registry.register(rally::workflow::builtin::PlanWorkflow)?; + registry.register(rally::workflow::builtin::BuildWorkflow)?; + registry.register(rally::workflow::builtin::ComposedNegotiateWorkflow)?; + rally::run_with_registry(registry) + })() { Ok(code) => code, Err(err) => { eprintln!("error: {err:#}"); diff --git a/src/plan.rs b/src/plan.rs deleted file mode 100644 index f0e7a10..0000000 --- a/src/plan.rs +++ /dev/null @@ -1,750 +0,0 @@ -use std::{ - collections::{BTreeMap, VecDeque}, - fs, - path::{Path, PathBuf}, -}; - -use anyhow::{Result, bail}; -use chrono::Duration; - -use crate::implement; -use crate::state::{IssueState, IssueStatus, SessionPhase, SessionState, TurnState, now}; - -pub fn get_instruction(state: &SessionState, agent: &str, session_dir: &Path) -> Option { - match state.phase { - SessionPhase::Proposal => { - let done_key = "proposal_done"; - let agent_state = state.agents.get(agent)?; - if agent_state.phase_status == done_key { - None - } else { - Some(format!( - "Write your proposal to: {}\nSession topic: {}\nAfter writing it, run: rally done --session {} --as {}", - session_dir - .join("sources") - .join(format!("{agent}-proposal.md")) - .display(), - state - .topic - .clone() - .unwrap_or_else(|| "(no topic provided)".to_string()), - state.name, - agent - )) - } - } - SessionPhase::Analysis => { - let done_key = "analysis_done"; - let agent_state = state.agents.get(agent)?; - if agent_state.phase_status == done_key { - None - } else { - let proposal_files = list_source_proposals(session_dir); - let proposal_lines = if proposal_files.is_empty() { - format!( - "- (none found yet) in {}", - session_dir.join("sources").display() - ) - } else { - proposal_files - .iter() - .map(|p| format!("- {}", p.display())) - .collect::>() - .join("\n") - }; - Some(format!( - "Read proposal files:\n{}\nWrite divergence analysis to {}.\nFile issues with: rally plan file-issue --session {} --as {} --title \"...\"\nWhen complete, run: rally done --session {} --as {}", - proposal_lines, - session_dir - .join("analysis") - .join(format!("{agent}.md")) - .display(), - state.name, - agent, - state.name, - agent - )) - } - } - SessionPhase::Negotiation => { - let turn = state.turn.as_ref()?; - if turn.holder != agent { - return None; - } - - let open_issues = state - .issues - .iter() - .filter(|i| i.status == IssueStatus::Open) - .collect::>(); - if open_issues.is_empty() { - return Some("No open issues remain; waiting for phase transition.".to_string()); - } - - let mut lines = Vec::new(); - lines.push(format!( - "It is your turn in negotiation (round {}).", - turn.round - )); - lines.push("Open issues:".to_string()); - - for issue in open_issues { - let positions = join_or_none(issue.positions.iter().cloned().collect()); - let challenges = join_or_none(issue.challenges.iter().cloned().collect()); - let can_agree = issue.challenges.iter().any(|a| a != agent) - && issue.positions.iter().any(|a| a != agent); - - let hint = if !issue.positions.contains(agent) { - "You haven't written a position yet" - } else if can_agree { - "You may agree — challenge exists from another agent" - } else if !issue.challenges.contains(agent) { - "Consider challenging another position if needed" - } else { - "Update or refine your position/challenge" - }; - - lines.push(format!( - "- #{:02} {} ({}) | positions: [{}] | challenges: [{}] | hint: {}", - issue.id, - issue.file, - format_status(issue), - positions, - challenges, - hint - )); - lines.push(format!( - " Position file: {}", - position_path(session_dir, issue, agent).display() - )); - } - - lines.push(format!( - "When done for this turn, run: rally done --session {} --as {}", - state.name, agent - )); - Some(lines.join("\n")) - } - SessionPhase::FinalizationWrite => { - if state.finalization.writer.as_deref() == Some(agent) - || state.finalization.writer.is_none() - { - Some(format!( - "Write final artifacts: {} and {}.\n`final.md` must use write-todo format:\n- include `## Spec`\n- include `## Plan`\n- use numbered steps (`1.`, `2.`, ...)\n- include acceptance criteria per step\n- include a final manual QA step\nThen run: rally done --session {} --as {}", - session_dir.join("coverage-audit.md").display(), - session_dir.join("final.md").display(), - state.name, - agent - )) - } else { - None - } - } - SessionPhase::FinalizationReview => { - if state.finalization.writer.as_deref() == Some(agent) { - None - } else { - Some(format!( - "Review {} and {}. If approved, run rally done --session {} --as {}. If rejected, file issues with rally plan file-issue, which returns the session to negotiation.", - session_dir.join("coverage-audit.md").display(), - session_dir.join("final.md").display(), - state.name, - agent - )) - } - } - _ => None, - } -} - -pub fn get_wait_hint(state: &SessionState, agent: &str) -> Option { - match state.phase { - SessionPhase::FinalizationReview => { - if state.finalization.writer.as_deref() == Some(agent) { - Some(format!( - "Waiting for another agent to review final.md in session '{}'.", - state.name - )) - } else { - None - } - } - _ => None, - } -} - -pub fn prepare_instruction(state: &mut SessionState, agent: &str) -> bool { - if state.phase == SessionPhase::FinalizationWrite && state.finalization.writer.is_none() { - state.finalization.writer = Some(agent.to_string()); - return true; - } - false -} - -pub fn file_issue( - state: &mut SessionState, - session_dir: &Path, - agent: &str, - title: &str, - question: Option<&str>, - context: Option<&str>, -) -> Result { - if state.phase != SessionPhase::Analysis && state.phase != SessionPhase::FinalizationReview { - bail!("file-issue is only valid during analysis or finalization review"); - } - - let next_id = state.issues.iter().map(|i| i.id).max().unwrap_or(0) + 1; - let slug = slugify(title); - let file_name = format!("{:02}-{slug}.md", next_id); - let issue_path = session_dir.join("issues").join(&file_name); - - let question = question.unwrap_or(""); - let context = context.unwrap_or(""); - let template = format!( - "# Issue {:02}: {}\n\nTitle: {}\nQuestion: {}\nContext: {}\n\n## Position A\n\n## Position B\n", - next_id, title, title, question, context - ); - - fs::write(&issue_path, template)?; - - state.issues.push(IssueState { - id: next_id, - slug, - file: file_name, - title: title.to_string(), - author: agent.to_string(), - status: IssueStatus::Open, - positions: Default::default(), - challenges: Default::default(), - agreed_by: None, - created_at: now(), - }); - - if state.phase == SessionPhase::FinalizationReview { - state.finalization.reopened_for_issues = true; - state.phase = SessionPhase::Negotiation; - for agent_state in state.agents.values_mut() { - agent_state.rounds = 0; - } - initialize_turn(state)?; - } - - Ok(issue_path.display().to_string()) -} - -pub fn challenge_issue( - state: &mut SessionState, - session_dir: &Path, - agent: &str, - issue_id: u32, -) -> Result { - ensure_turn_holder(state, agent)?; - refresh_negotiation_state(state, session_dir)?; - - let issue = state - .issues - .iter_mut() - .find(|i| i.id == issue_id) - .ok_or_else(|| anyhow::anyhow!("issue {:02} not found", issue_id))?; - - if issue.status != IssueStatus::Open { - bail!("issue {:02} is not open", issue_id); - } - - let position = position_path(session_dir, issue, agent); - if let Some(parent) = position.parent() { - fs::create_dir_all(parent)?; - } - - if !position.exists() { - fs::write( - &position, - format!("# Position for issue {:02}\n\n", issue.id), - )?; - } - - let marker = format!( - "\n## Challenge {}\n- Add your challenge details here.\n", - now().to_rfc3339() - ); - append_text(&position, &marker)?; - - issue.positions.insert(agent.to_string()); - issue.challenges.insert(agent.to_string()); - - Ok(format!( - "challenge recorded for issue {:02} at {}", - issue.id, - position.display() - )) -} - -pub fn agree_issue( - state: &mut SessionState, - session_dir: &Path, - agent: &str, - issue_id: u32, -) -> Result { - ensure_turn_holder(state, agent)?; - refresh_negotiation_state(state, session_dir)?; - - let issue = state - .issues - .iter_mut() - .find(|i| i.id == issue_id) - .ok_or_else(|| anyhow::anyhow!("issue {:02} not found", issue_id))?; - - if issue.status != IssueStatus::Open { - bail!("issue {:02} is not open", issue_id); - } - - if !issue.positions.iter().any(|a| a != agent) { - bail!( - "cannot agree issue {:02}: only your own positions exist", - issue_id - ); - } - - if !issue.challenges.iter().any(|a| a != agent) { - bail!( - "cannot agree issue {:02}: no challenge from another agent exists", - issue_id - ); - } - - issue.status = IssueStatus::Agreed; - issue.agreed_by = Some(agent.to_string()); - - let mut msg = format!("issue {:02} marked AGREED by {}", issue_id, agent); - if maybe_resolve_negotiation(state) { - msg.push_str("; all issues resolved, advanced to finalization"); - } - Ok(msg) -} - -pub fn mark_done(state: &mut SessionState, agent: &str, session_dir: &Path) -> Result { - let agent_state = state - .agents - .get_mut(agent) - .ok_or_else(|| anyhow::anyhow!("agent '{}' is not joined", agent))?; - - match state.phase { - SessionPhase::Proposal => { - let proposal_path = session_dir - .join("sources") - .join(format!("{agent}-proposal.md")); - if !proposal_path.exists() { - bail!( - "missing proposal file {}; write it before running done", - proposal_path.display() - ); - } - - agent_state.phase_status = "proposal_done".to_string(); - if state - .agents - .values() - .all(|a| a.phase_status == "proposal_done") - { - for a in state.agents.values_mut() { - a.phase_status = "analysis_pending".to_string(); - } - state.phase = SessionPhase::Analysis; - Ok("proposal phase complete; advanced to analysis".to_string()) - } else { - Ok("marked proposal done".to_string()) - } - } - SessionPhase::Analysis => { - let analysis_path = session_dir.join("analysis").join(format!("{agent}.md")); - if !analysis_path.exists() { - bail!( - "missing analysis file {}; write it before running done", - analysis_path.display() - ); - } - - agent_state.phase_status = "analysis_done".to_string(); - if state - .agents - .values() - .all(|a| a.phase_status == "analysis_done") - { - renumber_issues_interleaved(state, session_dir)?; - state.phase = SessionPhase::Negotiation; - initialize_turn(state)?; - Ok( - "analysis phase complete; issues normalized; advanced to negotiation" - .to_string(), - ) - } else { - Ok("marked analysis done".to_string()) - } - } - SessionPhase::Negotiation => { - ensure_turn_holder(state, agent)?; - refresh_negotiation_state(state, session_dir)?; - if state.phase != SessionPhase::Negotiation { - return Ok("issues already resolved; advanced phase".to_string()); - } - - advance_turn(state)?; - if state.phase == SessionPhase::FinalizationWrite { - Ok( - "max rounds reached or all issues resolved; advanced to finalization" - .to_string(), - ) - } else { - Ok("advanced negotiation turn".to_string()) - } - } - SessionPhase::FinalizationWrite => { - if state.finalization.writer.as_deref() != Some(agent) { - bail!( - "only '{}' can complete finalization write", - state - .finalization - .writer - .as_deref() - .unwrap_or("") - ); - } - let coverage = session_dir.join("coverage-audit.md"); - let final_doc = session_dir.join("final.md"); - if !coverage.exists() { - bail!("missing {}", coverage.display()); - } - if !final_doc.exists() { - bail!("missing {}", final_doc.display()); - } - implement::parse_steps_from_todo(&final_doc).map_err(|err| { - anyhow::anyhow!( - "final.md must be write-todo format (## Spec + ## Plan with numbered steps): {err}" - ) - })?; - if state.config.expected_agents <= 1 || state.agents.len() <= 1 { - return finalize_and_complete(state, session_dir, agent, "single-agent"); - } - state.phase = SessionPhase::FinalizationReview; - Ok("finalization write completed; advanced to review".to_string()) - } - SessionPhase::FinalizationReview => { - if state.finalization.writer.as_deref() == Some(agent) { - if state.config.expected_agents <= 1 || state.agents.len() <= 1 { - return finalize_and_complete(state, session_dir, agent, "single-agent"); - } - bail!("the final writer cannot self-approve"); - } - finalize_and_complete(state, session_dir, agent, "reviewer-approved") - } - SessionPhase::Done => Ok("session already complete".to_string()), - _ => bail!("done is not valid during {:?}", state.phase), - } -} - -fn finalize_and_complete( - state: &mut SessionState, - session_dir: &Path, - approver: &str, - mode: &str, -) -> Result { - let final_doc = session_dir.join("final.md"); - if !final_doc.exists() { - bail!("missing {}", final_doc.display()); - } - let todo_doc = session_dir.join("todo.md"); - fs::copy(&final_doc, &todo_doc)?; - state.finalization.reviewer = Some(approver.to_string()); - state.finalization.todo_ready = true; - state.phase = SessionPhase::Done; - Ok(format!( - "finalization ({mode}); wrote {} and completed session", - todo_doc.display() - )) -} - -pub fn enforce_turn_timeout(state: &mut SessionState) -> Result { - if state.phase != SessionPhase::Negotiation { - return Ok(false); - } - - let Some(turn) = state.turn.as_ref() else { - initialize_turn(state)?; - return Ok(true); - }; - - let elapsed = now().signed_duration_since(turn.started_at); - if elapsed >= Duration::seconds(state.config.turn_timeout_secs as i64) { - advance_turn(state)?; - return Ok(true); - } - - Ok(false) -} - -pub fn refresh_negotiation_state(state: &mut SessionState, session_dir: &Path) -> Result { - if state.phase != SessionPhase::Negotiation { - return Ok(false); - } - - let mut changed = false; - for issue in &mut state.issues { - let dir = issue_dir_path(session_dir, issue.id, &issue.slug); - if !dir.exists() { - continue; - } - - for entry in fs::read_dir(&dir)? { - let entry = entry?; - if !entry.file_type()?.is_file() { - continue; - } - - let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) != Some("md") { - continue; - } - - let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else { - continue; - }; - - if issue.positions.insert(stem.to_string()) { - changed = true; - } - - let content = fs::read_to_string(&path).unwrap_or_default(); - if content.contains("## Challenge") && issue.challenges.insert(stem.to_string()) { - changed = true; - } - } - } - - if maybe_resolve_negotiation(state) { - changed = true; - } - - Ok(changed) -} - -pub fn initialize_turn(state: &mut SessionState) -> Result<()> { - let mut agents = state.agents.keys().cloned().collect::>(); - agents.sort(); - - let first = agents - .first() - .ok_or_else(|| anyhow::anyhow!("cannot initialize turn with no agents"))? - .clone(); - - state.turn = Some(TurnState { - holder: first, - started_at: now(), - round: 1, - }); - - Ok(()) -} - -pub fn advance_turn(state: &mut SessionState) -> Result<()> { - let mut agents = state.agents.keys().cloned().collect::>(); - agents.sort(); - - if agents.is_empty() { - bail!("cannot advance turn with no agents"); - } - - let current_holder = state - .turn - .as_ref() - .map(|t| t.holder.clone()) - .unwrap_or_else(|| agents[0].clone()); - - if let Some(agent_state) = state.agents.get_mut(¤t_holder) { - agent_state.rounds += 1; - } - - if maybe_resolve_negotiation(state) { - state.turn = None; - return Ok(()); - } - - let idx = agents - .iter() - .position(|a| *a == current_holder) - .unwrap_or(0usize); - let next = agents[(idx + 1) % agents.len()].clone(); - - let round = state.turn.as_ref().map(|t| t.round + 1).unwrap_or(1); - state.turn = Some(TurnState { - holder: next, - started_at: now(), - round, - }); - - Ok(()) -} - -fn maybe_resolve_negotiation(state: &mut SessionState) -> bool { - if state.phase != SessionPhase::Negotiation { - return false; - } - - let mut changed = false; - let open_count = state - .issues - .iter() - .filter(|i| i.status == IssueStatus::Open) - .count(); - - if open_count > 0 - && !state.agents.is_empty() - && state - .agents - .values() - .all(|a| a.rounds >= state.config.max_rounds) - { - for issue in &mut state.issues { - if issue.status == IssueStatus::Open { - issue.status = IssueStatus::Escalated; - changed = true; - } - } - } - - if state.issues.iter().all(|i| i.status != IssueStatus::Open) { - state.phase = SessionPhase::FinalizationWrite; - state.turn = None; - changed = true; - } - - changed -} - -fn renumber_issues_interleaved(state: &mut SessionState, session_dir: &Path) -> Result<()> { - let mut by_author: BTreeMap> = BTreeMap::new(); - for issue in state.issues.drain(..) { - by_author - .entry(issue.author.clone()) - .or_default() - .push_back(issue); - } - - let mut interleaved = Vec::new(); - loop { - let mut progressed = false; - let authors = by_author.keys().cloned().collect::>(); - for author in authors { - let queue = by_author.get_mut(&author).expect("author key must exist"); - if let Some(issue) = queue.pop_front() { - interleaved.push(issue); - progressed = true; - } - } - if !progressed { - break; - } - } - - for (idx, issue) in interleaved.iter_mut().enumerate() { - let new_id = (idx + 1) as u32; - let new_file = format!("{:02}-{}.md", new_id, issue.slug); - - let old_path = session_dir.join("issues").join(&issue.file); - let new_path = session_dir.join("issues").join(&new_file); - if old_path != new_path && old_path.exists() { - fs::rename(old_path, &new_path)?; - } - - issue.id = new_id; - issue.file = new_file; - } - - state.issues = interleaved; - Ok(()) -} - -fn ensure_turn_holder(state: &SessionState, agent: &str) -> Result<()> { - if state.phase != SessionPhase::Negotiation { - bail!("command is only valid during negotiation phase") - } - - let turn = state - .turn - .as_ref() - .ok_or_else(|| anyhow::anyhow!("turn state is not initialized"))?; - if turn.holder != agent { - bail!("it is currently '{}' turn", turn.holder) - } - - Ok(()) -} - -fn position_path(session_dir: &Path, issue: &IssueState, agent: &str) -> PathBuf { - issue_dir_path(session_dir, issue.id, &issue.slug).join(format!("{agent}.md")) -} - -fn issue_dir_path(session_dir: &Path, issue_id: u32, slug: &str) -> PathBuf { - session_dir - .join("positions") - .join(format!("{:02}-{slug}", issue_id)) -} - -fn append_text(path: &Path, text: &str) -> Result<()> { - let existing = fs::read_to_string(path).unwrap_or_default(); - fs::write(path, format!("{existing}{text}"))?; - Ok(()) -} - -fn format_status(issue: &IssueState) -> &'static str { - match issue.status { - IssueStatus::Open => "OPEN", - IssueStatus::Agreed => "AGREED", - IssueStatus::Escalated => "ESCALATED", - } -} - -fn join_or_none(mut items: Vec) -> String { - if items.is_empty() { - return "none".to_string(); - } - items.sort(); - items.join(", ") -} - -fn list_source_proposals(session_dir: &Path) -> Vec { - let sources_dir = session_dir.join("sources"); - let Ok(entries) = fs::read_dir(&sources_dir) else { - return Vec::new(); - }; - - let mut files = entries - .filter_map(|entry| entry.ok()) - .map(|entry| entry.path()) - .filter(|path| path.is_file()) - .collect::>(); - files.sort(); - files -} - -fn slugify(input: &str) -> String { - let mut out = String::with_capacity(input.len()); - let mut last_dash = false; - - for ch in input.chars() { - let c = ch.to_ascii_lowercase(); - if c.is_ascii_alphanumeric() { - out.push(c); - last_dash = false; - } else if !last_dash { - out.push('-'); - last_dash = true; - } - } - - let trimmed = out.trim_matches('-').to_string(); - if trimmed.is_empty() { - "issue".to_string() - } else { - trimmed - } -} diff --git a/src/state.rs b/src/state.rs index 00ae49e..bbc36f3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,163 +1,13 @@ use std::{ - collections::{BTreeMap, BTreeSet}, fs::{self, File, OpenOptions}, io::{Read, Write}, path::{Path, PathBuf}, }; use anyhow::{Context, Result, anyhow, bail}; -use chrono::{DateTime, Utc}; use fs2::FileExt; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "snake_case")] -pub enum SessionType { - Plan, - Implement, - Workflow, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "snake_case")] -pub enum SessionPhase { - Registration, - Proposal, - Analysis, - Negotiation, - FinalizationWrite, - FinalizationReview, - Implement, - Review, - Done, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum IssueStatus { - Open, - Agreed, - Escalated, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum StepStatus { - Pending, - InProgress, - Approved, - Escalated, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum ReviewVerdict { - Approve, - ChangesRequested, - Blocked, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Config { - pub expected_agents: u32, - pub max_rounds: u32, - pub turn_timeout_secs: u64, - pub review_timeout_secs: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct AgentState { - pub name: String, - pub joined_at: DateTime, - pub last_seen: DateTime, - pub phase_status: String, - pub rounds: u32, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TurnState { - pub holder: String, - pub started_at: DateTime, - pub round: u32, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct IssueState { - pub id: u32, - pub slug: String, - pub file: String, - pub title: String, - pub author: String, - pub status: IssueStatus, - pub positions: BTreeSet, - pub challenges: BTreeSet, - pub agreed_by: Option, - pub created_at: DateTime, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct FinalizationState { - pub writer: Option, - pub reviewer: Option, - pub todo_ready: bool, - pub reopened_for_issues: bool, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct StepState { - pub number: u32, - pub title: String, - pub acceptance_criteria: Vec, - pub status: StepStatus, - pub escalated: bool, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CheckpointState { - pub step_number: u32, - pub commit_sha: String, - pub created_at: DateTime, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ReviewState { - pub reviewer: String, - pub verdict: ReviewVerdict, - pub message: Option, - pub created_at: DateTime, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SessionState { - pub name: String, - pub session_type: SessionType, - pub phase: SessionPhase, - pub created_at: DateTime, - pub config: Config, - pub agents: BTreeMap, - pub turn: Option, - pub issues: Vec, - pub finalization: FinalizationState, - pub topic: Option, - pub todo_path: Option, - pub workspace: Option, - #[serde(default)] - pub workflow_id: Option, - #[serde(default)] - pub workflow_source: Option, - #[serde(default)] - pub workflow_version: Option, - #[serde(default)] - pub workflow_state: Option, - #[serde(default)] - pub implementer_agent: Option, - pub steps: Vec, - pub current_step_idx: usize, - pub checkpoint: Option, - pub reviews: Vec, - pub pending_feedback: Vec, -} +pub use rally_core::{AgentState, Config, SessionPhase, SessionState, SessionType, now}; #[cfg(test)] mod tests { @@ -171,7 +21,7 @@ mod tests { SessionState { name: "sample".to_string(), session_type: SessionType::Implement, - phase: SessionPhase::Implement, + phase: SessionPhase::Registration, created_at: now(), config: Config { expected_agents: 2, @@ -180,9 +30,6 @@ mod tests { review_timeout_secs: 1200, }, agents: BTreeMap::new(), - turn: None, - issues: Vec::new(), - finalization: FinalizationState::default(), topic: None, todo_path: None, workspace: Some("/tmp/workspace".to_string()), @@ -190,12 +37,6 @@ mod tests { workflow_source: None, workflow_version: None, workflow_state: None, - implementer_agent: Some("implementer".to_string()), - steps: Vec::new(), - current_step_idx: 0, - checkpoint: None, - reviews: Vec::new(), - pending_feedback: Vec::new(), } } @@ -232,10 +73,7 @@ mod tests { let encoded = serde_json::to_string(&state).expect("encode state"); let decoded: SessionState = serde_json::from_str(&encoded).expect("decode state"); - assert_eq!( - decoded.workflow_id.as_deref(), - Some("myco/security-review") - ); + assert_eq!(decoded.workflow_id.as_deref(), Some("myco/security-review")); assert_eq!(decoded.workflow_source.as_deref(), Some("buildtime")); assert_eq!(decoded.workflow_version.as_deref(), Some("1.2.3")); assert_eq!( @@ -311,10 +149,6 @@ impl Drop for SessionHandle { } } -pub fn now() -> DateTime { - Utc::now() -} - pub fn rally_home() -> Result { let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to resolve home directory"))?; Ok(home.join(".rally").join("sessions")) diff --git a/src/watch.rs b/src/watch.rs index ebe7f4a..411643d 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -13,11 +13,6 @@ use crate::{ }, }; -pub fn next(session: &str, agent: &str, timeout_secs: Option) -> Result { - let registry = workflow::default_registry()?; - next_with_registry(session, agent, timeout_secs, ®istry) -} - pub fn next_with_registry( session: &str, agent: &str, @@ -100,11 +95,6 @@ pub fn next_with_registry( } } -pub fn done(session: &str, agent: &str) -> Result<()> { - let registry = workflow::default_registry()?; - done_with_registry(session, agent, ®istry) -} - pub fn done_with_registry(session: &str, agent: &str, registry: &WorkflowRegistry) -> Result<()> { let handle = SessionHandle::open(session)?; let mut state = handle.load_state()?; diff --git a/src/workflow/builtin.rs b/src/workflow/builtin.rs index a2dff30..1f91939 100644 --- a/src/workflow/builtin.rs +++ b/src/workflow/builtin.rs @@ -1,10 +1,12 @@ use anyhow::{Result, anyhow, bail}; +use rally_workflow_build as implement; +use rally_workflow_plan::{ + self as plan, DefaultPlanPolicy, PlanDoneDispatch, PlanEngine, PlanPollDispatch, + StrictNegotiationPolicy, +}; use serde_json::Value; -use crate::{ - implement, plan, - state::{IssueStatus, ReviewVerdict, SessionPhase, StepStatus}, -}; +use crate::state::SessionPhase; use super::{ ActionContext, ActionDispatch, DoneContext, DoneDispatch, JoinContext, JoinDispatch, @@ -13,6 +15,7 @@ use super::{ pub const BUILTIN_PLAN_WORKFLOW_ID: &str = "builtin/plan"; pub const BUILTIN_BUILD_WORKFLOW_ID: &str = "builtin/build"; +pub const COMPOSED_NEGOTIATE_WORKFLOW_ID: &str = "demo/negotiate"; pub fn is_builtin_workflow_id(id: &str) -> bool { id == BUILTIN_PLAN_WORKFLOW_ID || id == BUILTIN_BUILD_WORKFLOW_ID @@ -20,6 +23,7 @@ pub fn is_builtin_workflow_id(id: &str) -> bool { pub struct PlanWorkflow; pub struct BuildWorkflow; +pub struct ComposedNegotiateWorkflow; impl Workflow for PlanWorkflow { fn id(&self) -> &str { @@ -30,114 +34,122 @@ impl Workflow for PlanWorkflow { &["file-issue", "challenge", "agree"] } - fn on_join(&self, _ctx: &mut JoinContext<'_>) -> Result { + fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { + let engine = PlanEngine::new(DefaultPlanPolicy); + engine.join(ctx.host.state)?; Ok(JoinDispatch::default()) } fn poll_next(&self, ctx: &mut NextPollContext<'_>) -> Result { - plan::refresh_negotiation_state(ctx.host.state, ctx.host.session_dir)?; - plan::enforce_turn_timeout(ctx.host.state)?; - plan::prepare_instruction(ctx.host.state, ctx.agent); - - if let Some(instruction) = plan::get_instruction(ctx.host.state, ctx.agent, ctx.host.session_dir) { - return Ok(NextPollDispatch::Instruction { body: instruction }); + let engine = PlanEngine::new(DefaultPlanPolicy); + match engine.poll_next(ctx.host.state, ctx.host.session_dir, ctx.agent)? { + PlanPollDispatch::Instruction { body } => Ok(NextPollDispatch::Instruction { body }), + PlanPollDispatch::Wait { hint } => Ok(NextPollDispatch::Wait { hint }), + PlanPollDispatch::Complete { message } => Ok(NextPollDispatch::Complete { message }), } - - if ctx.host.state.phase == SessionPhase::Done { - return Ok(NextPollDispatch::Complete { - message: "session already complete".to_string(), - }); - } - - Ok(NextPollDispatch::Wait { - hint: plan::get_wait_hint(ctx.host.state, ctx.agent), - }) } fn on_done(&self, ctx: &mut DoneContext<'_>) -> Result { - let message = plan::mark_done(ctx.host.state, ctx.agent, ctx.host.session_dir)?; + let engine = PlanEngine::new(DefaultPlanPolicy); + let PlanDoneDispatch { + message, + next_phase, + } = engine.done(ctx.host.state, ctx.host.session_dir, ctx.agent)?; Ok(DoneDispatch { message, - next_phase: Some(ctx.host.state.phase.clone()), + next_phase, }) } fn on_action(&self, ctx: &mut ActionContext<'_>) -> Result { - let message = match ctx.name { - "file-issue" => { - let title = required_string(ctx.args, "title", ctx.name)?; - let question = optional_string(ctx.args, "question"); - let context = optional_string(ctx.args, "context"); - let path = plan::file_issue( - ctx.host.state, - ctx.host.session_dir, - ctx.agent, - &title, - question.as_deref(), - context.as_deref(), - )?; - format!("filed issue at {path}") - } - "challenge" => { - let issue = required_u32(ctx.args, "issue", ctx.name)?; - plan::challenge_issue(ctx.host.state, ctx.host.session_dir, ctx.agent, issue)? - } - "agree" => { - let issue = required_u32(ctx.args, "issue", ctx.name)?; - plan::agree_issue(ctx.host.state, ctx.host.session_dir, ctx.agent, issue)? - } - _ => { - bail!( - "unknown action '{}' for workflow '{}'; supported actions: file-issue, challenge, agree", - ctx.name, - self.id() - ) - } - }; - - Ok(ActionDispatch { message }) + let engine = PlanEngine::new(DefaultPlanPolicy); + Ok(ActionDispatch { + message: engine.action( + ctx.host.state, + ctx.host.session_dir, + ctx.agent, + ctx.name, + ctx.args, + )?, + }) } fn format_status(&self, ctx: &StatusContext<'_>) -> Result { - let mut lines = Vec::new(); + let engine = PlanEngine::new(DefaultPlanPolicy); + Ok(StatusDispatch { + lines: engine.status(ctx.host.state)?, + }) + } +} - if let Some(turn) = &ctx.host.state.turn { - lines.push(format!( - "Turn: holder={} round={} started={}", - turn.holder, - turn.round, - turn.started_at.to_rfc3339() - )); - } else { - lines.push("Turn: none".to_string()); +impl Workflow for ComposedNegotiateWorkflow { + fn id(&self) -> &str { + COMPOSED_NEGOTIATE_WORKFLOW_ID + } + + fn supported_actions(&self) -> &'static [&'static str] { + &["file-issue", "challenge", "agree"] + } + + fn on_create(&self, ctx: &mut super::CreateContext<'_>) -> Result { + ctx.host.state.workflow_state = Some(plan::initial_workflow_state()?); + for p in [ + "sources", + "analysis", + "issues", + "positions", + "checkpoints", + "reviews", + ] { + std::fs::create_dir_all(ctx.host.session_dir.join(p))?; } + Ok(Default::default()) + } - let open = ctx - .host - .state - .issues - .iter() - .filter(|i| i.status == IssueStatus::Open) - .count(); - let agreed = ctx - .host - .state - .issues - .iter() - .filter(|i| i.status == IssueStatus::Agreed) - .count(); - let escalated = ctx - .host - .state - .issues - .iter() - .filter(|i| i.status == IssueStatus::Escalated) - .count(); - lines.push(format!( - "Issue summary: {} open, {} agreed, {} escalated", - open, agreed, escalated - )); + fn on_join(&self, ctx: &mut JoinContext<'_>) -> Result { + let engine = PlanEngine::new(StrictNegotiationPolicy); + engine.join(ctx.host.state)?; + Ok(JoinDispatch::default()) + } + fn poll_next(&self, ctx: &mut NextPollContext<'_>) -> Result { + let engine = PlanEngine::new(StrictNegotiationPolicy); + match engine.poll_next(ctx.host.state, ctx.host.session_dir, ctx.agent)? { + PlanPollDispatch::Instruction { body } => Ok(NextPollDispatch::Instruction { body }), + PlanPollDispatch::Wait { hint } => Ok(NextPollDispatch::Wait { hint }), + PlanPollDispatch::Complete { message } => Ok(NextPollDispatch::Complete { message }), + } + } + + fn on_done(&self, ctx: &mut DoneContext<'_>) -> Result { + let engine = PlanEngine::new(StrictNegotiationPolicy); + let PlanDoneDispatch { + message, + next_phase, + } = engine.done(ctx.host.state, ctx.host.session_dir, ctx.agent)?; + Ok(DoneDispatch { + message, + next_phase, + }) + } + + fn on_action(&self, ctx: &mut ActionContext<'_>) -> Result { + let engine = PlanEngine::new(StrictNegotiationPolicy); + Ok(ActionDispatch { + message: engine.action( + ctx.host.state, + ctx.host.session_dir, + ctx.agent, + ctx.name, + ctx.args, + )?, + }) + } + + fn format_status(&self, ctx: &StatusContext<'_>) -> Result { + let engine = PlanEngine::new(StrictNegotiationPolicy); + let mut lines = engine.status(ctx.host.state)?; + lines.insert(0, "Policy: strict-negotiate (composed demo)".to_string()); Ok(StatusDispatch { lines }) } } @@ -209,26 +221,23 @@ impl Workflow for BuildWorkflow { } fn format_status(&self, ctx: &StatusContext<'_>) -> Result { + let workflow_state = implement::read_workflow_state(ctx.host.state)?; let mut lines = Vec::new(); - lines.push(format!( - "Implementer: {}", - ctx.host - .state - .implementer_agent - .clone() - .unwrap_or_else(|| "(unassigned)".to_string()) - )); - let total = ctx.host.state.steps.len(); - let completed = ctx - .host - .state + lines.push(format!("Implementer: {}", workflow_state.implementer_agent)); + let total = workflow_state.steps.len(); + let completed = workflow_state .steps .iter() - .filter(|s| matches!(s.status, StepStatus::Approved | StepStatus::Escalated)) + .filter(|s| { + matches!( + s.status, + implement::StepStatus::Approved | implement::StepStatus::Escalated + ) + }) .count(); lines.push(format!("Step progress: {}/{}", completed, total)); - if let Some(step) = ctx.host.state.steps.get(ctx.host.state.current_step_idx) { + if let Some(step) = workflow_state.steps.get(workflow_state.current_step_idx) { lines.push(format!("Current step: {}. {}", step.number, step.title)); lines.push(format!("Current step status: {:?}", step.status)); } else { @@ -236,7 +245,7 @@ impl Workflow for BuildWorkflow { } if ctx.host.state.phase == SessionPhase::Review { - if let Some(cp) = &ctx.host.state.checkpoint { + if let Some(cp) = &workflow_state.checkpoint { lines.push(format!( "Review checkpoint: step {} @ {}", cp.step_number, cp.commit_sha @@ -244,14 +253,14 @@ impl Workflow for BuildWorkflow { } lines.push(format!( "Review submissions: {}", - ctx.host.state.reviews.len() + workflow_state.reviews.len() )); } - if !ctx.host.state.pending_feedback.is_empty() { + if !workflow_state.pending_feedback.is_empty() { lines.push(format!( "Pending feedback items: {}", - ctx.host.state.pending_feedback.len() + workflow_state.pending_feedback.len() )); } @@ -272,22 +281,13 @@ fn optional_string(args: &Value, key: &str) -> Option { .map(ToString::to_string) } -fn required_u32(args: &Value, key: &str, action: &str) -> Result { - let value = args - .get(key) - .and_then(Value::as_u64) - .ok_or_else(|| anyhow!("action '{}' requires numeric field '{}'", action, key))?; - u32::try_from(value) - .map_err(|_| anyhow!("action '{}' field '{}' is out of range for u32", action, key)) -} - -fn parse_review_verdict(raw: &str) -> Result { +fn parse_review_verdict(raw: &str) -> Result { match raw { - "APPROVE" | "approve" => Ok(ReviewVerdict::Approve), + "APPROVE" | "approve" => Ok(implement::ReviewVerdict::Approve), "CHANGES_REQUESTED" | "changes_requested" | "changes-requested" => { - Ok(ReviewVerdict::ChangesRequested) + Ok(implement::ReviewVerdict::ChangesRequested) } - "BLOCKED" | "blocked" => Ok(ReviewVerdict::Blocked), + "BLOCKED" | "blocked" => Ok(implement::ReviewVerdict::Blocked), _ => bail!( "unknown review verdict '{}'; expected APPROVE|CHANGES_REQUESTED|BLOCKED", raw diff --git a/src/workflow/interop.rs b/src/workflow/interop.rs new file mode 100644 index 0000000..ef5743b --- /dev/null +++ b/src/workflow/interop.rs @@ -0,0 +1,412 @@ +use std::path::Path; + +use anyhow::{Result, anyhow, bail}; +use serde_json::{Map, Value}; + +use super::{ + ActionContext, ActionDispatch, DoneContext, DoneDispatch, HostContext, HostContextMut, + JoinContext, JoinDispatch, NextPollContext, NextPollDispatch, StatusContext, StatusDispatch, + WorkflowDispatch, WorkflowRegistry, dispatch_action, dispatch_done, dispatch_join, + dispatch_next_poll, dispatch_status, +}; +use crate::state::SessionState; + +const CHILDREN_KEY: &str = "_children"; + +#[derive(Clone, Copy, Debug)] +pub struct DelegationTarget<'a> { + pub workflow_id: &'a str, + pub namespace: &'a str, +} + +pub fn delegate_join( + parent: &mut SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + agent: &str, +) -> Result { + with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { + let mut ctx = JoinContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_join(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Join(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for join: {:?}", + workflow.id(), + other + ), + } + }) +} + +pub fn delegate_next_poll( + parent: &mut SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + agent: &str, +) -> Result { + with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { + let mut ctx = NextPollContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_next_poll(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::NextPoll(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for next poll: {:?}", + workflow.id(), + other + ), + } + }) +} + +pub fn delegate_done( + parent: &mut SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + agent: &str, +) -> Result { + with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { + let mut ctx = DoneContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + }; + match dispatch_done(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Done(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for done: {:?}", + workflow.id(), + other + ), + } + }) +} + +pub fn delegate_action( + parent: &mut SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + agent: &str, + name: &str, + args: &Value, +) -> Result { + with_child_session(parent, session_dir, registry, target, |session_dir, workflow, child| { + let mut ctx = ActionContext { + host: HostContextMut { + session_dir, + state: child, + }, + agent, + name, + args, + }; + match dispatch_action(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::Action(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for action '{}': {:?}", + workflow.id(), + name, + other + ), + } + }) +} + +pub fn delegate_status( + parent: &SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + agent: Option<&str>, +) -> Result { + let child = projected_child_session(parent, target)?; + let workflow = registry.resolve_str(target.workflow_id)?; + let ctx = StatusContext { + host: HostContext { + session_dir, + state: &child, + }, + agent, + }; + match dispatch_status(workflow.as_ref(), &ctx)? { + WorkflowDispatch::Status(dispatch) => Ok(dispatch), + other => bail!( + "workflow '{}' returned unexpected delegation dispatch for status: {:?}", + workflow.id(), + other + ), + } +} + +pub fn child_workflow_state(parent: &SessionState, namespace: &str) -> Result { + extract_child_state(parent, namespace) +} + +fn with_child_session( + parent: &mut SessionState, + session_dir: &Path, + registry: &WorkflowRegistry, + target: DelegationTarget<'_>, + run: F, +) -> Result +where + F: FnOnce(&Path, super::WorkflowRef, &mut SessionState) -> Result, +{ + validate_namespace(target.namespace)?; + let workflow = registry.resolve_str(target.workflow_id)?; + let mut child = projected_child_session(parent, target)?; + + let result = run(session_dir, workflow, &mut child)?; + commit_child_session(parent, &child, target.namespace)?; + Ok(result) +} + +fn projected_child_session(parent: &SessionState, target: DelegationTarget<'_>) -> Result { + validate_namespace(target.namespace)?; + let mut child = parent.clone(); + child.workflow_id = Some(target.workflow_id.to_string()); + child.workflow_state = Some(extract_child_state(parent, target.namespace)?); + Ok(child) +} + +fn commit_child_session( + parent: &mut SessionState, + child: &SessionState, + namespace: &str, +) -> Result<()> { + validate_namespace(namespace)?; + + let mut root = root_state_object(parent.workflow_state.take())?; + let mut children = root + .remove(CHILDREN_KEY) + .and_then(|v| v.as_object().cloned()) + .unwrap_or_default(); + let child_state = child + .workflow_state + .clone() + .unwrap_or_else(|| Value::Object(Map::new())); + children.insert(namespace.to_string(), child_state); + root.insert(CHILDREN_KEY.to_string(), Value::Object(children)); + + parent.workflow_state = Some(Value::Object(root)); + parent.phase = child.phase.clone(); + parent.agents = child.agents.clone(); + + Ok(()) +} + +fn extract_child_state(parent: &SessionState, namespace: &str) -> Result { + validate_namespace(namespace)?; + + let root = root_state_object(parent.workflow_state.clone())?; + let child = root + .get(CHILDREN_KEY) + .and_then(Value::as_object) + .and_then(|children| children.get(namespace)) + .cloned() + .unwrap_or_else(|| Value::Object(Map::new())); + + Ok(child) +} + +fn root_state_object(workflow_state: Option) -> Result> { + match workflow_state { + None => Ok(Map::new()), + Some(Value::Object(map)) => Ok(map), + Some(other) => Err(anyhow!( + "delegation requires object workflow_state root, got {}", + json_type_name(&other) + )), + } +} + +fn validate_namespace(namespace: &str) -> Result<()> { + if namespace.is_empty() { + bail!("delegation namespace cannot be empty"); + } + if namespace + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-') + { + return Ok(()); + } + bail!( + "delegation namespace '{}' is invalid; use [a-zA-Z0-9_-]", + namespace + ) +} + +fn json_type_name(value: &Value) -> &'static str { + match value { + Value::Null => "null", + Value::Bool(_) => "boolean", + Value::Number(_) => "number", + Value::String(_) => "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use std::path::Path; + + use anyhow::Result; + use serde_json::json; + + use super::*; + use crate::state::{AgentState, Config, SessionPhase, SessionState, SessionType, now}; + use crate::workflow::{Workflow, WorkflowRegistry}; + + struct ChildWorkflow; + + impl Workflow for ChildWorkflow { + fn id(&self) -> &str { + "test/child" + } + + fn on_join(&self, _ctx: &mut JoinContext<'_>) -> Result { + Ok(JoinDispatch::default()) + } + + fn poll_next(&self, _ctx: &mut NextPollContext<'_>) -> Result { + Ok(NextPollDispatch::Wait { hint: None }) + } + + fn on_done(&self, _ctx: &mut DoneContext<'_>) -> Result { + Ok(DoneDispatch { + message: "done".to_string(), + next_phase: None, + }) + } + + fn on_action(&self, ctx: &mut ActionContext<'_>) -> Result { + if ctx.name == "fail" { + ctx.host.state.workflow_state = Some(json!({"mutated": true})); + anyhow::bail!("intentional child failure") + } + ctx.host.state.workflow_state = Some(json!({"ok": true})); + Ok(ActionDispatch { + message: "ok".to_string(), + }) + } + + fn format_status(&self, _ctx: &StatusContext<'_>) -> Result { + Ok(StatusDispatch { lines: vec![] }) + } + } + + fn parent_state() -> SessionState { + let mut agents = BTreeMap::new(); + let ts = now(); + agents.insert( + "implementer".to_string(), + AgentState { + name: "implementer".to_string(), + joined_at: ts, + last_seen: ts, + phase_status: "joined".to_string(), + rounds: 0, + }, + ); + + SessionState { + name: "session".to_string(), + session_type: SessionType::Workflow, + phase: SessionPhase::Implement, + created_at: ts, + config: Config { + expected_agents: 1, + max_rounds: 1, + turn_timeout_secs: 1, + review_timeout_secs: 1, + }, + agents, + topic: None, + todo_path: None, + workspace: None, + workflow_id: Some("parent/workflow".to_string()), + workflow_source: None, + workflow_version: None, + workflow_state: Some(json!({"parent": true})), + } + } + + #[test] + fn delegation_success_commits_namespaced_child_state() -> Result<()> { + let mut registry = WorkflowRegistry::new(); + registry.register(ChildWorkflow)?; + + let mut parent = parent_state(); + let args = json!({}); + let dispatch = delegate_action( + &mut parent, + Path::new("/tmp"), + ®istry, + DelegationTarget { + workflow_id: "test/child", + namespace: "child_a", + }, + "implementer", + "ack", + &args, + )?; + + assert_eq!(dispatch.message, "ok"); + assert_eq!( + parent.workflow_state, + Some(json!({ + "parent": true, + "_children": { + "child_a": { "ok": true } + } + })) + ); + Ok(()) + } + + #[test] + fn delegation_failure_does_not_mutate_parent_state() -> Result<()> { + let mut registry = WorkflowRegistry::new(); + registry.register(ChildWorkflow)?; + + let mut parent = parent_state(); + let before = parent.workflow_state.clone(); + let args = json!({}); + + let err = delegate_action( + &mut parent, + Path::new("/tmp"), + ®istry, + DelegationTarget { + workflow_id: "test/child", + namespace: "child_b", + }, + "implementer", + "fail", + &args, + ) + .expect_err("delegation should fail"); + assert!(err.to_string().contains("intentional child failure")); + assert_eq!(parent.workflow_state, before); + + Ok(()) + } +} diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 9a1a7d7..b01dbc3 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -3,9 +3,10 @@ use std::{collections::BTreeMap, fmt, path::Path, sync::Arc}; use anyhow::{Result, anyhow, bail}; use serde_json::Value; -use crate::state::{SessionPhase, SessionState, SessionType}; +use crate::state::{SessionPhase, SessionState}; pub mod builtin; +pub mod interop; #[derive(Debug)] pub struct HostContextMut<'a> { @@ -220,27 +221,14 @@ fn format_available_ids(workflows: &BTreeMap) -> String .join(", ") } -pub fn default_registry() -> Result { - let mut registry = WorkflowRegistry::new(); - registry.register(builtin::PlanWorkflow)?; - registry.register(builtin::BuildWorkflow)?; - Ok(registry) -} - -pub fn workflow_id_for_session_type(session_type: &SessionType) -> Result { - let raw = match session_type { - SessionType::Plan => builtin::BUILTIN_PLAN_WORKFLOW_ID, - SessionType::Implement => builtin::BUILTIN_BUILD_WORKFLOW_ID, - SessionType::Workflow => bail!("workflow sessions require an explicit workflow_id"), - }; - WorkflowId::parse(raw) -} - pub fn workflow_id_for_session(state: &SessionState) -> Result { - if let Some(id) = &state.workflow_id { - return WorkflowId::parse(id); - } - workflow_id_for_session_type(&state.session_type) + let id = state.workflow_id.as_deref().ok_or_else(|| { + anyhow!( + "session '{}' is missing workflow_id; recreate with an explicit workflow id", + state.name + ) + })?; + WorkflowId::parse(id) } pub trait Workflow: Send + Sync { @@ -279,15 +267,24 @@ pub trait Workflow: Send + Sync { fn format_status(&self, ctx: &StatusContext<'_>) -> Result; } -pub fn dispatch_create(workflow: &dyn Workflow, ctx: &mut CreateContext<'_>) -> Result { +pub fn dispatch_create( + workflow: &dyn Workflow, + ctx: &mut CreateContext<'_>, +) -> Result { Ok(WorkflowDispatch::Create(workflow.on_create(ctx)?)) } -pub fn dispatch_init(workflow: &dyn Workflow, ctx: &mut InitContext<'_>) -> Result { +pub fn dispatch_init( + workflow: &dyn Workflow, + ctx: &mut InitContext<'_>, +) -> Result { Ok(WorkflowDispatch::Init(workflow.on_init(ctx)?)) } -pub fn dispatch_join(workflow: &dyn Workflow, ctx: &mut JoinContext<'_>) -> Result { +pub fn dispatch_join( + workflow: &dyn Workflow, + ctx: &mut JoinContext<'_>, +) -> Result { Ok(WorkflowDispatch::Join(workflow.on_join(ctx)?)) } @@ -298,15 +295,24 @@ pub fn dispatch_next_poll( Ok(WorkflowDispatch::NextPoll(workflow.poll_next(ctx)?)) } -pub fn dispatch_done(workflow: &dyn Workflow, ctx: &mut DoneContext<'_>) -> Result { +pub fn dispatch_done( + workflow: &dyn Workflow, + ctx: &mut DoneContext<'_>, +) -> Result { Ok(WorkflowDispatch::Done(workflow.on_done(ctx)?)) } -pub fn dispatch_action(workflow: &dyn Workflow, ctx: &mut ActionContext<'_>) -> Result { +pub fn dispatch_action( + workflow: &dyn Workflow, + ctx: &mut ActionContext<'_>, +) -> Result { Ok(WorkflowDispatch::Action(workflow.on_action(ctx)?)) } -pub fn dispatch_status(workflow: &dyn Workflow, ctx: &StatusContext<'_>) -> Result { +pub fn dispatch_status( + workflow: &dyn Workflow, + ctx: &StatusContext<'_>, +) -> Result { Ok(WorkflowDispatch::Status(workflow.format_status(ctx)?)) } @@ -317,7 +323,7 @@ mod tests { use serde_json::json; use super::*; - use crate::state::{Config, FinalizationState, SessionType}; + use crate::state::{Config, SessionType}; #[derive(Default)] struct MockWorkflow { @@ -426,9 +432,6 @@ mod tests { review_timeout_secs: 1, }, agents: BTreeMap::new(), - turn: None, - issues: Vec::new(), - finalization: FinalizationState::default(), topic: None, todo_path: None, workspace: None, @@ -436,12 +439,6 @@ mod tests { workflow_source: None, workflow_version: None, workflow_state: None, - implementer_agent: None, - steps: Vec::new(), - current_step_idx: 0, - checkpoint: None, - reviews: Vec::new(), - pending_feedback: Vec::new(), } } @@ -575,7 +572,15 @@ mod tests { let calls = workflow.calls.lock().expect("lock").clone(); assert_eq!( calls, - vec!["create", "init", "join", "next_poll", "done", "action", "status"] + vec![ + "create", + "init", + "join", + "next_poll", + "done", + "action", + "status" + ] ); } @@ -646,8 +651,9 @@ mod tests { let err = registry .register(MockWorkflow::default()) .expect_err("duplicate should fail"); - assert!(err - .to_string() - .contains("workflow id 'test/mock' is already registered")); + assert!( + err.to_string() + .contains("workflow id 'test/mock' is already registered") + ); } } diff --git a/tests/extensibility_mvp.rs b/tests/extensibility_mvp.rs index 9f60117..1cad118 100644 --- a/tests/extensibility_mvp.rs +++ b/tests/extensibility_mvp.rs @@ -16,8 +16,8 @@ use rally::{ state::{self, SessionHandle, SessionPhase}, workflow::{ self, ActionContext, ActionDispatch, CreateContext, DoneContext, DoneDispatch, - InitContext, JoinContext, JoinDispatch, NextPollContext, NextPollDispatch, StatusContext, - StatusDispatch, + HostContextMut, InitContext, JoinContext, JoinDispatch, NextPollContext, NextPollDispatch, + StatusContext, StatusDispatch, WorkflowDispatch, }, }; @@ -54,11 +54,19 @@ fn run_cli(args: &[&str], registry: &rally::WorkflowRegistry) -> Result { run_cli_with_registry(cli, registry) } +fn builtin_registry() -> Result { + let mut registry = rally::WorkflowRegistry::new(); + registry.register(workflow::builtin::PlanWorkflow)?; + registry.register(workflow::builtin::BuildWorkflow)?; + registry.register(workflow::builtin::ComposedNegotiateWorkflow)?; + Ok(registry) +} + static DUPLICATE_CREATE_HOOK_CALLS: AtomicUsize = AtomicUsize::new(0); #[test] fn regression_plan_and_build_happy_paths_with_namespaced_commands() -> Result<()> { - let registry = workflow::default_registry()?; + let registry = builtin_registry()?; let workspace = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let build_session = unique_name("rally-build-it"); @@ -214,7 +222,10 @@ fn regression_plan_and_build_happy_paths_with_namespaced_commands() -> Result<() )?, 0 ); - write_file(&plan_dir.join("analysis").join("planner.md"), "# Analysis\n\nNo issues."); + write_file( + &plan_dir.join("analysis").join("planner.md"), + "# Analysis\n\nNo issues.", + ); commands::done_with_registry( &DoneArgs { session: plan_session.clone(), @@ -262,7 +273,7 @@ fn regression_plan_and_build_happy_paths_with_namespaced_commands() -> Result<() #[test] fn namespaced_plan_file_issue_cli_path_executes() -> Result<()> { - let registry = workflow::default_registry()?; + let registry = builtin_registry()?; let session = unique_name("rally-plan-cmd-it"); let _ = fs::remove_dir_all(state::session_dir(&session)); @@ -312,16 +323,185 @@ fn namespaced_plan_file_issue_cli_path_executes() -> Result<()> { let handle = SessionHandle::open(&session)?; let state = handle.load_state()?; - assert_eq!(state.issues.len(), 1); + let plan_state = rally_workflow_plan::read_workflow_state(&state)?; + assert_eq!(plan_state.issues.len(), 1); + drop(handle); + + let _ = fs::remove_dir_all(state::session_dir(&session)); + Ok(()) +} + +#[test] +fn composed_negotiate_reuses_templates_and_enforces_strict_policy() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-it"); + let _ = fs::remove_dir_all(state::session_dir(&session)); + + run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + "--agents", + "3", + "--topic", + "stability", + ], + ®istry, + )?; + for agent in ["agent-a", "agent-b", "agent-c"] { + commands::join_with_registry( + &JoinArgs { + session: session.clone(), + agent: agent.to_string(), + timeout: None, + }, + ®istry, + )?; + } + + let handle = SessionHandle::open(&session)?; + let mut state = handle.load_state()?; + let workflow = registry.resolve_str("demo/negotiate")?; + let mut ctx = NextPollContext { + host: HostContextMut { + session_dir: &handle.session_dir, + state: &mut state, + }, + agent: "agent-a", + }; + let body = match workflow::dispatch_next_poll(workflow.as_ref(), &mut ctx)? { + WorkflowDispatch::NextPoll(NextPollDispatch::Instruction { body }) => body, + other => panic!("unexpected poll dispatch: {other:?}"), + }; + let expected = rally_workflow_plan::proposal_instruction( + &handle + .session_dir + .join("sources") + .join("agent-a-proposal.md") + .display() + .to_string(), + "stability", + &session, + "agent-a", + ); + assert_eq!(body, expected); + + state.phase = SessionPhase::Analysis; + handle.save_state(&state)?; + drop(handle); + + run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "file-issue", + "--args-json", + r#"{"title":"Issue Merge Rule"}"#, + ], + ®istry, + )?; + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-b", + "--name", + "file-issue", + "--args-json", + r#"{"title":"Issue Merge Rule"}"#, + ], + ®istry, + ) + .expect_err("duplicate issue title must be rejected"); + assert!(err.to_string().contains("duplicate issue title")); + + let handle = SessionHandle::open(&session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::Negotiation; + let mut plan_state = rally_workflow_plan::read_workflow_state(&state)?; + let issue = plan_state + .issues + .iter_mut() + .find(|issue| issue.id == 1) + .expect("issue 1 should exist"); + issue.positions.insert("agent-a".to_string()); + issue.positions.insert("agent-b".to_string()); + issue.challenges.clear(); + issue.challenges.insert("agent-b".to_string()); + plan_state.turn = Some(rally_workflow_plan::TurnState { + holder: "agent-a".to_string(), + started_at: state::now(), + round: 1, + }); + state.workflow_state = Some(serde_json::to_value(plan_state)?); + handle.save_state(&state)?; + drop(handle); + + let err = run_cli( + &[ + "rally", + "workflow", + "action", + "--session", + &session, + "--as", + "agent-a", + "--name", + "agree", + "--args-json", + r#"{"issue":1}"#, + ], + ®istry, + ) + .expect_err("strict agreement gate should reject insufficient cross-agent challenges"); + assert!( + err.to_string() + .contains("at least two cross-agent challenges") + ); + + let handle = SessionHandle::open(&session)?; + let mut state = handle.load_state()?; + state.phase = SessionPhase::FinalizationReview; + let mut plan_state = rally_workflow_plan::read_workflow_state(&state)?; + for issue in &mut plan_state.issues { + issue.status = rally_workflow_plan::IssueStatus::Escalated; + } + state.workflow_state = Some(serde_json::to_value(plan_state)?); + handle.save_state(&state)?; drop(handle); + let err = run_cli( + &["rally", "done", "--session", &session, "--as", "agent-b"], + ®istry, + ) + .expect_err("finalization should fail without AGREED issues"); + assert!( + err.to_string() + .contains("at least one AGREED issue before finalization") + ); + let _ = fs::remove_dir_all(state::session_dir(&session)); Ok(()) } #[test] fn create_workflow_rejects_builtin_workflow_ids() -> Result<()> { - let registry = workflow::default_registry()?; + let registry = builtin_registry()?; let session = unique_name("rally-builtin-reject-it"); let err = run_cli( &[ @@ -338,7 +518,33 @@ fn create_workflow_rejects_builtin_workflow_ids() -> Result<()> { ®istry, ) .expect_err("builtin workflow id should be rejected"); - assert!(err.to_string().contains("is built-in; use `rally create plan`")); + assert!( + err.to_string() + .contains("is built-in; use `rally create plan`") + ); + assert!(!state::session_dir(&session).join("state.json").exists()); + let _ = fs::remove_dir_all(state::session_dir(&session)); + Ok(()) +} + +#[test] +fn create_demo_negotiate_requires_minimum_agents() -> Result<()> { + let registry = builtin_registry()?; + let session = unique_name("rally-negotiate-default-agents-it"); + let err = run_cli( + &[ + "rally", + "create", + "workflow", + "--name", + &session, + "--workflow", + "demo/negotiate", + ], + ®istry, + ) + .expect_err("demo/negotiate should fail fast without --agents >= 3"); + assert!(err.to_string().contains("requires --agents >= 3")); assert!(!state::session_dir(&session).join("state.json").exists()); let _ = fs::remove_dir_all(state::session_dir(&session)); Ok(()) @@ -401,7 +607,7 @@ impl Workflow for LifecycleWorkflow { #[test] fn lifecycle_hooks_create_init_join_are_invoked_by_real_commands() -> Result<()> { - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(LifecycleWorkflow)?; let session = unique_name("rally-lifecycle-it"); let _ = fs::remove_dir_all(state::session_dir(&session)); @@ -493,7 +699,7 @@ impl Workflow for DuplicateCreateHookWorkflow { #[test] fn duplicate_create_does_not_run_lifecycle_hooks() -> Result<()> { DUPLICATE_CREATE_HOOK_CALLS.store(0, Ordering::SeqCst); - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(DuplicateCreateHookWorkflow)?; let session = unique_name("rally-dup-create-hook-it"); @@ -569,7 +775,7 @@ impl Workflow for FailingCreateHookWorkflow { #[test] fn create_hook_failure_rolls_back_session_dir() -> Result<()> { - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(FailingCreateHookWorkflow)?; let session = unique_name("rally-failing-create-hook-it"); let _ = fs::remove_dir_all(state::session_dir(&session)); @@ -589,9 +795,7 @@ fn create_hook_failure_rolls_back_session_dir() -> Result<()> { ®istry, ) .expect_err("create should fail due to create hook"); - assert!(err - .to_string() - .contains("intentional create hook failure")); + assert!(err.to_string().contains("intentional create hook failure")); assert!( !state::session_dir(&session).exists(), "failed create should not leave session dir on disk" @@ -602,7 +806,7 @@ fn create_hook_failure_rolls_back_session_dir() -> Result<()> { #[test] fn create_hook_failure_preserves_preexisting_session_files() -> Result<()> { - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(FailingCreateHookWorkflow)?; let session = unique_name("rally-preexisting-create-hook-it"); let dir = state::session_dir(&session); @@ -626,9 +830,7 @@ fn create_hook_failure_preserves_preexisting_session_files() -> Result<()> { ®istry, ) .expect_err("create should fail due to create hook"); - assert!(err - .to_string() - .contains("intentional create hook failure")); + assert!(err.to_string().contains("intentional create hook failure")); assert!(sentinel.exists(), "preexisting file must be preserved"); assert!(!dir.join("state.json").exists()); assert!(!dir.join("state.lock").exists()); @@ -702,7 +904,7 @@ impl Workflow for CustomWorkflow { #[test] fn custom_buildtime_workflow_create_action_done_loop() -> Result<()> { - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(CustomWorkflow)?; let session = unique_name("rally-custom-it"); @@ -822,7 +1024,7 @@ impl Workflow for ErrorWorkflow { #[test] fn workflow_errors_do_not_partially_write_state() -> Result<()> { - let mut registry = workflow::default_registry()?; + let mut registry = builtin_registry()?; registry.register(ErrorWorkflow)?; let session = unique_name("rally-lock-it");