diff --git a/dabgent/Cargo.lock b/dabgent/Cargo.lock
index ef3309b0..fe2980d0 100644
--- a/dabgent/Cargo.lock
+++ b/dabgent/Cargo.lock
@@ -638,6 +638,7 @@ dependencies = [
 name = "dabgent_agent"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
 "chrono",
 "clap",
 "dabgent_integrations",
diff --git a/dabgent/dabgent_agent/Cargo.toml b/dabgent/dabgent_agent/Cargo.toml
index 6225ece5..7d0661e7 100644
--- a/dabgent/dabgent_agent/Cargo.toml
+++ b/dabgent/dabgent_agent/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2024"
 [dependencies]
 tokio = { version = "1", features = ["full"] }
 serde = { version = "1", features = ["derive"] }
+async-trait = "0.1"
 eyre = "0.6"
 chrono = { version = "0.4", features = ["serde"] }
 serde_json = "1"
diff --git a/dabgent/dabgent_agent/examples/basic.rs b/dabgent/dabgent_agent/examples/basic.rs
index abbd1fe4..4f6a1474 100644
--- a/dabgent/dabgent_agent/examples/basic.rs
+++ b/dabgent/dabgent_agent/examples/basic.rs
@@ -40,9 +40,10 @@ pub async fn pipeline_fn(stream_id: &str, store: impl EventStore) -> Result<()>
         store.clone(),
     );
     let tool_processor = ToolProcessor::new(sandbox.boxed(), store.clone(), tools, None);
+    let completion_processor = dabgent_agent::processor::CompletionProcessor::new(store.clone());
     let pipeline = Pipeline::new(
         store.clone(),
-        vec![thread_processor.boxed(), tool_processor.boxed()],
+        vec![thread_processor.boxed(), tool_processor.boxed(), completion_processor.boxed()],
     );
     pipeline.run(stream_id.clone()).await?;
     Ok(())
diff --git a/dabgent/dabgent_agent/examples/databricks_explorer.rs b/dabgent/dabgent_agent/examples/databricks_explorer.rs
index a2d790ca..b57a1ff8 100644
--- a/dabgent/dabgent_agent/examples/databricks_explorer.rs
+++ b/dabgent/dabgent_agent/examples/databricks_explorer.rs
@@ -1,7 +1,8 @@
-use dabgent_agent::processor::{CompactProcessor, Pipeline, Processor, ThreadProcessor, ToolProcessor};
+use dabgent_agent::processor::{DelegationProcessor, Pipeline, Processor, ThreadProcessor, ToolProcessor};
+use dabgent_agent::processor::delegation::compaction::CompactionHandler;
 use dabgent_agent::toolbox::{databricks::databricks_toolset, ToolDyn};
 use dabgent_mq::{EventStore, create_store, StoreConfig};
-use dabgent_sandbox::Sandbox;
+use dabgent_sandbox::{Sandbox, NoOpSandbox};
 use eyre::Result;
 use rig::client::ProviderClient;
@@ -53,23 +54,28 @@ async fn main() -> Result<()> {
     // Set up processors
     let thread_processor = ThreadProcessor::new(llm, store.clone());
     let tool_processor = ToolProcessor::new(
-        DummySandbox::new().boxed(),
+        NoOpSandbox::new().boxed(),
         store.clone(),
         tools,
         None,
     );
-    let compact_processor = CompactProcessor::new(
+
+    // Set up delegation processor with compaction handler
+    let compaction_handler = CompactionHandler::new(2048)?; // Compact threshold
+    let delegation_processor = DelegationProcessor::new(
         store.clone(),
-        2048, // Compact threshold - keep context manageable
         MODEL.to_string(),
+        vec![Box::new(compaction_handler)],
     );
+    let completion_processor = dabgent_agent::processor::CompletionProcessor::new(store.clone());
     let pipeline = Pipeline::new(
         store,
         vec![
             thread_processor.boxed(),
             tool_processor.boxed(),
-            compact_processor.boxed(),
+            completion_processor.boxed(),
+            delegation_processor.boxed(),
         ],
     );
@@ -91,56 +97,6 @@ Please help me locate tables that contain bakery or food sales information. I'm
 Can you explore the Unity Catalog and tell me what bakery-related sales data is available?
"#; -/// Dummy sandbox for Databricks tools that don't need actual file operations -struct DummySandbox; - -impl DummySandbox { - fn new() -> Self { - Self - } -} - -impl Sandbox for DummySandbox { - async fn exec(&mut self, _command: &str) -> Result { - Ok(dabgent_sandbox::ExecResult { - exit_code: 0, - stdout: String::new(), - stderr: String::new(), - }) - } - - async fn write_file(&mut self, _path: &str, _content: &str) -> Result<()> { - Ok(()) - } - - async fn write_files(&mut self, _files: Vec<(&str, &str)>) -> Result<()> { - Ok(()) - } - - async fn read_file(&self, _path: &str) -> Result { - Ok(String::new()) - } - - async fn delete_file(&mut self, _path: &str) -> Result<()> { - Ok(()) - } - - async fn list_directory(&self, _path: &str) -> Result> { - Ok(Vec::new()) - } - - async fn set_workdir(&mut self, _path: &str) -> Result<()> { - Ok(()) - } - - async fn export_directory(&self, _container_path: &str, _host_path: &str) -> Result { - Ok(String::new()) - } - - async fn fork(&self) -> Result { - Ok(DummySandbox) - } -} async fn push_llm_config( store: &S, diff --git a/dabgent/dabgent_agent/examples/planning.rs b/dabgent/dabgent_agent/examples/planning.rs index 8a93434a..9cc7967e 100644 --- a/dabgent/dabgent_agent/examples/planning.rs +++ b/dabgent/dabgent_agent/examples/planning.rs @@ -95,9 +95,10 @@ pub async fn planning_pipeline(stream_id: &str, store: impl EventStore + Clone, Some("planner".to_string()), ); + let planning_completion_processor = dabgent_agent::processor::CompletionProcessor::new(store.clone()); let planning_pipeline = Pipeline::new( store.clone(), - vec![planning_thread.boxed(), planning_tool_processor.boxed()], + vec![planning_thread.boxed(), planning_tool_processor.boxed(), planning_completion_processor.boxed()], ); let pipeline_handle = tokio::spawn({ @@ -155,9 +156,10 @@ pub async fn planning_pipeline(stream_id: &str, store: impl EventStore + Clone, None, ); + let execution_completion_processor = dabgent_agent::processor::CompletionProcessor::new(store.clone()); let execution_pipeline = Pipeline::new( store.clone(), - vec![execution_thread.boxed(), execution_tool_processor.boxed()], + vec![execution_thread.boxed(), execution_tool_processor.boxed(), execution_completion_processor.boxed()], ); let exec_handle = tokio::spawn({ diff --git a/dabgent/dabgent_agent/src/event.rs b/dabgent/dabgent_agent/src/event.rs index 1b64494d..a9dabe67 100644 --- a/dabgent/dabgent_agent/src/event.rs +++ b/dabgent/dabgent_agent/src/event.rs @@ -11,7 +11,10 @@ pub struct ParentAggregate { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ToolKind { Done, - Other(String), + ExploreDatabricksCatalog, + FinishDelegation, + CompactError, + Regular(String), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -40,6 +43,7 @@ pub enum Event { ArtifactsCollected(HashMap), TaskCompleted { success: bool, + summary: String, }, SeedSandboxFromTemplate { template_path: String, @@ -58,6 +62,16 @@ pub enum Event { PlanUpdated { tasks: Vec, }, + DelegateWork { + agent_type: String, + prompt: String, + parent_tool_id: String, + }, + WorkComplete { + agent_type: String, + result: String, + parent: ParentAggregate, + }, } impl dabgent_mq::Event for Event { @@ -76,6 +90,8 @@ impl dabgent_mq::Event for Event { Event::PipelineShutdown => "pipeline_shutdown", Event::PlanCreated { .. } => "plan_created", Event::PlanUpdated { .. } => "plan_updated", + Event::DelegateWork { .. } => "delegate_work", + Event::WorkComplete { .. 
} => "work_complete", } } } diff --git a/dabgent/dabgent_agent/src/llm.rs b/dabgent/dabgent_agent/src/llm.rs index 962fb0ef..75dbaeb2 100644 --- a/dabgent/dabgent_agent/src/llm.rs +++ b/dabgent/dabgent_agent/src/llm.rs @@ -4,9 +4,9 @@ use std::pin::Pin; use std::time::Duration; use tokio::time::sleep; -const MAX_COMPLETION_ATTEMPTS: usize = 4; +const MAX_COMPLETION_ATTEMPTS: usize = 7; const BASE_BACKOFF_MS: u64 = 250; -const MAX_BACKOFF_MS: u64 = 5000; +const MAX_BACKOFF_MS: u64 = 10000; fn backoff_delay_ms(attempt: usize) -> u64 { let exp = BASE_BACKOFF_MS.saturating_mul(1 << (attempt.saturating_sub(1))); @@ -178,6 +178,23 @@ impl RetryingLLM { impl LLMClient for RetryingLLM { async fn completion(&self, completion: Completion) -> eyre::Result { + // Log payload information before making LLM call + let history_size = completion.history.len(); + let tools_count = completion.tools.len(); + let prompt_size = serde_json::to_string(&completion.prompt).map(|s| s.len()).unwrap_or(0); + let total_history_size = completion.history.iter() + .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0)) + .sum::(); + + tracing::info!( + model = %completion.model, + history_messages = history_size, + tools_count = tools_count, + prompt_size_bytes = prompt_size, + total_history_size_bytes = total_history_size, + "Starting LLM completion request" + ); + for attempt in 1..=self.max_attempts { match self.inner.completion(completion.clone()).await { Ok(resp) => return Ok(resp), diff --git a/dabgent/dabgent_agent/src/processor/compaction.rs b/dabgent/dabgent_agent/src/processor/compaction.rs deleted file mode 100644 index 4ac34f42..00000000 --- a/dabgent/dabgent_agent/src/processor/compaction.rs +++ /dev/null @@ -1,322 +0,0 @@ -use super::{Aggregate, Processor}; -use crate::event::{Event, ParentAggregate, TypedToolResult, ToolKind}; -use crate::processor::thread; -use dabgent_mq::{EventDb, EventStore, Query}; -use regex::Regex; -use uuid::Uuid; - -const COMPACTION_SYSTEM_PROMPT: &str = r#" -You are an error message compactor. Your role is to reduce verbose error messages while preserving critical debugging information. - -## Objectives -- Reduce error messages to the specified character limit while maintaining clarity -- Preserve essential information: error types, file paths, line numbers, root causes -- Remove unnecessary elements: repetitive stack traces, verbose details, redundant information - -## Output Format -Always wrap your compacted error message in tags. - -## Examples - -### Example 1: Python Traceback -Input: A 800-character Python traceback with multiple stack frames -``` -Traceback (most recent call last): - File "/app/main.py", line 15, in - result = process_data() - File "/app/main.py", line 10, in process_data - return data.split(',') -AttributeError: 'NoneType' object has no attribute 'split' -[... additional verbose stack frames ...] -``` -Output: AttributeError in main.py:10 - 'NoneType' object has no attribute 'split' - -### Example 2: Validation Errors -Input: Verbose validation error with nested field details -``` -ValidationError: Multiple validation errors occurred: -- Field 'name': This field is required and cannot be empty -- Field 'age': Value must be greater than or equal to 0 -- Field 'email': Invalid email format, must contain @ symbol -[... additional validation context ...] 
-```
-Output: <error>ValidationError: 3 fields failed - name (required), age (min:0), email (invalid format)</error>
-
-### Example 3: Build/Compilation Errors
-Input: Long compilation error with multiple issues
-```
-Error: Failed to compile TypeScript
-src/components/UserForm.tsx(42,15): error TS2322: Type 'string' is not assignable to type 'number'
-src/components/UserForm.tsx(45,8): error TS2304: Cannot find name 'useState'
-[... additional compilation context ...]
-```
-Output: <error>TypeScript errors: UserForm.tsx:42 (string→number), :45 (useState undefined)</error>
-
-## Instructions
-Focus on the core issue and location. Remove implementation details that don't help identify the root cause.
-"#;
-
-pub struct CompactProcessor<E: EventStore> {
-    event_store: E,
-    compaction_threshold: usize, // Character threshold to trigger compaction
-    compaction_model: String,
-}
-
-impl<E: EventStore> Processor for CompactProcessor<E> {
-    async fn run(&mut self, event: &EventDb<Event>) -> eyre::Result<()> {
-        match &event.data {
-            Event::ToolResult(content) if self.is_done_tool_result(content) && self.should_compact(content) => {
-                tracing::info!(
-                    "Compaction triggered for aggregate {}",
-                    event.aggregate_id,
-                );
-                self.handle_compaction_request(event, content).await?;
-            }
-            Event::AgentMessage { response, recipient } if recipient.as_deref() == Some("compact_worker") => {
-                tracing::info!(
-                    "Compaction received for aggregate {}",
-                    event.aggregate_id,
-                );
-                self.handle_compaction_response(event, response).await?;
-            }
-            Event::ToolResult(content) if !self.is_done_tool_result(content) || !self.should_compact(content) => {
-                self.handle_passthrough_tool_result(event, content).await?;
-            }
-            _ => {}
-        }
-        Ok(())
-    }
-}
-
-impl<E: EventStore> CompactProcessor<E> {
-    pub fn new(
-        event_store: E,
-        compaction_threshold: usize,
-        compaction_model: String,
-    ) -> Self {
-        Self {
-            event_store,
-            compaction_threshold,
-            compaction_model,
-        }
-    }
-
-    async fn handle_compaction_request(
-        &mut self,
-        event: &EventDb<Event>,
-        content: &[TypedToolResult],
-    ) -> eyre::Result<()> {
-        // Create compaction thread
-        let compact_thread_id = format!("compact_{}", Uuid::new_v4());
-
-        // Extract original tool_id for restoration later
-        let original_tool_id = content.first().map(|t| t.result.id.clone());
-
-        // Send LLMConfig first with parent tracking
-        self.event_store
-            .push_event(
-                &event.stream_id,
-                &compact_thread_id,
-                &Event::LLMConfig {
-                    model: self.compaction_model.clone(),
-                    temperature: 0.0,
-                    max_tokens: 8192,
-                    preamble: Some(COMPACTION_SYSTEM_PROMPT.to_string()),
-                    tools: None,
-                    recipient: Some("compact_worker".to_string()),
-                    parent: Some(ParentAggregate {
-                        aggregate_id: event.aggregate_id.clone(),
-                        tool_id: original_tool_id,
-                    }),
-                },
-                &Default::default(),
-            )
-            .await?;
-
-        // Build compaction prompt and send UserMessage
-        let prompt = self.build_compaction_prompt(content);
-        self.event_store
-            .push_event(
-                &event.stream_id,
-                &compact_thread_id,
-                &Event::UserMessage(rig::OneOrMany::one(
-                    rig::message::UserContent::Text(prompt),
-                )),
-                &Default::default(),
-            )
-            .await?;
-
-        Ok(())
-    }
-
-    async fn handle_compaction_response(
-        &mut self,
-        event: &EventDb<Event>,
-        response: &crate::llm::CompletionResponse,
-    ) -> eyre::Result<()> {
-        // Load compaction thread to get parent info from LLMConfig
-        let compact_query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id);
-        let compact_events = self.event_store.load_events::<Event>(&compact_query, None).await?;
-
-        // Find the LLMConfig event to get parent info
-        let parent_info = compact_events.iter()
-            .find_map(|e| match e {
-                Event::LLMConfig { parent, .. } => parent.as_ref(),
-                _ => None,
-            });
-
-        if let Some(parent) = parent_info {
-            if let Some(tool_id) = &parent.tool_id {
-                // Extract compacted content from LLM response
-                let compacted_text = self.extract_compacted_content(response);
-
-                // Create compacted ToolResult with original tool_id, wrapped as TypedToolResult
-                let compacted_result = vec![TypedToolResult {
-                    tool_name: ToolKind::Done,
-                    result: rig::message::ToolResult {
-                        id: tool_id.clone(),
-                        call_id: None,
-                        content: rig::OneOrMany::one(rig::message::ToolResultContent::Text(
-                            compacted_text.into()
-                        )),
-                    },
-                }];
-
-                // Convert compacted ToolResult directly to UserMessage for original thread
-                let tools = compacted_result.iter().map(|t| rig::message::UserContent::ToolResult(t.result.clone()));
-                let user_content = rig::OneOrMany::many(tools)?;
-
-                // Load original thread state and process
-                let original_query = Query::stream(&event.stream_id).aggregate(&parent.aggregate_id);
-                let events = self.event_store.load_events::<Event>(&original_query, None).await?;
-                let mut thread = thread::Thread::fold(&events);
-                let new_events = thread.process(thread::Command::User(user_content))?;
-
-                for new_event in new_events.iter() {
-                    self.event_store
-                        .push_event(
-                            &event.stream_id,
-                            &parent.aggregate_id,
-                            new_event,
-                            &Default::default(),
-                        )
-                        .await?;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn handle_passthrough_tool_result(
-        &mut self,
-        event: &EventDb<Event>,
-        content: &[TypedToolResult],
-    ) -> eyre::Result<()> {
-        // Convert to UserMessage for original thread (same aggregate)
-        let tools = content.iter().map(|t| rig::message::UserContent::ToolResult(t.result.clone()));
-        let user_content = rig::OneOrMany::many(tools)?;
-
-        // Load original thread state and process
-        let original_query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id);
-        let events = self.event_store.load_events::<Event>(&original_query, None).await?;
-        let mut thread = thread::Thread::fold(&events);
-        let new_events = thread.process(thread::Command::User(user_content))?;
-
-        for new_event in new_events.iter() {
-            self.event_store
-                .push_event(
-                    &event.stream_id,
-                    &event.aggregate_id,
-                    new_event,
-                    &Default::default(),
-                )
-                .await?;
-        }
-
-        Ok(())
-    }
-
-    fn is_done_tool_result(&self, results: &[TypedToolResult]) -> bool {
-        results.iter().any(|t| t.tool_name == ToolKind::Done)
-    }
-
-    fn should_compact(&self, results: &[TypedToolResult]) -> bool {
-        let size = self.calculate_text_size(results);
-        size > self.compaction_threshold
-    }
-
-    fn calculate_text_size(&self, results: &[TypedToolResult]) -> usize {
-        results
-            .iter()
-            .map(|result| {
-                result.result
-                    .content
-                    .iter()
-                    .map(|content| match content {
-                        rig::message::ToolResultContent::Text(text) => text.text.len(),
-                        _ => 0, // Skip non-text content for size calculation
-                    })
-                    .sum::<usize>()
-            })
-            .sum()
-    }
-
-
-    fn extract_text_content(&self, results: &[TypedToolResult]) -> String {
-        results
-            .iter()
-            .flat_map(|result| {
-                result.result.content.iter().filter_map(|content| match content {
-                    rig::message::ToolResultContent::Text(text) => Some(text.text.clone()),
-                    _ => None, // Skip non-text content
-                })
-            })
-            .collect::<Vec<_>>()
-            .join("\n")
-    }
-
-    fn build_compaction_prompt(&self, content: &[TypedToolResult]) -> rig::message::Text {
-        let text_content = self.extract_text_content(content);
-        rig::message::Text {
-            text: format!(
-                "Compact this error message to under {} characters:\n\n{}",
-                self.compaction_threshold, text_content
-            ),
-        }
-    }
-
-    fn extract_tag(source: &str, tag: &str) -> Option<String> {
-        // Match Python implementation: rf"<{tag}>(.*?)</{tag}>" with DOTALL
-        let pattern = format!(r"(?s)<{}>(.*?)</{}>", regex::escape(tag), regex::escape(tag));
-        if let Ok(regex) = Regex::new(&pattern) {
-            if let Some(captures) = regex.captures(source) {
-                if let Some(content) = captures.get(1) {
-                    return Some(content.as_str().trim().to_string());
-                }
-            }
-        }
-        None
-    }
-
-    fn extract_compacted_content(&self, response: &crate::llm::CompletionResponse) -> String {
-        let raw_response = response
-            .choice
-            .iter()
-            .filter_map(|c| match c {
-                rig::message::AssistantContent::Text(t) => Some(t.text.clone()),
-                _ => None,
-            })
-            .collect::<Vec<_>>()
-            .join("\n");
-
-        // Try to extract content from <error> tags first
-        if let Some(extracted) = Self::extract_tag(&raw_response, "error") {
-            extracted
-        } else {
-            // If no <error> tags found, return the raw response
-            tracing::warn!("LLM response did not contain <error> tags, using raw response");
-            raw_response
-        }
-    }
-}
diff --git a/dabgent/dabgent_agent/src/processor/completion.rs b/dabgent/dabgent_agent/src/processor/completion.rs
new file mode 100644
index 00000000..189b7941
--- /dev/null
+++ b/dabgent/dabgent_agent/src/processor/completion.rs
@@ -0,0 +1,134 @@
+use super::Processor;
+use crate::event::{Event, TypedToolResult, ToolKind};
+use dabgent_mq::{EventDb, EventStore, Query};
+use eyre::Result;
+
+pub struct CompletionProcessor<E: EventStore> {
+    event_store: E,
+}
+
+impl<E: EventStore> CompletionProcessor<E> {
+    pub fn new(event_store: E) -> Self {
+        Self { event_store }
+    }
+
+    async fn emit_task_completed(
+        &mut self,
+        event: &EventDb<Event>,
+        result: &TypedToolResult,
+    ) -> Result<()> {
+        // extract summary from tool result content
+        let summary = result.result.content.iter()
+            .filter_map(|content| match content {
+                rig::message::ToolResultContent::Text(text) => Some(text.text.clone()),
+                _ => None,
+            })
+            .collect::<Vec<_>>()
+            .join("\n");
+
+        // check if Done tool succeeded by examining the tool result content structure
+        let success = result.result.content.iter().all(|content| {
+            match content {
+                rig::message::ToolResultContent::Text(text) => {
+                    // parse as JSON - if it has "error" field, Done failed
+                    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text.text) {
+                        if let Some(obj) = parsed.as_object() {
+                            !obj.contains_key("error")
+                        } else {
+                            true
+                        }
+                    } else {
+                        true
+                    }
+                }
+                _ => true,
+            }
+        });
+
+        if success {
+            tracing::info!("Task completed successfully, emitting TaskCompleted event");
+        } else {
+            tracing::info!("Task completed with errors, emitting TaskCompleted event with success=false");
+        }
+
+        let task_completed_event = Event::TaskCompleted {
+            success,
+            summary: if summary.is_empty() { "Task completed".to_string() } else { summary }
+        };
+
+        self.event_store.push_event(
+            &event.stream_id,
+            &event.aggregate_id,
+            &task_completed_event,
+            &Default::default(),
+        ).await?;
+
+        Ok(())
+    }
+
+    async fn emit_work_complete(
+        &mut self,
+        event: &EventDb<Event>,
+        result: &TypedToolResult,
+    ) -> Result<()> {
+        // load thread history to get parent info from LLMConfig
+        let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id);
+        let events = self.event_store.load_events::<Event>(&query, None).await?;
+
+        // find LLMConfig with parent field
+        let parent = events.iter()
+            .find_map(|e| match e {
+                Event::LLMConfig { parent: Some(p), .. } => Some(p.clone()),
+                _ => None,
+            })
+            .ok_or_else(|| eyre::eyre!("Missing parent info in LLMConfig for finish_delegation"))?;
+
+        // extract result from tool result content
+        let summary = result.result.content.iter()
+            .filter_map(|content| match content {
+                rig::message::ToolResultContent::Text(text) => Some(text.text.clone()),
+                _ => None,
+            })
+            .collect::<Vec<_>>()
+            .join("\n");
+
+        tracing::info!("Delegated work completed, emitting WorkComplete event");
+
+        let work_complete_event = Event::WorkComplete {
+            agent_type: "delegated_worker".to_string(),
+            result: summary,
+            parent,
+        };
+
+        self.event_store.push_event(
+            &event.stream_id,
+            &event.aggregate_id,
+            &work_complete_event,
+            &Default::default(),
+        ).await?;
+
+        Ok(())
+    }
+}
+
+impl<E: EventStore> Processor for CompletionProcessor<E> {
+    async fn run(&mut self, event: &EventDb<Event>) -> eyre::Result<()> {
+        match &event.data {
+            Event::ToolResult(results) => {
+                for result in results {
+                    match &result.tool_name {
+                        ToolKind::Done => {
+                            self.emit_task_completed(event, result).await?;
+                        }
+                        ToolKind::FinishDelegation => {
+                            self.emit_work_complete(event, result).await?;
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            _ => {}
+        }
+        Ok(())
+    }
+}
\ No newline at end of file
diff --git a/dabgent/dabgent_agent/src/processor/delegation/compaction.rs b/dabgent/dabgent_agent/src/processor/delegation/compaction.rs
new file mode 100644
index 00000000..d74e84fc
--- /dev/null
+++ b/dabgent/dabgent_agent/src/processor/delegation/compaction.rs
@@ -0,0 +1,238 @@
+use super::{DelegationHandler, DelegationContext, DelegationResult, FinishDelegationTool};
+use async_trait::async_trait;
+use crate::event::{Event, ParentAggregate, TypedToolResult, ToolKind};
+use crate::toolbox::ToolDyn;
+use dabgent_sandbox::{SandboxDyn, NoOpSandbox, Sandbox};
+use eyre::Result;
+use uuid::Uuid;
+
+const COMPACTION_SYSTEM_PROMPT: &str = r#"
+You are an error message compactor. Your role is to reduce verbose error messages while preserving critical debugging information.
+
+## Objectives
+- Reduce error messages to the specified character limit while maintaining clarity
+- Preserve essential information: error types, file paths, line numbers, root causes
+- Remove unnecessary elements: repetitive stack traces, verbose details, redundant information
+
+## Output Format
+When you have compacted the error message, call the `finish_delegation` tool with your compacted result.
+
+## Examples
+
+### Example 1: Python Traceback
+Input: A 800-character Python traceback with multiple stack frames
+```
+Traceback (most recent call last):
+  File "/app/main.py", line 15, in <module>
+    result = process_data()
+  File "/app/main.py", line 10, in process_data
+    return data.split(',')
+AttributeError: 'NoneType' object has no attribute 'split'
+[... additional verbose stack frames ...]
+```
+Output: `finish_delegation(result="AttributeError in main.py:10 - 'NoneType' object has no attribute 'split'")`
+
+### Example 2: Validation Errors
+Input: Verbose validation error with nested field details
+```
+ValidationError: Multiple validation errors occurred:
+- Field 'name': This field is required and cannot be empty
+- Field 'age': Value must be greater than or equal to 0
+- Field 'email': Invalid email format, must contain @ symbol
+[... additional validation context ...]
+``` +Output: `finish_delegation(result="ValidationError: 3 fields failed - name (required), age (min:0), email (invalid format)")` + +### Example 3: Build/Compilation Errors +Input: Long compilation error with multiple issues +``` +Error: Failed to compile TypeScript +src/components/UserForm.tsx(42,15): error TS2322: Type 'string' is not assignable to type 'number' +src/components/UserForm.tsx(45,8): error TS2304: Cannot find name 'useState' +[... additional compilation context ...] +``` +Output: `finish_delegation(result="TypeScript errors: UserForm.tsx:42 (string→number), :45 (useState undefined)")` + +## Instructions +Focus on the core issue and location. Remove implementation details that don't help identify the root cause. +Always call `finish_delegation` with your compacted result when done. +"#; + +pub const TRIGGER_TOOL: &str = "compact_error"; +pub const THREAD_PREFIX: &str = "compact_"; +pub const WORKER_NAME: &str = "compact_worker"; + +pub struct CompactionHandler { + sandbox: Box, + tools: Vec>, + compaction_threshold: usize, +} + +impl CompactionHandler { + pub fn new(compaction_threshold: usize) -> Result { + let tools = vec![ + Box::new(FinishDelegationTool) as Box + ]; + + Ok(Self { + sandbox: NoOpSandbox::new().boxed(), + tools, + compaction_threshold, + }) + } + + pub fn compaction_threshold(&self) -> usize { + self.compaction_threshold + } +} + +#[async_trait] +impl DelegationHandler for CompactionHandler { + fn trigger_tool(&self) -> &str { + TRIGGER_TOOL + } + + fn thread_prefix(&self) -> &str { + THREAD_PREFIX + } + + fn worker_name(&self) -> &str { + WORKER_NAME + } + + fn tools(&self) -> &[Box] { + &self.tools + } + + fn sandbox_mut(&mut self) -> &mut Box { + &mut self.sandbox + } + + async fn execute_tool_by_name( + &mut self, + tool_name: &str, + args: serde_json::Value + ) -> eyre::Result> { + let tool = self.tools + .iter() + .find(|t| t.name() == tool_name) + .ok_or_else(|| eyre::eyre!("Tool '{}' not found", tool_name))?; + + tool.call(args, &mut self.sandbox).await + } + + fn create_context(&self, _tool_call: &rig::message::ToolCall) -> Result { + Ok(DelegationContext::Compaction { + threshold: self.compaction_threshold, + }) + } + + fn create_completion_result(&self, summary: &str, parent_tool_id: &str) -> crate::event::TypedToolResult { + use crate::event::{TypedToolResult, ToolKind}; + + // For compaction, we return a Done tool result (replacing the original large Done result) + TypedToolResult { + tool_name: ToolKind::Done, + result: rig::message::ToolResult { + id: parent_tool_id.to_string(), + call_id: None, + content: rig::OneOrMany::one(rig::message::ToolResultContent::Text( + summary.into() + )), + }, + } + } + + fn handle( + &self, + context: DelegationContext, + error_text: &str, + model: &str, + parent_aggregate_id: &str, + parent_tool_id: &str + ) -> Result { + let threshold = match context { + DelegationContext::Compaction { threshold } => threshold, + _ => return Err(eyre::eyre!("Invalid context for compaction handler")), + }; + + let task_thread_id = format!("compact_{}", Uuid::new_v4()); + let prompt = format!("Compact this error message to under {} characters:\n\n{}", + threshold, error_text); + + let tool_definitions: Vec = self.tools + .iter() + .map(|tool| tool.definition()) + .collect(); + + let config_event = Event::LLMConfig { + model: model.to_string(), + temperature: 0.0, + max_tokens: 8192, + preamble: Some(COMPACTION_SYSTEM_PROMPT.to_string()), + tools: Some(tool_definitions), + recipient: Some(WORKER_NAME.to_string()), + parent: 
Some(ParentAggregate { + aggregate_id: parent_aggregate_id.to_string(), + tool_id: Some(parent_tool_id.to_string()), + }), + }; + + let user_event = Event::UserMessage(rig::OneOrMany::one( + rig::message::UserContent::Text(rig::message::Text { + text: prompt, + }), + )); + + Ok(DelegationResult { + task_thread_id, + config_event, + user_event, + }) + } + + fn format_result(&self, summary: &str) -> String { + summary.to_string() + } + + fn extract_prompt(&self, _tool_call: &rig::message::ToolCall, tool_result: &TypedToolResult) -> String { + // For Done results, extract error text from result content + // For explicit compact_error calls, extract from tool call arguments (using default impl) + if matches!(tool_result.tool_name, ToolKind::Done) { + tool_result.result.content.iter() + .filter_map(|content| match content { + rig::message::ToolResultContent::Text(text) => Some(text.text.as_str()), + _ => None, + }) + .collect::>() + .join("\n") + } else { + // For explicit compact_error calls, use default extraction from tool call arguments + _tool_call.function.arguments.get("error_text") + .or_else(|| _tool_call.function.arguments.get("prompt")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string() + } + } + + fn should_handle(&self, result: &TypedToolResult) -> bool { + match &result.tool_name { + // Handle explicit compact_error calls + ToolKind::CompactError => true, + // Handle large Done results + ToolKind::Done => { + // Extract text from tool result to check size + let total_size: usize = result.result.content.iter() + .map(|content| match content { + rig::message::ToolResultContent::Text(t) => t.text.len(), + _ => 0, + }) + .sum(); + total_size > self.compaction_threshold + } + _ => false, + } + } +} + diff --git a/dabgent/dabgent_agent/src/processor/delegation/databricks.rs b/dabgent/dabgent_agent/src/processor/delegation/databricks.rs new file mode 100644 index 00000000..004800dc --- /dev/null +++ b/dabgent/dabgent_agent/src/processor/delegation/databricks.rs @@ -0,0 +1,192 @@ +use super::{DelegationHandler, DelegationContext, DelegationResult}; +use async_trait::async_trait; +use crate::event::{Event, ParentAggregate, TypedToolResult, ToolKind}; +use crate::toolbox::{databricks::databricks_toolset, ToolDyn}; +use dabgent_sandbox::{SandboxDyn, NoOpSandbox, Sandbox}; +use eyre::Result; +use uuid::Uuid; + +pub const DATABRICKS_SYSTEM_PROMPT: &str = r#" +You are a Databricks catalog explorer. Your role is to explore Unity Catalog to understand available data structures and provide detailed table schemas. + +## Your Task +Explore the specified Databricks catalog and provide a comprehensive summary of: +- Available schemas and their purposes +- Tables within each schema with descriptions +- **DETAILED column structure for each relevant table** including: + - Column names and data types + - Sample values from each column + - Any constraints or key information +- Relationships between tables if apparent + +## Focus Areas +When exploring data for DataApp creation: +- Look for tables that contain business-relevant data +- Identify primary keys and foreign key relationships +- **Use `databricks_describe_table` to get full column details and sample data for each relevant table** +- Note columns that would make good API fields + +## Output Format +Provide your findings in a structured markdown format with: +1. **Catalog Overview**: Brief description +2. **Schemas Found**: List with purposes +3. 
**Key Tables**: For each table include: + - Table name and purpose + - **Complete column list with data types** + - **Sample data rows showing actual values** + - Row counts and other metadata +4. **Recommendations**: Which tables/columns would work well for a DataApp API with specific column mappings + +## Completion +When you have completed your exploration and analysis, call the `finish_delegation` tool with a comprehensive summary that includes: +- Brief overview of what you discovered +- Key schemas and table counts +- **Detailed table structures for each relevant table** including: + - Full column specifications (name: data_type) + - Sample data showing what the columns contain +- Specific API endpoint recommendations with exact column mappings + +Example: `finish_delegation(result="Explored catalog 'main': Found bakery schema with 3 tables. products table (id: bigint, name: string, price: decimal, category: string) contains 500 bakery items like 'Chocolate Croissant', $4.50, 'pastry'. orders table (order_id: bigint, customer_id: bigint, product_id: bigint, quantity: int, order_date: timestamp) has 10K orders. Recommend /api/products endpoint returning {id, name, price, category} and /api/orders endpoint returning {order_id, customer_id, product_id, quantity, order_date}.")` + +**IMPORTANT**: Always use `databricks_describe_table` on relevant tables to get complete column details and sample data. This detailed structure information is critical for API design. +"#; + +pub const TRIGGER_TOOL: &str = "explore_databricks_catalog"; +pub const THREAD_PREFIX: &str = "databricks_"; +pub const WORKER_NAME: &str = "databricks_worker"; + + +pub struct DatabricksHandler { + sandbox: Box, + tools: Vec>, +} + +impl DatabricksHandler { + pub fn new() -> Result { + Ok(Self { + sandbox: NoOpSandbox::new().boxed(), // Databricks uses NoOp for API calls + tools: databricks_toolset()?, + }) + } +} + +#[async_trait] +impl DelegationHandler for DatabricksHandler { + fn trigger_tool(&self) -> &str { + TRIGGER_TOOL + } + + fn thread_prefix(&self) -> &str { + THREAD_PREFIX + } + + fn worker_name(&self) -> &str { + WORKER_NAME + } + + fn tools(&self) -> &[Box] { + &self.tools + } + + fn sandbox_mut(&mut self) -> &mut Box { + &mut self.sandbox + } + + async fn execute_tool_by_name( + &mut self, + tool_name: &str, + args: serde_json::Value + ) -> eyre::Result> { + let tool = self.tools + .iter() + .find(|t| t.name() == tool_name) + .ok_or_else(|| eyre::eyre!("Tool '{}' not found", tool_name))?; + + tool.call(args, &mut self.sandbox).await + } + + fn create_context(&self, tool_call: &rig::message::ToolCall) -> Result { + let catalog = tool_call.function.arguments + .get("catalog") + .and_then(|v| v.as_str()) + .unwrap_or("main") + .to_string(); + Ok(DelegationContext::Databricks { catalog }) + } + + fn create_completion_result(&self, summary: &str, parent_tool_id: &str) -> crate::event::TypedToolResult { + use crate::event::{TypedToolResult, ToolKind}; + + let result_content = self.format_result(summary); + + TypedToolResult { + tool_name: ToolKind::ExploreDatabricksCatalog, + result: rig::message::ToolResult { + id: parent_tool_id.to_string(), + call_id: None, + content: rig::OneOrMany::one(rig::message::ToolResultContent::Text( + rig::message::Text { text: result_content } + )), + }, + } + } + + fn handle( + &self, + context: DelegationContext, + prompt_arg: &str, + model: &str, + parent_aggregate_id: &str, + parent_tool_id: &str + ) -> Result { + let catalog = match context { + DelegationContext::Databricks { 
catalog } => catalog, + _ => return Err(eyre::eyre!("Invalid context for databricks handler")), + }; + + let task_thread_id = format!("databricks_{}", Uuid::new_v4()); + let prompt = format!("Explore catalog '{}': {}", catalog, prompt_arg); + + let tool_definitions: Vec = self.tools + .iter() + .map(|tool| tool.definition()) + .collect(); + + let config_event = Event::LLMConfig { + model: model.to_string(), + temperature: 0.0, + max_tokens: 16384, + preamble: Some(DATABRICKS_SYSTEM_PROMPT.to_string()), + tools: Some(tool_definitions), + recipient: Some(WORKER_NAME.to_string()), + parent: Some(ParentAggregate { + aggregate_id: parent_aggregate_id.to_string(), + tool_id: Some(parent_tool_id.to_string()), + }), + }; + + let user_event = Event::UserMessage(rig::OneOrMany::one( + rig::message::UserContent::Text(rig::message::Text { + text: prompt, + }), + )); + + Ok(DelegationResult { + task_thread_id, + config_event, + user_event, + }) + } + + fn format_result(&self, summary: &str) -> String { + format!( + "## Databricks Exploration Results\n\n{}\n\n*This data was discovered from your Databricks catalog and can be used to build your DataApp API.*", + summary + ) + } + + fn should_handle(&self, result: &TypedToolResult) -> bool { + matches!(result.tool_name, ToolKind::ExploreDatabricksCatalog) + } +} + diff --git a/dabgent/dabgent_agent/src/processor/delegation/mod.rs b/dabgent/dabgent_agent/src/processor/delegation/mod.rs new file mode 100644 index 00000000..4f581dba --- /dev/null +++ b/dabgent/dabgent_agent/src/processor/delegation/mod.rs @@ -0,0 +1,542 @@ +use super::{Aggregate, Processor}; +use crate::event::{Event, TypedToolResult, ToolKind}; +use crate::processor::thread; +use crate::toolbox::{Tool, ToolDyn, ToolCallExt}; +use crate::llm::CompletionResponse; +use dabgent_mq::{EventDb, EventStore, Query}; +use dabgent_sandbox::SandboxDyn; +use async_trait::async_trait; +use eyre::Result; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +pub mod databricks; +pub mod compaction; + +#[derive(Debug, Clone)] +pub enum DelegationContext { + Databricks { catalog: String }, + Compaction { threshold: usize }, +} + +#[derive(Debug)] +pub struct DelegationResult { + pub task_thread_id: String, + pub config_event: Event, + pub user_event: Event, +} + +#[async_trait] +pub trait DelegationHandler: Send + Sync { + fn trigger_tool(&self) -> &str; + fn thread_prefix(&self) -> &str; + fn worker_name(&self) -> &str; + + // Handler owns its sandbox and tools + fn tools(&self) -> &[Box]; + fn sandbox_mut(&mut self) -> &mut Box; + + // Execute a tool by name - this avoids borrowing conflicts + async fn execute_tool_by_name( + &mut self, + tool_name: &str, + args: serde_json::Value + ) -> eyre::Result>; + + // Check if this handler should process a specific event + fn should_handle_tools(&self, event: &EventDb) -> bool { + if let Event::AgentMessage { recipient: Some(r), .. 
} = &event.data { + r == self.worker_name() && event.aggregate_id.starts_with(self.thread_prefix()) + } else { + false + } + } + + // Create context from tool call arguments + fn create_context(&self, tool_call: &rig::message::ToolCall) -> Result; + + // Create completion result for returning to parent thread + fn create_completion_result(&self, summary: &str, parent_tool_id: &str) -> TypedToolResult; + + // Determine if this handler should handle a specific tool result + fn should_handle(&self, result: &TypedToolResult) -> bool; + + // Extract prompt argument from tool call or tool result (handler-specific logic) + fn extract_prompt(&self, tool_call: &rig::message::ToolCall, _tool_result: &TypedToolResult) -> String { + // Default: extract from tool call arguments + tool_call.function.arguments.get("prompt") + .and_then(|v| v.as_str()) + .unwrap_or("Explore the catalog for relevant data") + .to_string() + } + + fn handle( + &self, + context: DelegationContext, + prompt: &str, + model: &str, + parent_aggregate_id: &str, + parent_tool_id: &str + ) -> Result; + fn format_result(&self, summary: &str) -> String; +} + +pub struct DelegationProcessor { + event_store: E, + default_model: String, + handlers: Vec>, +} + +impl Processor for DelegationProcessor { + async fn run(&mut self, event: &EventDb) -> eyre::Result<()> { + match &event.data { + Event::AgentMessage { response, .. } if self.has_delegation_trigger_tool_call(response) => { + tracing::info!( + "Delegation trigger tool call detected for aggregate {}", + event.aggregate_id + ); + self.handle_delegation_trigger(event, response).await?; + } + Event::ToolResult(tool_results) if self.is_delegation_tool_result(tool_results) => { + tracing::info!( + "Delegation tool result detected for aggregate {}", + event.aggregate_id + ); + self.handle_delegation_request(event, tool_results).await?; + } + Event::AgentMessage { response, .. } if self.is_delegated_tool_execution(event) => { + tracing::info!( + "Tool execution detected for delegated thread {}", + event.aggregate_id + ); + self.handle_tool_execution(event, response).await?; + } + Event::ToolResult(tool_results) if !self.is_delegation_tool_result(tool_results) => { + // Skip non-delegation tool results - they're handled by their respective ToolProcessors + } + Event::WorkComplete { result, .. 
} if self.is_delegated_thread(&event.aggregate_id) => { + tracing::info!( + "Delegated work completed successfully for aggregate {}", + event.aggregate_id, + ); + self.handle_work_completion(event, result).await?; + } + Event::DelegateWork { agent_type, prompt, parent_tool_id } => { + tracing::info!( + "Delegation work request detected for agent_type {} in aggregate {}", + agent_type, event.aggregate_id + ); + self.handle_delegate_work(event, agent_type, prompt, parent_tool_id).await?; + } + _ => {} + } + Ok(()) + } +} + +impl DelegationProcessor { + pub fn new(event_store: E, default_model: String, handlers: Vec>) -> Self { + Self { + event_store, + default_model, + handlers, + } + } + + fn has_delegation_trigger_tool_call(&self, response: &CompletionResponse) -> bool { + response.choice.iter().any(|content| { + if let rig::message::AssistantContent::ToolCall(call) = content { + self.handlers.iter().any(|h| h.trigger_tool() == call.function.name) + } else { + false + } + }) + } + + async fn handle_delegation_trigger(&mut self, event: &EventDb, response: &CompletionResponse) -> eyre::Result<()> { + // Extract trigger tool calls and start delegation for each + for content in response.choice.iter() { + if let rig::message::AssistantContent::ToolCall(call) = content { + if let Some(handler_idx) = self.handlers.iter().position(|h| h.trigger_tool() == call.function.name) { + let prompt = call.function.arguments + .get("prompt") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + // Create context using handler's own logic + let context = self.handlers[handler_idx].create_context(call)?; + + // Create delegation using handler + let result = self.handlers[handler_idx] + .handle(context, prompt, &self.default_model, &event.aggregate_id, &call.id)?; + + // Push events to start delegation + self.event_store.push_event( + &event.stream_id, + &result.task_thread_id, + &result.config_event, + &Default::default() + ).await?; + + self.event_store.push_event( + &event.stream_id, + &result.task_thread_id, + &result.user_event, + &Default::default() + ).await?; + } + } + } + Ok(()) + } + + fn is_delegated_thread(&self, aggregate_id: &str) -> bool { + self.handlers.iter().any(|h| aggregate_id.starts_with(h.thread_prefix())) + } + + fn is_delegation_tool_result(&self, tool_results: &[crate::event::TypedToolResult]) -> bool { + // Check if any handler should handle any of the tool results + tool_results.iter().any(|result| { + self.handlers.iter().any(|handler| handler.should_handle(result)) + }) + } + + + + async fn handle_delegation_request( + &mut self, + event: &EventDb, + tool_results: &[crate::event::TypedToolResult], + ) -> eyre::Result<()> { + // Find a tool result that a handler can handle + for delegation_result in tool_results.iter() { + // Find matching handler using should_handle + let handler_idx = self.handlers.iter() + .position(|h| h.should_handle(delegation_result)); + + if let Some(handler_idx) = handler_idx { + let parent_tool_id = delegation_result.result.id.clone(); + // Load events to find the original tool call with arguments + let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id); + let events = self.event_store.load_events::(&query, None).await?; + + // Find the most recent AgentMessage with the matching tool call + let tool_call = events.iter().rev() + .find_map(|e| match e { + Event::AgentMessage { response, .. 
} => { + response.choice.iter().find_map(|content| { + if let rig::message::AssistantContent::ToolCall(call) = content { + if call.id == parent_tool_id { + Some(call) + } else { + None + } + } else { + None + } + }) + } + _ => None, + }); + + if let Some(tool_call) = tool_call { + // Let handler extract prompt using its own logic (may extract from tool call args or tool result content) + let prompt_arg = self.handlers[handler_idx].extract_prompt(tool_call, delegation_result); + + // Create context using handler's own logic + let context = self.handlers[handler_idx].create_context(tool_call)?; + + self.handle_delegation_by_index(event, handler_idx, context, &prompt_arg, &parent_tool_id).await?; + } else { + return Err(eyre::eyre!( + "Could not find original tool call with id '{}' for delegation", + parent_tool_id + )); + } + } + } + + Ok(()) + } + + async fn handle_delegate_work( + &mut self, + event: &EventDb, + agent_type: &str, + prompt: &str, + parent_tool_id: &str, + ) -> eyre::Result<()> { + // Find handler based on worker name (agent_type maps to worker_name) + let handler_idx = self.handlers.iter() + .position(|h| h.worker_name() == agent_type) + .ok_or_else(|| eyre::eyre!("No handler found for agent_type '{}'", agent_type))?; + + // Create a dummy tool call to extract context from handler + // For DelegateWork events triggered by ToolProcessor, we don't have a real tool call + // So we create a minimal one with empty arguments - handlers should use defaults + let dummy_call = rig::message::ToolCall { + id: parent_tool_id.to_string(), + call_id: None, + function: rig::message::ToolFunction { + name: self.handlers[handler_idx].trigger_tool().to_string(), + arguments: serde_json::Value::Object(Default::default()), + }, + }; + + // Create context using handler's own logic + let context = self.handlers[handler_idx].create_context(&dummy_call)?; + + // Delegate work to the appropriate handler + self.handle_delegation_by_index(event, handler_idx, context, prompt, parent_tool_id).await + } + + async fn handle_delegation_by_index( + &mut self, + event: &EventDb, + handler_idx: usize, + context: DelegationContext, + prompt_arg: &str, + parent_tool_id: &str, + ) -> eyre::Result<()> { + let result = self.handlers[handler_idx].handle( + context, + prompt_arg, + &self.default_model, + &event.aggregate_id, + parent_tool_id, + )?; + + // Send LLMConfig first with parent tracking + self.event_store + .push_event( + &event.stream_id, + &result.task_thread_id, + &result.config_event, + &Default::default(), + ) + .await?; + + // Send the exploration task + self.event_store + .push_event( + &event.stream_id, + &result.task_thread_id, + &result.user_event, + &Default::default(), + ) + .await?; + + Ok(()) + } + + async fn handle_work_completion( + &mut self, + event: &EventDb, + summary: &str, + ) -> eyre::Result<()> { + // Load task thread to get parent info from LLMConfig + let task_query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id); + let task_events = self.event_store.load_events::(&task_query, None).await?; + + // Find the LLMConfig event to get parent info + let parent_info = task_events.iter() + .find_map(|e| match e { + Event::LLMConfig { parent, .. 
} => parent.as_ref(), + _ => None, + }); + + if let Some(parent) = parent_info { + // Find matching handler based on thread prefix + let handler = self.handlers.iter() + .find(|h| event.aggregate_id.starts_with(h.thread_prefix())); + + if let Some(handler) = handler { + if let Some(tool_id) = &parent.tool_id { + // Use handler's create_completion_result to get the appropriate result type + let completion_result = vec![handler.create_completion_result(summary, tool_id)]; + + // Convert ToolResult to UserMessage for thread processing + let tools = completion_result.iter().map(|t| rig::message::UserContent::ToolResult(t.result.clone())); + let user_content = rig::OneOrMany::many(tools)?; + + // Load original thread state and process + let original_query = Query::stream(&event.stream_id).aggregate(&parent.aggregate_id); + let events = self.event_store.load_events::(&original_query, None).await?; + let mut thread = thread::Thread::fold(&events); + let new_events = thread.process(thread::Command::User(user_content))?; + + for new_event in new_events.iter() { + self.event_store + .push_event( + &event.stream_id, + &parent.aggregate_id, + new_event, + &Default::default(), + ) + .await?; + } + } + } + } + + Ok(()) + } + + fn is_delegated_tool_execution(&self, event: &EventDb) -> bool { + self.handlers.iter().any(|h| h.should_handle_tools(event)) + } + + async fn handle_tool_execution( + &mut self, + event: &EventDb, + response: &CompletionResponse + ) -> eyre::Result<()> { + // Find the handler for this event + let handler_idx = self.handlers.iter() + .position(|h| h.should_handle_tools(event)) + .ok_or_else(|| eyre::eyre!("No handler found for tool execution"))?; + + let mut tool_results = Vec::new(); + + // Collect tool calls first to avoid borrowing issues + let mut tool_calls = Vec::new(); + for content in response.choice.iter() { + if let rig::message::AssistantContent::ToolCall(call) = content { + tool_calls.push(call.clone()); + } + } + + // Execute each tool call using the handler's execute_tool_by_name method + for call in tool_calls { + let tool_name = call.function.name.clone(); + let args = call.function.arguments.clone(); + + // Execute using the handler's method which handles borrowing internally + let result = self.handlers[handler_idx] + .execute_tool_by_name(&tool_name, args) + .await?; + + let tool_kind = match tool_name.as_str() { + "finish_delegation" => ToolKind::FinishDelegation, + other => ToolKind::Regular(other.to_string()), + }; + + let tool_result = call.to_result(result); + tool_results.push(TypedToolResult { + tool_name: tool_kind, + result: tool_result, + }); + } + + if !tool_results.is_empty() { + // Push the ToolResult event first + self.event_store.push_event( + &event.stream_id, + &event.aggregate_id, + &Event::ToolResult(tool_results.clone()), + &Default::default() + ).await?; + + // Convert ToolResults to UserMessage only if they're not from terminal tools + // Terminal tools complete the delegated work and don't need further LLM processing + let non_terminal_results: Vec<_> = tool_results.iter() + .filter(|tr| { + match &tr.tool_name { + ToolKind::Regular(tool_name) => { + // Check if this tool is terminal by finding it in handler tools + let is_terminal = self.handlers[handler_idx].tools() + .iter() + .find(|t| t.name() == *tool_name) + .map(|t| t.is_terminal()) + .unwrap_or(false); + !is_terminal + } + ToolKind::FinishDelegation => false, // Terminal tool + _ => true, // Other ToolKind variants are not terminal + } + }) + .collect(); + + if 
!non_terminal_results.is_empty() { + let tools = non_terminal_results.iter().map(|t| + rig::message::UserContent::ToolResult(t.result.clone()) + ); + let user_content = rig::OneOrMany::many(tools)?; + + // Load thread state and process the UserMessage + let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id); + let events = self.event_store.load_events::(&query, None).await?; + let mut thread = thread::Thread::fold(&events); + let new_events = thread.process(thread::Command::User(user_content))?; + + // Push the new events (including UserMessage and any LLM responses) + for new_event in new_events.iter() { + self.event_store + .push_event( + &event.stream_id, + &event.aggregate_id, + new_event, + &Default::default(), + ) + .await?; + } + } + } + + Ok(()) + } +} + +// Unified terminal tool for all delegation handlers +#[derive(Deserialize, Serialize)] +pub struct FinishDelegationArgs { + pub result: String, +} + +#[derive(Serialize)] +pub struct FinishDelegationOutput { + pub success: String, +} + +pub struct FinishDelegationTool; + +impl Tool for FinishDelegationTool { + type Args = FinishDelegationArgs; + type Output = FinishDelegationOutput; + type Error = serde_json::Value; + + fn name(&self) -> String { + "finish_delegation".to_string() + } + + fn definition(&self) -> rig::completion::ToolDefinition { + rig::completion::ToolDefinition { + name: Tool::name(self), + description: "Complete the delegated work with a result summary".to_string(), + parameters: json!({ + "type": "object", + "properties": { + "result": { + "type": "string", + "description": "The result of the delegated work" + } + }, + "required": ["result"] + }), + } + } + + fn is_terminal(&self) -> bool { + true + } + + async fn call( + &self, + args: Self::Args, + _sandbox: &mut Box, + ) -> Result> { + Ok(Ok(FinishDelegationOutput { + success: format!("Delegated work completed: {}", args.result), + })) + } +} \ No newline at end of file diff --git a/dabgent/dabgent_agent/src/processor/finish.rs b/dabgent/dabgent_agent/src/processor/finish.rs index aeb7cb70..51c2b307 100644 --- a/dabgent/dabgent_agent/src/processor/finish.rs +++ b/dabgent/dabgent_agent/src/processor/finish.rs @@ -127,7 +127,7 @@ impl FinishProcessor { impl Processor for FinishProcessor { async fn run(&mut self, event: &EventDb) -> eyre::Result<()> { match &event.data { - Event::TaskCompleted { success: true } => { + Event::TaskCompleted { success: true, .. } => { // Check event-sourced shutdown guard: if PipelineShutdown already exists, skip let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id); let prior_events = self.event_store.load_events::(&query, None).await?; @@ -178,17 +178,9 @@ impl Processor for FinishProcessor { - tracing::warn!("Task completed with failure, skipping export and shutting down"); - let shutdown_event = Event::PipelineShutdown; - self.event_store - .push_event( - &event.stream_id, - &event.aggregate_id, - &shutdown_event, - &Default::default(), - ) - .await?; + Event::TaskCompleted { success: false, .. 
} => { + tracing::info!("Task completed with failure, allowing pipeline to continue for retry"); + // Don't shutdown - let the agent fix issues and retry } _ => {} } diff --git a/dabgent/dabgent_agent/src/processor/mod.rs b/dabgent/dabgent_agent/src/processor/mod.rs index 299d28b6..86dd7d74 100644 --- a/dabgent/dabgent_agent/src/processor/mod.rs +++ b/dabgent/dabgent_agent/src/processor/mod.rs @@ -2,7 +2,8 @@ pub mod finish; pub mod sandbox; pub mod thread; pub mod replay; -pub mod compaction; +pub mod delegation; +pub mod completion; use dabgent_mq::{EventDb, EventStore, Query}; use std::pin::Pin; use tokio::sync::mpsc; @@ -10,7 +11,8 @@ use tokio::sync::mpsc; pub use finish::FinishProcessor; pub use sandbox::ToolProcessor; pub use thread::ThreadProcessor; -pub use compaction::CompactProcessor; +pub use delegation::DelegationProcessor; +pub use completion::CompletionProcessor; pub trait Aggregate: Default { type Command; diff --git a/dabgent/dabgent_agent/src/processor/sandbox.rs b/dabgent/dabgent_agent/src/processor/sandbox.rs index 7d836135..f3e60314 100644 --- a/dabgent/dabgent_agent/src/processor/sandbox.rs +++ b/dabgent/dabgent_agent/src/processor/sandbox.rs @@ -1,8 +1,8 @@ -use super::Processor; +use super::{Aggregate, Processor, thread}; use crate::event::{Event, TypedToolResult, ToolKind}; use crate::llm::{CompletionResponse, FinishReason}; use crate::toolbox::{ToolCallExt, ToolDyn}; -use dabgent_mq::{EventDb, EventStore}; +use dabgent_mq::{EventDb, EventStore, Query}; use dabgent_sandbox::SandboxDyn; use eyre::Result; use std::path::Path; @@ -59,15 +59,42 @@ impl Processor for ToolProcessor { } if response.finish_reason == FinishReason::ToolUse && recipient.eq(&self.recipient) => { - let tool_results = self.run_tools(&response, &event.stream_id, &event.aggregate_id).await?; - let tool_result_event = Event::ToolResult(tool_results); + let tool_results = self.run_tools(&response).await?; - self.event_store.push_event( - &event.stream_id, - &event.aggregate_id, - &tool_result_event, - &Default::default(), - ).await?; + if !tool_results.is_empty() { + // Emit tool results as-is + let tool_result_event = Event::ToolResult(tool_results.clone()); + self.event_store.push_event( + &event.stream_id, + &event.aggregate_id, + &tool_result_event, + &Default::default(), + ).await?; + + // Convert to UserMessage for normal processing + let tools = tool_results.iter().map(|t| + rig::message::UserContent::ToolResult(t.result.clone()) + ); + let user_content = rig::OneOrMany::many(tools)?; + + // Load thread state and process the UserMessage + let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id); + let events = self.event_store.load_events::(&query, None).await?; + let mut thread = thread::Thread::fold(&events); + let new_events = thread.process(thread::Command::User(user_content))?; + + // Push the new events (including UserMessage and any LLM responses) + for new_event in new_events.iter() { + self.event_store + .push_event( + &event.stream_id, + &event.aggregate_id, + new_event, + &Default::default(), + ) + .await?; + } + } } _ => {} @@ -94,8 +121,6 @@ impl ToolProcessor { async fn run_tools( &mut self, response: &CompletionResponse, - stream_id: &str, - aggregate_id: &str, ) -> Result> { let mut results = Vec::new(); for content in response.choice.iter() { @@ -106,27 +131,30 @@ impl ToolProcessor { let args = call.function.arguments.clone(); let tool_result = tool.call(args, &mut self.sandbox).await?; - // Check if this is a successful DoneTool call - if 
call.function.name == "done" && tool_result.is_ok() { - tracing::info!("Task completed successfully, emitting TaskCompleted event"); - let task_completed_event = Event::TaskCompleted { success: true }; - self.event_store - .push_event( - stream_id, - aggregate_id, - &task_completed_event, - &Default::default(), - ) - .await?; - } tool_result } None => { - let error = format!("{} not found", call.function.name); + let available_tools: Vec = self.tools.iter() + .map(|tool| tool.name()) + .collect(); + let error = format!( + "Tool '{}' does not exist. Available tools: [{}]", + call.function.name, + available_tools.join(", ") + ); Err(serde_json::json!(error)) } }; - results.push(TypedToolResult { tool_name: match call.function.name.as_str() { "done" => ToolKind::Done, other => ToolKind::Other(other.to_string()) }, result: call.to_result(result) }); + results.push(TypedToolResult { + tool_name: match call.function.name.as_str() { + "done" => ToolKind::Done, + "explore_databricks_catalog" => ToolKind::ExploreDatabricksCatalog, + "finish_delegation" => ToolKind::FinishDelegation, + "compact_error" => ToolKind::CompactError, + other => ToolKind::Regular(other.to_string()) + }, + result: call.to_result(result) + }); } } diff --git a/dabgent/dabgent_agent/src/toolbox/basic.rs b/dabgent/dabgent_agent/src/toolbox/basic.rs index 509ddf65..0d072662 100644 --- a/dabgent/dabgent_agent/src/toolbox/basic.rs +++ b/dabgent/dabgent_agent/src/toolbox/basic.rs @@ -3,6 +3,59 @@ use dabgent_sandbox::SandboxDyn; use eyre::Result; use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize)] +pub struct DoneToolArgs { + pub summary: String, +} + +#[derive(Serialize, Deserialize)] +pub struct FinishDelegationArgs { + pub result: String, +} + +#[derive(Clone)] +pub struct FinishDelegationTool; + +impl Tool for FinishDelegationTool { + type Args = FinishDelegationArgs; + type Output = String; + type Error = String; + + fn name(&self) -> String { + "finish_delegation".to_string() + } + + fn definition(&self) -> rig::completion::ToolDefinition { + rig::completion::ToolDefinition { + name: self.name(), + description: "Complete delegated work and return summary to main thread".to_string(), + parameters: serde_json::json!({ + "type": "object", + "properties": { + "result": { + "type": "string", + "description": "Comprehensive summary of delegated work results" + } + }, + "required": ["result"] + }), + } + } + + fn is_terminal(&self) -> bool { + true + } + + async fn call( + &self, + args: Self::Args, + _sandbox: &mut Box, + ) -> eyre::Result> { + // This tool just returns the result - the actual event emission will be handled by ToolProcessor + Ok(Ok(args.result)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BashArgs { pub command: String, @@ -331,7 +384,7 @@ impl DoneTool { } impl Tool for DoneTool { - type Args = serde_json::Value; + type Args = DoneToolArgs; type Output = String; type Error = String; @@ -347,19 +400,25 @@ impl Tool for DoneTool { description: "Run checks, if successful mark task as finished".to_string(), parameters: serde_json::json!({ "type": "object", - "properties": {}, + "properties": { + "summary": { + "type": "string", + "description": "Summary of completed work or validation results" + } + }, + "required": ["summary"] }), } } async fn call( &self, - _args: Self::Args, + args: Self::Args, sandbox: &mut Box, ) -> eyre::Result> { match self.validator.run(sandbox).await { Ok(result) => Ok(match result { - Ok(_) => Ok("success".to_string()), + Ok(_) => 
diff --git a/dabgent/dabgent_agent/src/toolbox/databricks.rs b/dabgent/dabgent_agent/src/toolbox/databricks.rs
index dec1dea3..339e908b 100644
--- a/dabgent/dabgent_agent/src/toolbox/databricks.rs
+++ b/dabgent/dabgent_agent/src/toolbox/databricks.rs
@@ -1,4 +1,4 @@
-use crate::toolbox::{ClientTool, ClientToolAdapter, ToolDyn};
+use crate::toolbox::{ClientTool, ClientToolAdapter, ToolDyn, basic::FinishDelegationTool};
 use dabgent_integrations::databricks::DatabricksRestClient;
 use eyre::Result;
 use serde::{Deserialize, Serialize};
@@ -7,6 +7,30 @@ use std::sync::Arc;

 // Args structs matching the Python implementation

+fn default_limit() -> usize {
+    1000
+}
+
+// Helper functions for pagination and filtering
+fn apply_pagination<T>(items: Vec<T>, limit: usize, offset: usize) -> (Vec<T>, String) {
+    let total = items.len();
+    let paginated: Vec<T> = items.into_iter().skip(offset).take(limit).collect();
+    let shown = paginated.len();
+
+    let pagination_info = if total > limit + offset {
+        format!("Showing {} items (offset {}, limit {}). Total: {}", shown, offset, limit, total)
+    } else if offset > 0 {
+        format!("Showing {} items (offset {}). Total: {}", shown, offset, total)
+    } else if total > limit {
+        format!("Showing {} items (limit {}). Total: {}", shown, limit, total)
+    } else {
+        format!("Showing all {} items", total)
+    };
+
+    (paginated, pagination_info)
+}
+
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DatabricksListCatalogsArgs {
     // No parameters needed - lists all available catalogs
@@ -15,6 +39,12 @@
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DatabricksListSchemasArgs {
     pub catalog_name: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub filter: Option<String>,
+    #[serde(default = "default_limit")]
+    pub limit: usize,
+    #[serde(default)]
+    pub offset: usize,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -133,7 +163,7 @@ impl ClientTool for DatabricksListSchemas {
     fn definition(&self) -> rig::completion::ToolDefinition {
         rig::completion::ToolDefinition {
             name: self.name(),
-            description: "List all schemas in a specific catalog".to_string(),
+            description: "List all schemas in a specific catalog with optional filtering and pagination".to_string(),
             parameters: serde_json::json!({
                 "type": "object",
                 "properties": {
@@ -141,6 +171,20 @@ impl ClientTool for DatabricksListSchemas {
                         "type": "string",
                         "description": "Name of the catalog to list schemas from",
                     },
+                    "filter": {
+                        "type": "string",
+                        "description": "Optional filter to search for schemas by name (case-insensitive substring match)",
+                    },
+                    "limit": {
+                        "type": "integer",
+                        "description": "Maximum number of schemas to return (default: 1000)",
+                        "default": 1000,
+                    },
+                    "offset": {
+                        "type": "integer",
+                        "description": "Number of schemas to skip (default: 0)",
+                        "default": 0,
+                    },
                 },
                 "required": ["catalog_name"],
             }),
@@ -152,24 +196,36 @@
     }

     async fn call(&self, args: Self::Args) -> Result<Result<Self::Output, Self::Error>> {
-        match self.client.list_schemas(&args.catalog_name).await {
+        tracing::debug!("DatabricksListSchemas::call starting with catalog: {}", args.catalog_name);
+        match self.client.list_schemas(&args.catalog_name, args.filter.as_deref()).await {
             Ok(schemas) => {
+                tracing::debug!("DatabricksListSchemas::call succeeded, found {} schemas", schemas.len());
+
                 if schemas.is_empty() {
-                    Ok(Ok(format!("No schemas found in catalog '{}'.", args.catalog_name)))
+                    let message = if args.filter.is_some() {
+                        format!("No schemas found in catalog '{}' matching filter.", args.catalog_name)
+                    } else {
+                        format!("No schemas found in catalog '{}'.", args.catalog_name)
+                    };
+                    Ok(Ok(message))
                 } else {
-                    let mut result_lines = vec![
-                        format!("Found {} schemas in catalog '{}':", schemas.len(), args.catalog_name),
-                        "".to_string(),
-                    ];
+                    // Apply pagination
+                    let (paginated_schemas, pagination_info) = apply_pagination(schemas, args.limit, args.offset);

-                    for schema in &schemas {
-                        result_lines.push(format!("• {}.{}", args.catalog_name, schema));
+                    let mut result_lines = vec![pagination_info, "".to_string()];
+
+                    for schema in &paginated_schemas {
+                        // Remove redundant catalog name from output
+                        result_lines.push(format!("• {}", schema));
                     }

                     Ok(Ok(result_lines.join("\n")))
                 }
             }
-            Err(e) => Ok(Err(format!("Failed to list schemas in catalog '{}': {}", args.catalog_name, e))),
+            Err(e) => {
+                tracing::debug!("DatabricksListSchemas::call failed with error: {}", e);
+                Ok(Err(format!("Failed to list schemas in catalog '{}': {}", args.catalog_name, e)))
+            }
         }
     }
 }
@@ -468,7 +524,12 @@ fn format_value(value: &Value) -> String {

 // Public function to create a databricks toolset
 pub fn databricks_toolset() -> Result<Vec<Box<dyn ToolDyn>>> {
-    let client = Arc::new(DatabricksRestClient::new().map_err(|e| eyre::eyre!("{}", e))?);
+    tracing::info!("Creating Databricks toolset...");
+    let client = Arc::new(DatabricksRestClient::new().map_err(|e| {
+        tracing::error!("Failed to create DatabricksRestClient: {}", e);
+        eyre::eyre!("{}", e)
+    })?);
+    tracing::info!("DatabricksRestClient created successfully");

     Ok(vec![
         Box::new(ClientToolAdapter::new(DatabricksListCatalogs::new(client.clone()))),
@@ -476,5 +537,6 @@
         Box::new(ClientToolAdapter::new(DatabricksListTables::new(client.clone()))),
         Box::new(ClientToolAdapter::new(DatabricksDescribeTable::new(client.clone()))),
         Box::new(ClientToolAdapter::new(DatabricksExecuteQuery::new(client.clone()))),
+        Box::new(FinishDelegationTool),
     ])
 }
\ No newline at end of file
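Because the tool's paginated output header comes straight from `apply_pagination`, its branch logic is worth a concrete check. A small sketch, with the expected strings worked out by hand from the branches above:

    // Sketch: exercising apply_pagination as defined above.
    let items: Vec<u32> = (1..=5).collect();
    let (page, info) = apply_pagination(items, 2, 1); // limit 2, offset 1
    assert_eq!(page, vec![2, 3]);
    // total (5) > limit + offset (3), so the first branch formats the header:
    assert_eq!(info, "Showing 2 items (offset 1, limit 2). Total: 5");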
diff --git a/dabgent/dabgent_agent/src/toolbox/mod.rs b/dabgent/dabgent_agent/src/toolbox/mod.rs
index 2ff820f6..b7d9cbfc 100644
--- a/dabgent/dabgent_agent/src/toolbox/mod.rs
+++ b/dabgent/dabgent_agent/src/toolbox/mod.rs
@@ -15,6 +15,7 @@ pub trait Tool: Send + Sync {
     fn name(&self) -> String;
     fn definition(&self) -> rig::completion::ToolDefinition;
     fn needs_replay(&self) -> bool { true }
+    fn is_terminal(&self) -> bool { false }
     fn call(
         &self,
         args: Self::Args,
@@ -28,6 +29,7 @@ pub trait ToolDyn: Send + Sync {
     fn name(&self) -> String;
     fn definition(&self) -> rig::completion::ToolDefinition;
     fn needs_replay(&self) -> bool;
+    fn is_terminal(&self) -> bool;
     fn call<'a>(
         &'a self,
         args: serde_json::Value,
@@ -48,6 +50,10 @@ impl<T: Tool> ToolDyn for T {
         Tool::needs_replay(self)
     }

+    fn is_terminal(&self) -> bool {
+        Tool::is_terminal(self)
+    }
+
     fn call<'a>(
         &'a self,
         args: serde_json::Value,
diff --git a/dabgent/dabgent_agent/src/toolbox/planning.rs b/dabgent/dabgent_agent/src/toolbox/planning.rs
index 12b757c9..e638305f 100644
--- a/dabgent/dabgent_agent/src/toolbox/planning.rs
+++ b/dabgent/dabgent_agent/src/toolbox/planning.rs
@@ -373,7 +373,10 @@ impl NoSandboxTool for CompleteTaskTool {
         let task = tasks[args.task_index].clone();

         // Create TaskCompleted event for the specific task
-        let event = crate::event::Event::TaskCompleted { success: true };
+        let event = crate::event::Event::TaskCompleted {
+            success: true,
+            summary: "Planning task completed".to_string()
+        };

         // Push event to store with the appropriate thread_id
         let thread_id = format!("task-{}", args.task_index);
diff --git a/dabgent/dabgent_agent/tests/task_list_validator.rs b/dabgent/dabgent_agent/tests/task_list_validator.rs
index 66e2bb1f..3694562f 100644
--- a/dabgent/dabgent_agent/tests/task_list_validator.rs
+++ b/dabgent/dabgent_agent/tests/task_list_validator.rs
@@ -71,7 +71,7 @@ async fn test_task_list_validator_with_done_tool() {

         // DoneTool should fail due to incomplete tasks
         let done_result = done_tool.call(
-            serde_json::json!({}),
+            serde_json::json!({"summary": "Task validation completed"}),
             &mut sandbox
         ).await?;
@@ -89,12 +89,12 @@ async fn test_task_list_validator_with_done_tool() {
         ).await?;

         let done_result = done_tool.call(
-            serde_json::json!({}),
+            serde_json::json!({"summary": "All tasks completed successfully"}),
             &mut sandbox
         ).await?;

         assert!(done_result.is_ok(), "DoneTool should succeed with all tasks completed");
-        assert_eq!(done_result.unwrap(), "success", "Should return success");
+        assert_eq!(done_result.unwrap(), "All tasks completed successfully", "Should return summary");

         Ok::<(), eyre::Error>(())
     }).await;
diff --git a/dabgent/dabgent_cli/src/widgets.rs b/dabgent/dabgent_cli/src/widgets.rs
index c8adbdf8..53584e58 100644
--- a/dabgent/dabgent_cli/src/widgets.rs
+++ b/dabgent/dabgent_cli/src/widgets.rs
@@ -61,6 +61,8 @@ pub fn event_as_text(event: &AgentEvent) -> Text<'_> {
         AgentEvent::ToolResult(_) => Text::raw("Tool result"),
         AgentEvent::PlanCreated { tasks } => render_plan_created(tasks),
         AgentEvent::PlanUpdated { tasks } => render_plan_updated(tasks),
+        AgentEvent::DelegateWork { agent_type, .. } => Text::raw(format!("Delegating work to: {}", agent_type)),
+        AgentEvent::WorkComplete { agent_type, .. } => Text::raw(format!("Work completed by: {}", agent_type)),
     }
 }
diff --git a/dabgent/dabgent_fastapi/examples/dataapps.rs b/dabgent/dabgent_fastapi/examples/dataapps.rs
index f8d6f229..54bde9f2 100644
--- a/dabgent/dabgent_fastapi/examples/dataapps.rs
+++ b/dabgent/dabgent_fastapi/examples/dataapps.rs
@@ -1,4 +1,4 @@
-use dabgent_agent::processor::{CompactProcessor, FinishProcessor, Pipeline, Processor, ThreadProcessor, ToolProcessor};
+use dabgent_agent::processor::{CompletionProcessor, DelegationProcessor, FinishProcessor, Pipeline, Processor, ThreadProcessor, ToolProcessor};
 use dabgent_agent::toolbox::ToolDyn;
 use dabgent_fastapi::{toolset::dataapps_toolset, validator::DataAppsValidator, artifact_preparer::DataAppsArtifactPreparer};
 use dabgent_fastapi::templates::{EMBEDDED_TEMPLATES, DEFAULT_TEMPLATE_PATH};
@@ -20,7 +20,7 @@ async fn main() {
     let opts = ConnectOpts::default();

     opts.connect(|client| async move {
-        let llm = rig::providers::gemini::Client::from_env();
+        let claude_llm = rig::providers::anthropic::Client::from_env();
         let store = create_store(Some(StoreConfig::from_env())).await?;
         tracing::info!("Event store initialized successfully");
         let sandbox = create_sandbox(&client).await?;
@@ -39,9 +39,9 @@ async fn main() {
         push_seed_sandbox(&store, STREAM_ID, AGGREGATE_ID, template_path, "/app").await?;
         push_prompt(&store, STREAM_ID, AGGREGATE_ID, USER_PROMPT).await?;

-        tracing::info!("Starting DataApps pipeline with model: {}", MODEL);
+        tracing::info!("Starting DataApps pipeline with main model: {} and delegation model: {}", MAIN_MODEL, DELEGATION_MODEL);

-        let thread_processor = ThreadProcessor::new(llm.clone(), store.clone());
+        let thread_processor = ThreadProcessor::new(claude_llm.clone(), store.clone());

         // Create export directory path with timestamp
         let timestamp = std::time::SystemTime::now()
@@ -54,11 +54,13 @@ async fn main() {
         let completion_sandbox = sandbox.fork().await?;
         let tool_processor = ToolProcessor::new(dabgent_sandbox::Sandbox::boxed(sandbox), store.clone(), tool_processor_tools, None);

-        // Create CompactProcessor with small threshold for testing
-        let compact_processor = CompactProcessor::new(
+        let delegation_processor = DelegationProcessor::new(
             store.clone(),
-            2048,
-            "gemini-2.5-flash".to_string(), // Use same model as main pipeline
+            DELEGATION_MODEL.to_string(),
+            vec![
+                Box::new(dabgent_agent::processor::delegation::databricks::DatabricksHandler::new()?),
+                Box::new(dabgent_agent::processor::delegation::compaction::CompactionHandler::new(2048)?),
+            ],
         );

         // FixMe: FinishProcessor should have no state, including export path
@@ -70,12 +72,14 @@ async fn main() {
             DataAppsArtifactPreparer,
         );

+        let completion_processor = CompletionProcessor::new(store.clone());
         let pipeline = Pipeline::new(
             store.clone(),
             vec![
                 thread_processor.boxed(),
-                tool_processor.boxed(),
-                compact_processor.boxed(),
+                tool_processor.boxed(),       // Handles main thread tools (recipient: None)
+                completion_processor.boxed(), // Handles Done and FinishDelegation completions
+                delegation_processor.boxed(), // Handles delegation and delegated tool execution (including compaction)
                 finish_processor.boxed(),
             ],
         );
@@ -97,43 +101,42 @@
 Workspace Setup:
 - You have a pre-configured DataApps project structure in /app with backend and frontend directories
 - Backend is in /app/backend with Python, FastAPI, and uv package management
 - Frontend is in /app/frontend with React Admin and TypeScript
-- Use 'uv run' for all Python commands (e.g., 'uv run python main.py')
+
+Data Sources:
+- You have access to Databricks Unity Catalog with bakery business data
+- Use the 'explore_databricks_catalog' tool to discover available tables and schemas
+- The catalog contains real business data about products, sales, customers, and orders
+- Once you explore the data, use the actual schema and sample data for your API design

 Your Task:
-1. Create a simple data API with one endpoint that returns sample data
-2. Configure React Admin UI to display this data in a table
-3. Add proper logging and debugging throughout
+1. First, explore the Databricks catalog to understand the data
+2. Create a data API that serves real data from Databricks tables
+3. Configure React Admin UI to display this data in tables
 4. Ensure CORS is properly configured for React Admin
+5. When the app is ready, use the Done tool to run the tests and linters. If there are any errors, fix them; otherwise the tool will confirm completion.

 Implementation Details:
-- Add /api/items endpoint in backend/main.py that returns a list of sample items
-- Each item should have: id, name, description, category, created_at fields
-- Update frontend/src/App.tsx to add a Resource for items with ListGuesser
+- Start by exploring the Databricks catalog to find relevant tables
+- Design API endpoints based on the actual data structure you discover
+- Each endpoint should return data with fields matching the Databricks schema
+- Update frontend/src/App.tsx to add Resources for the discovered data
 - Include X-Total-Count header for React Admin pagination
-- Add debug logging in both backend (print/logging) and frontend (console.log)

-Quality Requirements:
-- Follow React Admin patterns for data providers
-- Use proper REST API conventions (/api/resource)
-- Handle errors gracefully with clear messages
-- Run all linters and tests before completion
-
-Start by exploring the current project structure, then implement the required features.
-Use the tools available to you as needed.
 ";

 const USER_PROMPT: &str = "
-Create a simple DataApp with:
+Create a data app to show the core sales data for a bakery.

-1. Backend API endpoint `/api/items` that returns a list of sample items (each item should have id, name, description, category, created_at fields)
-2. React Admin frontend that displays these items in a table with proper columns
-3. Include debug logging in both backend and frontend
-4. Make sure the React Admin data provider can fetch and display the items
+1. First, explore the Databricks catalog to discover where bakery sales data is stored. I assume it is under `samples`, but you need to confirm;
+2. Based on what you find, create backend API endpoints with some sample data from those tables (real integration will be added later);
+3. Build a React Admin frontend that displays the discovered data in tables

-The app should be functional.
+Focus on creating a functional DataApp that showcases real bakery business data from Databricks.
"; -const MODEL: &str = "gemini-2.5-flash"; + +const MAIN_MODEL: &str = "claude-sonnet-4-5"; +const DELEGATION_MODEL: &str = "claude-sonnet-4-5"; async fn create_sandbox(client: &dagger_sdk::DaggerConn) -> Result { tracing::info!("Setting up sandbox with DataApps template..."); @@ -168,7 +171,7 @@ async fn push_llm_config( .collect(); let event = dabgent_agent::event::Event::LLMConfig { - model: MODEL.to_owned(), + model: MAIN_MODEL.to_owned(), temperature: 0.0, max_tokens: 8192, preamble: Some(SYSTEM_PROMPT.to_owned()), diff --git a/dabgent/dabgent_fastapi/src/toolset.rs b/dabgent/dabgent_fastapi/src/toolset.rs index c0c9aee2..c48276d6 100644 --- a/dabgent/dabgent_fastapi/src/toolset.rs +++ b/dabgent/dabgent_fastapi/src/toolset.rs @@ -5,6 +5,8 @@ use serde::{Deserialize, Serialize}; pub struct UvAdd; +pub struct SpawnDatabricksExploration; + #[derive(Serialize, Deserialize)] pub struct UvAddArgs { pub package: String, @@ -12,6 +14,22 @@ pub struct UvAddArgs { pub dev: bool, } +#[derive(Serialize, Deserialize)] +pub struct SpawnDatabricksExplorationArgs { + #[serde(default = "default_catalog")] + pub catalog: String, + #[serde(default = "default_exploration_prompt")] + pub prompt: String, +} + +fn default_catalog() -> String { + "main".to_string() +} + +fn default_exploration_prompt() -> String { + "Explore the catalog and find tables that would be suitable for a bakery business DataApp. Focus on sales, products, customers, and orders data.".to_string() +} + impl Tool for UvAdd { type Args = UvAddArgs; type Output = String; @@ -64,6 +82,50 @@ impl Tool for UvAdd { } } +impl Tool for SpawnDatabricksExploration { + type Args = SpawnDatabricksExplorationArgs; + type Output = String; + type Error = String; + + fn name(&self) -> String { + "explore_databricks_catalog".to_string() + } + + fn definition(&self) -> rig::completion::ToolDefinition { + rig::completion::ToolDefinition { + name: Tool::name(self), + description: "Explore Databricks catalog to discover tables and data structure for building DataApp APIs".to_string(), + parameters: serde_json::json!({ + "type": "object", + "properties": { + "catalog": { + "type": "string", + "description": "Databricks catalog name to explore (default: 'main')", + "default": "main" + }, + "prompt": { + "type": "string", + "description": "Specific exploration instructions", + "default": "Explore the catalog and find tables that would be suitable for a bakery business DataApp. Focus on sales, products, customers, and orders data." 
+ } + } + }), + } + } + + async fn call( + &self, + args: Self::Args, + _sandbox: &mut Box, + ) -> Result> { + let SpawnDatabricksExplorationArgs { catalog: _, prompt: _ } = args; + + // This tool triggers delegation to an independent Databricks exploration agent + // Return minimal response since the actual result will come from the delegated worker + Ok(Ok("Delegation triggered".to_string())) + } +} + pub fn dataapps_toolset(validator: T) -> Vec> { vec![ Box::new(WriteFile), @@ -72,6 +134,7 @@ pub fn dataapps_toolset(validator: T) -> V Box::new(LsDir), Box::new(RmFile), Box::new(UvAdd), + Box::new(SpawnDatabricksExploration), Box::new(DoneTool::new(validator)), ] } \ No newline at end of file diff --git a/dabgent/dabgent_integrations/src/databricks.rs b/dabgent/dabgent_integrations/src/databricks.rs index 7d98aa36..a97057d3 100644 --- a/dabgent/dabgent_integrations/src/databricks.rs +++ b/dabgent/dabgent_integrations/src/databricks.rs @@ -230,20 +230,31 @@ impl DatabricksRestClient { request = request.json(body); } + debug!("Sending HTTP request to Databricks API..."); let response = request .send() .await - .map_err(|e| anyhow!("HTTP request failed: {}", e))?; + .map_err(|e| { + info!("HTTP request failed: {}", e); + anyhow!("HTTP request failed: {}", e) + })?; let status = response.status(); + debug!("Received HTTP response with status: {}", status); + let response_text = response .text() .await - .map_err(|e| anyhow!("Failed to read response text: {}", e))?; + .map_err(|e| { + info!("Failed to read response text: {}", e); + anyhow!("Failed to read response text: {}", e) + })?; - debug!("Response status: {}, body: {}", status, response_text); + debug!("Response body length: {} characters", response_text.len()); + debug!("Response body: {}", response_text); if !status.is_success() { + info!("API request failed with status {}: {}", status, response_text); return Err(anyhow!( "API request failed with status {}: {}", status, @@ -251,7 +262,9 @@ impl DatabricksRestClient { )); } + debug!("Parsing JSON response..."); serde_json::from_str(&response_text).map_err(|e| { + info!("Failed to parse JSON response: {}. Response: {}", e, response_text); anyhow!( "Failed to parse JSON response: {}. 
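One detail worth calling out in the tool definition above: both arguments are optional at the JSON level because serde fills them from the `default_*` helpers. A quick sketch of that behavior (`samples` echoes the user prompt; the struct and defaults are the ones defined in this diff):

    // Sketch: omitted fields fall back to their serde defaults.
    let args: SpawnDatabricksExplorationArgs =
        serde_json::from_value(serde_json::json!({ "catalog": "samples" }))
            .expect("valid exploration args");
    assert_eq!(args.catalog, "samples");
    // `prompt` was omitted, so default_exploration_prompt() supplied it.
    assert!(args.prompt.starts_with("Explore the catalog"));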
Response: {}", e, @@ -461,7 +474,8 @@ impl DatabricksRestClient { Ok(all_catalogs) } - pub async fn list_schemas(&self, catalog_name: &str) -> Result> { + pub async fn list_schemas(&self, catalog_name: &str, name_filter: Option<&str>) -> Result> { + info!("Starting list_schemas for catalog: {}", catalog_name); let mut all_schemas = Vec::new(); let mut next_page_token: Option = None; @@ -476,13 +490,22 @@ impl DatabricksRestClient { url.push('?'); url.push_str(&query_params.join("&")); + debug!("About to make API request for schemas..."); let response: SchemasListResponse = self .api_request(reqwest::Method::GET, &url, None::<&()>) .await?; + debug!("Successfully received schemas response"); if let Some(schemas) = response.schemas { for schema in schemas { - all_schemas.push(schema.name); + // Apply filter if provided + if let Some(filter) = name_filter { + if schema.name.to_lowercase().contains(&filter.to_lowercase()) { + all_schemas.push(schema.name); + } + } else { + all_schemas.push(schema.name); + } } } @@ -493,6 +516,7 @@ impl DatabricksRestClient { } } + info!("Completed list_schemas, found {} schemas", all_schemas.len()); Ok(all_schemas) } @@ -514,7 +538,7 @@ impl DatabricksRestClient { // For each catalog, get schemas and then tables for catalog_name in catalog_names { let schema_names = if schema == "*" { - self.list_schemas(&catalog_name).await? + self.list_schemas(&catalog_name, None).await? } else { vec![schema.to_string()] }; diff --git a/dataapps/template_minimal/backend/pyproject.toml b/dataapps/template_minimal/backend/pyproject.toml index 484c409d..4824f130 100644 --- a/dataapps/template_minimal/backend/pyproject.toml +++ b/dataapps/template_minimal/backend/pyproject.toml @@ -27,7 +27,7 @@ dev = [ # Linting and formatting configuration [tool.ruff] -line-length = 100 +line-length = 120 target-version = "py312" [tool.ruff.lint]