Stop engineering prompts. Start declaring contracts.
Flock is a production-focused framework for orchestrating AI agents through declarative type contracts and blackboard architecture: proven patterns from distributed systems, decades of microservice experience, and classical AI, now applied to modern LLMs.
Read the full documentation →
Quick links:
- Getting Started - Installation and first steps
- Tutorials - Step-by-step learning path
- User Guides - In-depth feature documentation
- API Reference - Complete API documentation
- Roadmap - What's coming in v1.0
- Changelog - New features and version history
Building production multi-agent systems today means dealing with:
Prompt Engineering Hell
prompt = """You are an expert code reviewer. When you receive code, you should...
[498 more lines of instructions that the LLM ignores half the time]"""
# 500-line prompt that breaks when models update
# How do I know this is the best prompt? (you don't)
# Proving 'best possible performance' is impossible

Testing Nightmares
# How do you unit test this?
result = llm.invoke(prompt) # Hope for valid JSON
data = json.loads(result.content) # Crashes in production

Rigid Topology & Tight Coupling
# Want to add a new agent? Rewrite the entire graph.
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", "agent_c")
# Add agent_d? Start rewiring...

Single Point of Failure
# Orchestrator dies? Everything dies.

God Object Anti-Pattern
# One orchestrator needs domain knowledge of 20+ agents to route correctly
# Orchestrator 'guesses' next agent based on natural language
# Not suitable for critical systems

These aren't framework limitations; they're architectural choices that don't scale. Decades of microservice experience have taught us about decoupling, orchestration, and reliability. Let's apply those lessons!
Flock combines two proven patterns:
Traditional approach:
prompt = """You are an expert bug analyst. Analyze bug reports and provide structured diagnostics.
INSTRUCTIONS:
1. Read the bug report carefully
2. Determine severity (Critical|High|Medium|Low)
3. Classify bug category
4. Formulate root cause hypothesis (minimum 50 characters)
5. Assign confidence score (0.0-1.0)
OUTPUT FORMAT:
You MUST return valid JSON with this exact structure:
{
"severity": "string (Critical|High|Medium|Low)",
"category": "string",
"root_cause_hypothesis": "string (minimum 50 characters)",
"confidence_score": "number (0.0 to 1.0)"
}
VALIDATION RULES:
- severity: Must be exactly one of: Critical, High, Medium, Low
- category: Must be a single word or short phrase
- root_cause_hypothesis: Must be at least 50 characters
- confidence_score: Must be between 0.0 and 1.0
[...hundreds more lines...]"""
result = llm.invoke(prompt) # 500-line prompt that breaks
data = json.loads(result.content) # Crashes in production

The Flock way:
@flock_type
class BugDiagnosis(BaseModel):
severity: str = Field(pattern="^(Critical|High|Medium|Low)$")
category: str = Field(description="Bug category")
root_cause_hypothesis: str = Field(min_length=50)
confidence_score: float = Field(ge=0.0, le=1.0)
# The schema IS the instruction. No 500-line prompt needed.
agent.consumes(BugReport).publishes(BugDiagnosis)

Why this matters:
- ✅ Survives model upgrades - GPT-6 will still understand Pydantic schemas
- ✅ Runtime validation - Errors caught at parse time, not in production
- ✅ Testable - Mock inputs/outputs with concrete types
- ✅ Self-documenting - The code tells you what agents do
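Because the contract is just a Pydantic model, those guarantees are unit-testable without an LLM in the loop. A minimal sketch with pytest against the BugDiagnosis model above (test names and sample values are illustrative):

```python
import pytest
from pydantic import ValidationError


def test_valid_diagnosis_parses():
    # A well-formed payload satisfies every field constraint
    diagnosis = BugDiagnosis(
        severity="High",
        category="race condition",
        root_cause_hypothesis=(
            "Shared cache is mutated without a lock, so concurrent "
            "writers clobber each other's entries."
        ),
        confidence_score=0.85,
    )
    assert diagnosis.severity == "High"


def test_invalid_severity_is_rejected_at_parse_time():
    # "Medium-ish" fails the ^(Critical|High|Medium|Low)$ pattern
    with pytest.raises(ValidationError):
        BugDiagnosis(
            severity="Medium-ish",
            category="cache",
            root_cause_hypothesis="x" * 60,
            confidence_score=0.5,
        )
```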
Graph-based approach:
# Explicit workflow with hardcoded edges
workflow.add_edge("radiologist", "diagnostician")
workflow.add_edge("lab_tech", "diagnostician")
# Add performance_analyzer? Rewrite the graph.

The Flock way (blackboard):
# Agents subscribe to types, workflows emerge
radiologist = flock.agent("radiologist").consumes(Scan).publishes(XRayAnalysis)
lab_tech = flock.agent("lab_tech").consumes(Scan).publishes(LabResults)
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)
# Add performance_analyzer? Just subscribe it:
performance = flock.agent("perf").consumes(Scan).publishes(PerfAnalysis)
# Done. No graph rewiring. Diagnostician can optionally consume it.

What just happened:
- ✅ Parallel execution - Radiologist and lab_tech run concurrently (automatic)
- ✅ Dependency resolution - Diagnostician waits for both inputs (automatic)
- ✅ Loose coupling - Agents don't know about each other, just data types
- ✅ Scalable - O(n) complexity, not O(n²) edges
This is not a new idea. Blackboard architecture has powered AI systems since the 1970s (Hearsay-II, HASP/SIAP, BB1). We're applying proven patterns to modern LLMs.
pip install flock-core
export OPENAI_API_KEY="sk-..."
export DEFAULT_MODEL="openai/gpt-4.1" # Optional, has defaults

import os
import asyncio
from pydantic import BaseModel, Field
from flock import Flock, flock_type
# 1. Define typed artifacts
@flock_type
class CodeSubmission(BaseModel):
code: str
language: str
@flock_type
class BugAnalysis(BaseModel):
bugs_found: list[str]
severity: str = Field(pattern="^(Critical|High|Medium|Low|None)$")
confidence: float = Field(ge=0.0, le=1.0)
@flock_type
class SecurityAnalysis(BaseModel):
vulnerabilities: list[str]
risk_level: str = Field(pattern="^(Critical|High|Medium|Low|None)$")
@flock_type
class FinalReview(BaseModel):
overall_assessment: str = Field(pattern="^(Approve|Approve with Changes|Reject)$")
action_items: list[str]
# 2. Create the blackboard
flock = Flock(os.getenv("DEFAULT_MODEL", "openai/gpt-4.1"))
# 3. Agents subscribe to types (NO graph wiring!)
bug_detector = flock.agent("bug_detector").consumes(CodeSubmission).publishes(BugAnalysis)
security_auditor = flock.agent("security_auditor").consumes(CodeSubmission).publishes(SecurityAnalysis)
# AND gate: This agent AUTOMATICALLY waits for BOTH analyses
final_reviewer = flock.agent("final_reviewer").consumes(BugAnalysis, SecurityAnalysis).publishes(FinalReview)
# 4. Run with real-time dashboard
async def main():
await flock.serve(dashboard=True)
asyncio.run(main())

What happened:
- Bug detector and security auditor ran in parallel
- Final reviewer automatically waited for both
- Zero prompts written - types defined the behavior
- Zero graph edges - subscriptions created the workflow
- Full type safety - Pydantic validates all outputs
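If you'd rather run headless than serve the dashboard, the same pipeline works with the publish/run-until-idle/retrieval calls shown in the throughput section later in this README. A minimal sketch (the exact shape returned by get_by_type may differ, so treat the loop as illustrative):

```python
async def main():
    # Publish one typed artifact; subscriptions decide which agents fire
    await flock.publish(CodeSubmission(code="print('hi')", language="python"))

    # Block until every triggered agent (and its downstream agents) finishes
    await flock.run_until_idle()

    # Read the validated FinalReview artifacts back off the blackboard
    reviews = await flock.store.get_by_type(FinalReview)
    for review in reviews:
        print(review)

asyncio.run(main())
```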
Every piece of data is a validated Pydantic model:
@flock_type
class PatientDiagnosis(BaseModel):
condition: str = Field(min_length=10)
confidence: float = Field(ge=0.0, le=1.0)
recommended_treatment: list[str] = Field(min_length=1)
follow_up_required: bool

Benefits:
- Runtime validation ensures quality
- Field constraints prevent bad outputs
- Self-documenting data structures
- Version-safe (types survive model updates)
AND Gates - Wait for ALL types:
# Wait for BOTH types before triggering
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)

OR Gates - Trigger on ANY type:
# Trigger when EITHER type arrives (via chaining)
alert_handler = flock.agent("alerts").consumes(SystemAlert).consumes(UserAlert).publishes(Response)

Count-Based AND Gates:
# Wait for THREE Orders
aggregator = flock.agent("aggregator").consumes(Order, Order, Order).publishes(BatchSummary)
# Wait for TWO Images AND ONE Metadata
validator = flock.agent("validator").consumes(Image, Image, Metadata).publishes(ValidationResult)

Flock supports fan-out publishing so a single agent execution can generate multiple artifacts:
- `fan_out=10` → fixed count (10 artifacts of a type).
- `fan_out=(min, max)` → dynamic fan-out where the engine decides how many artifacts to generate within a range, based on input complexity and quality filters.
from flock.core import FanOutRange
idea_generator = (
flock.agent("idea_generator")
.consumes(ProductBrief)
.publishes(
ProductIdea,
        fan_out=(5, 20), # engine decides 5-20 ideas
where=lambda i: i.score >= 8, # filter AFTER range checks
)
)

Dynamic fan-out is fully backward compatible with existing `fan_out=int` usage and is described in detail in the Fan-Out Publishing guide and examples/02-patterns/publish/06_dynamic_fan_out.py.
Match artifacts by MEANING, not keywords:
# Install semantic extras
pip install flock-core[semantic]
# Agents route based on semantic similarity
security_team = (
flock.agent("security_team")
.consumes(SupportTicket, semantic_match="security vulnerability exploit")
.publishes(SecurityAlert)
)
billing_team = (
flock.agent("billing_team")
.consumes(SupportTicket, semantic_match="payment charge refund billing")
.publishes(BillingResponse)
)
# Tickets route automatically based on MEANING!
# "SQL injection" β Security Team (no keyword "security" needed!)
# "charged twice" β Billing Team (semantic match to "payment")Advanced semantic filtering:
# Custom threshold (0.0-1.0, default 0.4)
.consumes(Ticket, semantic_match="urgent", semantic_threshold=0.7) # Strict
# Multiple criteria (ALL must match)
.consumes(Doc, semantic_match=["security", "compliance"]) # AND logic
# Field-specific matching
.consumes(Article, semantic_match={
"query": "machine learning",
"threshold": 0.6,
"field": "abstract" # Only match this field
})

Why this is revolutionary:
- ✅ No keyword brittleness - "SQL injection" matches "security vulnerability"
- ✅ Better recall - Catches semantically similar content
- ✅ Local embeddings - all-MiniLM-L6-v2 model (~90MB), no external API
- ✅ Fast & cached - LRU cache with 10k entries, ~15ms per embedding
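For intuition about what the matcher is doing, here is the idea reproduced directly with sentence-transformers. This illustrates cosine similarity over all-MiniLM-L6-v2 embeddings; it is not Flock's internal code path:

```python
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")  # same local model

query = "security vulnerability exploit"
tickets = [
    "SQL injection found in the login form",    # zero keyword overlap
    "I was charged twice for my subscription",  # billing, not security
]

query_emb = model.encode(query, convert_to_tensor=True)
ticket_embs = model.encode(tickets, convert_to_tensor=True)

# Cosine similarity: the injection ticket should score far higher
# against the security query than the billing ticket does.
for ticket, score in zip(tickets, util.cos_sim(query_emb, ticket_embs)[0]):
    print(f"{score.item():.2f}  {ticket}")
```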
Predicates - Smart Filtering:
# Only process critical cases
urgent_care = flock.agent("urgent").consumes(
Diagnosis,
where=lambda d: d.severity in ["Critical", "High"]
)

BatchSpec - Cost Optimization:
# Process 25 at once: 1 API call instead of 25 = 96% fewer calls!
payment_processor = flock.agent("payments").consumes(
Transaction,
batch=BatchSpec(size=25, timeout=timedelta(seconds=30))
)

JoinSpec - Data Correlation:
# Match orders + shipments by ID
customer_service = flock.agent("notifications").consumes(
Order,
Shipment,
join=JoinSpec(by=lambda x: x.order_id, within=timedelta(hours=24))
)

Combined - Production Pipelines:
# Correlate sensors, THEN batch for analysis
quality_control = flock.agent("qc").consumes(
TemperatureSensor,
PressureSensor,
join=JoinSpec(by=lambda x: x.device_id, within=timedelta(seconds=30)),
batch=BatchSpec(size=5, timeout=timedelta(seconds=45))
)

Produce multiple outputs from a single execution:
# Generate 10 diverse product ideas from one brief
idea_generator = (
flock.agent("generator")
.consumes(ProductBrief)
.publishes(ProductIdea, fan_out=10)
)
# With quality filtering
idea_generator = (
flock.agent("generator")
.consumes(ProductBrief)
.publishes(
ProductIdea,
fan_out=20, # Generate 20 candidates
where=lambda idea: idea.score >= 8.0 # Only publish score >= 8
)
)

Multi-Output Fan-Out (The Mind-Blowing Part):
# Generate 3 of EACH type = 9 total artifacts in ONE LLM call!
multi_master = (
flock.agent("multi_master")
.consumes(Idea)
.publishes(Movie, MovieScript, MovieCampaign, fan_out=3)
)
# Single execution produces:
# - 3 complete Movies (title, genre, cast, plot)
# - 3 complete MovieScripts (characters, scenes, pages)
# - 3 complete MovieCampaigns (taglines, posters)
# = 9 complex artifacts, 100+ fields, full validation, ONE LLM call!

Run agents on schedules, not just events:
from datetime import datetime, time, timedelta
# Periodic health checks (every 30 seconds)
health_monitor = (
flock.agent("health_monitor")
.schedule(every=timedelta(seconds=30))
.publishes(HealthStatus)
)
# Daily reports (5 PM every day)
daily_report = (
flock.agent("daily_report")
.schedule(at=time(hour=17, minute=0))
.publishes(DailyReport)
)
# Cron expressions (every weekday at 9 AM UTC)
workday_report = (
flock.agent("workday_report")
.schedule(cron="0 9 * * 1-5") # Mon-Fri at 9 AM
.publishes(WorkdayReport)
)
# One-time scheduled task
scheduled_task = (
flock.agent("scheduled_task")
.schedule(at=datetime(2025, 12, 25, 9, 0)) # Christmas 9 AM
.publishes(TaskResult)
)

Timer agents receive empty input with timer metadata:
async def health_check(ctx: AgentContext) -> HealthStatus:
# ctx.artifacts = [] # Empty for timer triggers
# ctx.trigger_type == "timer" # Know it's timer-triggered
# ctx.timer_iteration # How many times fired (0, 1, 2...)
# ctx.fire_time # When timer fired
# Access filtered blackboard context
recent_errors = ctx.get_artifacts(LogEntry) # Only ERROR logs
    return HealthStatus(healthy=len(recent_errors) == 0)

Why this is powerful:
- ✅ No event dependency - Agents run independently on time
- ✅ Context filtering - Combine `.schedule()` + `.consumes()` for filtered context (see the sketch below)
- ✅ Precise timing - Interval, daily, cron, or one-time execution
- ✅ Lifecycle control - Initial delays, repeat limits, graceful shutdown
- ✅ Production-ready - Timer state tracking, drift prevention, crash recovery
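The schedule-plus-consumes combination from the second bullet might look like this. LogEntry and HealthStatus are the types from the health-check example above; the exact chaining and the `level` field are assumptions based on the APIs shown elsewhere in this README:

```python
# Runs every 5 minutes; context is filtered to ERROR-level LogEntry
# artifacts rather than triggered by them (the timer stays the trigger).
log_summarizer = (
    flock.agent("log_summarizer")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry, where=lambda e: e.level == "ERROR")  # field name illustrative
    .publishes(HealthStatus)
)
```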
Timer Scheduling Guide →
Built-in security (not bolt-on):
# Multi-tenancy (SaaS isolation)
agent.publishes(CustomerData, visibility=TenantVisibility(tenant_id="customer_123"))
# Explicit allowlist (HIPAA compliance)
agent.publishes(MedicalRecord, visibility=PrivateVisibility(agents={"physician", "nurse"}))
# Role-based access control
agent.identity(AgentIdentity(name="analyst", labels={"clearance:secret"}))
agent.publishes(IntelReport, visibility=LabelledVisibility(required_labels={"clearance:secret"}))
# Time-delayed release
artifact.visibility = AfterVisibility(ttl=timedelta(hours=24), then=PublicVisibility())

Architecturally impossible to bypass: Every context provider inherits from BaseContextProvider, which enforces visibility filtering automatically. You literally cannot create a provider that forgets to check permissions.
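The "impossible to bypass" guarantee is the classic template-method pattern: the base class owns the public entry point and applies visibility filtering before any subclass logic sees the data. A simplified sketch of the shape (names and signatures are illustrative, not Flock's actual source):

```python
from abc import ABC, abstractmethod

class BaseContextProvider(ABC):
    async def get_context(self, store, agent):
        artifacts = await self._fetch(store, agent)
        # Filtering lives here, in the base class, so every provider
        # inherits it; subclasses never own the public entry point.
        return [a for a in artifacts if a.visibility.allows(agent)]

    @abstractmethod
    async def _fetch(self, store, agent):
        """Subclasses only decide WHICH artifacts to fetch."""

class RecentContextProvider(BaseContextProvider):
    async def _fetch(self, store, agent):
        # Hypothetical store call: fetch only the newest artifacts
        return await store.get_recent(limit=50)
```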
Control what agents see:
from flock.context_provider import FilteredContextProvider, PasswordRedactorProvider
# Global filtering - all agents see only urgent items
flock = Flock(
"openai/gpt-4.1",
context_provider=FilteredContextProvider(FilterConfig(tags={"urgent"}))
)
# Per-agent overrides
error_agent.context_provider = FilteredContextProvider(FilterConfig(tags={"ERROR"}))
# Production-ready password filtering
flock = Flock(
"openai/gpt-4.1",
context_provider=PasswordRedactorProvider() # Auto-redacts secrets!
)

Built-in providers (all visibility-filtered):
- `DefaultContextProvider` - Full blackboard access
- `CorrelatedContextProvider` - Workflow isolation
- `RecentContextProvider` - Token cost control
- `TimeWindowContextProvider` - Time-based filtering
- `SemanticContextProvider` - Similarity-based retrieval (New!)
- `EmptyContextProvider` - Stateless agents
- `FilteredContextProvider` - Custom filtering
Semantic Context Provider:
from flock.semantic import SemanticContextProvider
# Find similar historical incidents
provider = SemanticContextProvider(
query_text="database connection timeout",
threshold=0.4,
limit=5,
artifact_type=Incident,
where=lambda a: a.payload["resolved"] is True
)
similar = await provider.get_context(store)

Context Providers Guide →
Production durability with SQLite:
from flock.store import SQLiteBlackboardStore
store = SQLiteBlackboardStore(".flock/blackboard.db")
await store.ensure_schema()
flock = Flock("openai/gpt-4.1", store=store)

What you get:
- Long-lived artifacts with full history
- Historical APIs with pagination
- Dashboard integration with retention windows
- CLI tools for maintenance and retention policies
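The practical payoff is that artifacts outlive the process. A sketch of a restart scenario using only calls shown in this README (assuming the store exposes get_by_type directly, as flock.store does in the throughput example; Diagnosis is the type from the earlier example):

```python
import asyncio
from flock.store import SQLiteBlackboardStore

async def main():
    # Re-open the database file a previous run wrote to
    store = SQLiteBlackboardStore(".flock/blackboard.db")
    await store.ensure_schema()  # creates tables only if they don't exist yet

    # Artifacts published before the restart are still there
    diagnoses = await store.get_by_type(Diagnosis)
    print(f"{len(diagnoses)} diagnoses survived the restart")

asyncio.run(main())
```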
Batch-then-execute pattern:
# ✅ EFFICIENT: Batch publish, then run in parallel
for review in customer_reviews:
await flock.publish(review) # Just scheduling work
await flock.run_until_idle() # All sentiment_analyzer agents run concurrently!
# Get all results
analyses = await flock.store.get_by_type(SentimentAnalysis)
# 100 analyses in ~1x single review time!

Composable lifecycle hooks:
from flock.components import AgentComponent
class LoggingComponent(AgentComponent):
async def on_pre_evaluate(self, agent, ctx, inputs):
logger.info(f"Agent {agent.name} evaluating: {inputs}")
return inputs
async def on_post_evaluate(self, agent, ctx, inputs, result):
logger.info(f"Agent {agent.name} produced: {result}")
return result
analyzer.with_utilities(LoggingComponent())

Built-in components: Rate limiting, caching, metrics, budget tracking, circuit breakers, deduplication

Agent Components Guide →
Extend Flock's HTTP API with custom middleware, routes, and lifecycle management:
from flock.components.server import ServerComponent
class CustomAPIComponent(ServerComponent):
async def on_startup(self, orchestrator):
# Add custom routes, middleware, or startup logic
pass
async def on_shutdown(self, orchestrator):
# Cleanup resources
pass
# Register server component
flock.add_server_component(CustomAPIComponent())

Built-in server components:
- TimerComponent - Manages scheduled agent execution
- ControlRoutesComponent - Agent/artifact management API
- GraphRoutesComponent - Dashboard graph data API
- TraceComponent - OpenTelemetry trace viewer
- StaticFilesComponent - Dashboard UI serving
Why this matters:
- ✅ Modular architecture - Add features without modifying core
- ✅ Lifecycle hooks - Startup/shutdown coordination
- ✅ Custom endpoints - Extend API with domain-specific routes
- ✅ Middleware support - Authentication, logging, rate limiting
- ✅ Production-ready - Proper initialization order, error handling
Orchestrator Components Guide →
Built-in safeguards:
# Circuit breakers (auto-added)
flock = Flock("openai/gpt-4.1") # CircuitBreakerComponent(max_iterations=1000)
# Feedback loop protection
critic.prevent_self_trigger(True) # Won't trigger itself infinitely
# Best-of-N execution
agent.best_of(5, score=lambda result: result.metrics["confidence"])

Start with one line:
await flock.serve(dashboard=True)
Agent View: Real-time communication patterns
Features:
- Dual Modes: Agent view & Blackboard view
- Real-Time Updates: WebSocket streaming with live activation
- Interactive Graph: Drag, zoom, pan, 5 auto-layout algorithms
- Advanced Filtering: Correlation ID tracking, time ranges, autocomplete
- Control Panel: Publish artifacts, invoke agents from UI
- Keyboard Shortcuts: WCAG 2.1 AA compliant
Blackboard View: Data lineage and transformations
Jaeger-style tracing with 7 modes:
Timeline view with span hierarchies
7 Trace Modes:
- Timeline - Waterfall visualization
- Statistics - Sortable duration/error tracking
- RED Metrics - Rate, Errors, Duration monitoring
- Dependencies - Service communication analysis
- DuckDB SQL - Interactive query editor with CSV export
- Configuration - Real-time filtering
- Guide - Built-in documentation
One environment variable enables tracing:
export FLOCK_AUTO_TRACE=true
export FLOCK_TRACE_FILE=true
python your_app.py
# Traces stored in .flock/traces.duckdb

AI-queryable debugging:
import duckdb
conn = duckdb.connect('.flock/traces.duckdb', read_only=True)
# Find bottlenecks
slow_ops = conn.execute("""
SELECT name, AVG(duration_ms) as avg_ms, COUNT(*) as count
FROM spans
WHERE duration_ms > 1000
GROUP BY name
ORDER BY avg_ms DESC
""").fetchall()
# Find errors with full context
errors = conn.execute("""
SELECT name, status_description,
json_extract(attributes, '$.input') as input,
json_extract(attributes, '$.output') as output
FROM spans
WHERE status_code = 'ERROR'
""").fetchall()Real debugging:
You: "My pizza agent is slow"
AI: [queries DuckDB]
"DSPyEngine.evaluate takes 23s on average.
Input size: 50KB of conversation history.
Recommendation: Limit context to last 5 messages."
Production-ready HTTP endpoints:
await flock.serve(dashboard=True) # API + Dashboard on port 8344
# API docs: http://localhost:8344/docs

Key endpoints:
- `POST /api/v1/artifacts` - Publish to blackboard
- `GET /api/v1/artifacts` - Query with filtering/pagination
- `POST /api/v1/agents/{name}/run` - Direct agent invocation
- `GET /api/v1/correlations/{id}/status` - Workflow tracking
- `GET /health` and `GET /metrics` - Monitoring
Features:
- ✅ OpenAPI 3.0 documentation at /docs
- ✅ Pydantic validation
- ✅ Correlation tracking
- ✅ Consumption metadata
- ✅ Prometheus-compatible metrics
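For a quick smoke test against a running flock.serve() instance, something like the following works from any HTTP client. The JSON body and query parameters here are assumptions for illustration; check the generated /docs for the real schema:

```python
import requests

BASE = "http://localhost:8344"

# Publish an artifact to the blackboard (body shape is illustrative)
resp = requests.post(
    f"{BASE}/api/v1/artifacts",
    json={
        "type": "CodeSubmission",
        "payload": {"code": "print('hi')", "language": "python"},
    },
)
resp.raise_for_status()

# Query artifacts back (filter parameters are also illustrative)
print(requests.get(f"{BASE}/api/v1/artifacts", params={"type": "CodeSubmission"}).json())
```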
| Dimension | Graph-Based | Chat-Based | Flock (Blackboard) |
|---|---|---|---|
| Pattern | Directed graph | Round-robin chat | Blackboard subscriptions |
| Coordination | Manual edges | Message passing | Type subscriptions |
| Parallelism | Manual split/join | Sequential | Automatic |
| Type Safety | Varies | Text messages | Pydantic + validation |
| Coupling | Tight | Medium | Loose |
| Adding Agents | Rewrite graph | Update flow | Just subscribe |
| Testing | Full graph | Full group | Individual isolation |
| Security | DIY | DIY | Built-in (5 types) |
| Scalability | O(n²) | Limited | O(n) |
✅ Use Flock when you need:
- Parallel agent execution (automatic)
- Type-safe outputs (Pydantic validation)
- Minimal prompt engineering (schemas define behavior)
- Dynamic agent addition (no rewiring)
- Testing in isolation (unit test individual agents)
- Built-in security (HIPAA, SOC2, multi-tenancy)
- 10+ agents (linear complexity)
- Semantic routing (meaning-based matching)
Consider alternatives when:
- Extensive ecosystem integration is needed
- Workflow is inherently sequential
- Battle-tested maturity required
- Team has existing expertise
- Conversation-based development preferred
- Turn-taking dialogue use case
- Specific ecosystem features needed
You trade:
- Ecosystem maturity (smaller community)
- Extensive documentation (catching up)
- Battle-tested age (newer architecture)
You gain:
- Better scalability (O(n) vs O(n²))
- Type safety (validation vs hope)
- Cleaner architecture (loose coupling)
- Production safety (built-in circuit breakers)
- Security model (5 visibility types)
- Semantic intelligence (meaning-based routing)
Different frameworks for different priorities. Choose based on what matters to your team.
✅ Production-ready core:
- 1300+ tests with >75% coverage (>90% on critical paths)
- Blackboard orchestrator with typed artifacts
- Parallel + sequential execution (automatic)
- Zero-trust security (5 visibility types)
- Semantic subscriptions with local embeddings
- Timer-based agent scheduling (interval, daily, cron, one-time)
- Server components for extensible HTTP API
- Circuit breakers and feedback prevention
- OpenTelemetry + DuckDB tracing
- Real-time dashboard with 7-mode trace viewer
- MCP integration (Model Context Protocol)
- Best-of-N, batching, joins, fan-out
- Type-safe retrieval API
- SQLite persistent store
Not yet:
- Advanced retry logic (basic only)
- Event replay (no Kafka yet)
- Kubernetes-native deployment (no Helm)
- OAuth/RBAC (dashboard has no auth)
All missing features planned for v1.0 (Q4 2025)
✅ Good fit right now:
- Startups/MVPs (fast iteration, type safety)
- Internal tools (in-memory acceptable)
- Research/prototyping (clean architecture)
- Medium-scale systems (10-50 agents, 1000s of artifacts)
Wait for v1.0 if you need:
- Enterprise persistence (multi-region, HA)
- Compliance auditing (immutable logs)
- Multi-tenancy SaaS (OAuth/SSO)
- Mission-critical 99.99% uptime
Flock 0.5.0 is production-ready for the right use cases. Know your requirements.
# Install
pip install flock-core
# With semantic features
pip install flock-core[semantic]
# Set API key
export OPENAI_API_KEY="sk-..."
# Try examples
git clone https://github.com/whiteducksoftware/flock-flow.git
cd flock-flow
# CLI examples
uv run python examples/01-cli/01_declarative_pizza.py
# Dashboard examples
uv run python examples/02-dashboard/01_declarative_pizza.py
# Semantic routing
uv run python examples/08-semantic/01_intelligent_ticket_routing.py

Learn by doing:
- Examples README - Complete learning path
- CLI Examples - Console output (01-12)
- Dashboard Examples - Interactive visualization (01-12)
- Semantic Examples - Meaning-based routing
- Documentation - Full docs
Challenge: Analyze signals in parallel, correlate within time windows, maintain audit trails.
# Parallel signal analyzers
volatility = flock.agent("volatility").consumes(MarketData).publishes(VolatilityAlert)
sentiment = flock.agent("sentiment").consumes(NewsArticle).publishes(SentimentAlert)
# Trade execution waits for CORRELATED signals
trader = flock.agent("trader").consumes(
VolatilityAlert, SentimentAlert,
join=JoinSpec(within=timedelta(minutes=5))
).publishes(TradeOrder)

Challenge: Multi-modal fusion with access controls, audit trails, zero-trust.
# Privacy controls built-in
radiology.publishes(XRayAnalysis, visibility=PrivateVisibility(agents={"diagnostician"}))
lab.publishes(LabResults, visibility=TenantVisibility(tenant_id="patient_123"))
# Diagnostician waits for BOTH with role-based access
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)

Challenge: Route support tickets to specialized teams based on meaning.
# Semantic routing (NO keyword matching!)
security_team.consumes(Ticket, semantic_match="security vulnerability exploit")
billing_team.consumes(Ticket, semantic_match="payment charge refund billing")
tech_support.consumes(Ticket, semantic_match="technical issue error bug")
# "SQL injection" β Security (no "security" keyword needed!)
# "charged twice" β Billing (semantic match!)
# "app crashes" β Tech Support (semantic understanding!)π Full Use Cases β
We're building Flock in the open. See Contributing Guide.
Before contributing:
- Architecture Overview - Codebase organization
- Error Handling - Required patterns
- Async Patterns - Standards
Quality standards:
- All tests must pass
- Coverage requirements met
- Code formatted with Ruff
Target: Q4 2025
See ROADMAP.md for detailed status and tracking.
Key initiatives:
- Reliability: Advanced retry, error recovery, distributed tracing
- Persistence: Multi-region stores, event replay, Kafka integration
- Security: OAuth/RBAC, audit logging, compliance tooling
- Operations: Kubernetes deployment, Helm charts, monitoring
- Quality: Performance benchmarks, stress testing, migration tools
Flock makes different architectural choices:
Instead of:
- ❌ Prompt engineering → ✅ Declarative type contracts
- ❌ Workflow graphs → ✅ Blackboard subscriptions
- ❌ Keyword matching → ✅ Semantic intelligence
- ❌ Manual parallelization → ✅ Automatic concurrent execution
- ❌ Bolt-on security → ✅ Zero-trust visibility controls
- ❌ Hope-based debugging → ✅ AI-queryable distributed traces
These are architectural decisions with real tradeoffs.
Different frameworks for different priorities. Choose based on what matters to your team.
Built with ❤️ by white duck GmbH
"Declarative contracts eliminate prompt hell. Blackboard architecture eliminates graph spaghetti. Semantic intelligence eliminates keyword brittleness. Proven patterns applied to modern LLMs."
Star on GitHub | Documentation | Try Examples | Enterprise Support
Last Updated: October 19, 2025 | Version: Flock 0.5.0 (Blackboard Edition) | Status: Production-Ready Core, Enterprise Features Roadmapped





