547 changes: 478 additions & 69 deletions examples/cli/ensemble_cli.py

Large diffs are not rendered by default.

52 changes: 45 additions & 7 deletions multimind/core/router.py
@@ -115,19 +115,28 @@ async def route(
**kwargs
) -> Union[GenerationResult, EmbeddingResult, ImageAnalysisResult]:
"""Route a request to the appropriate provider(s)."""
if task_type not in self.task_configs:
provider_override = kwargs.get("provider")
if not provider_override and task_type not in self.task_configs:
raise ValueError(f"No configuration found for task type: {task_type}")

config = self.task_configs[task_type]
start_time = time.time()

try:
if config.routing_strategy == RoutingStrategy.ENSEMBLE:
result = await self._handle_ensemble(task_type, input_data, config, **kwargs)
elif config.routing_strategy == RoutingStrategy.CASCADE:
result = await self._handle_cascade(task_type, input_data, config, **kwargs)
if provider_override:
result = await self._route_specific_provider(
provider_override,
task_type,
input_data,
**kwargs
)
else:
result = await self._handle_single_provider(task_type, input_data, config, **kwargs)
config = self.task_configs[task_type]
if config.routing_strategy == RoutingStrategy.ENSEMBLE:
result = await self._handle_ensemble(task_type, input_data, config, **kwargs)
elif config.routing_strategy == RoutingStrategy.CASCADE:
result = await self._handle_cascade(task_type, input_data, config, **kwargs)
else:
result = await self._handle_single_provider(task_type, input_data, config, **kwargs)

# Record successful request metrics
latency_ms = (time.time() - start_time) * 1000
@@ -179,6 +188,35 @@ async def route(
)
raise

async def _route_specific_provider(
self,
provider_name: str,
task_type: TaskType,
input_data: Any,
**kwargs
) -> Union[GenerationResult, EmbeddingResult, ImageAnalysisResult]:
"""
Route directly to a specific provider when explicitly requested.
This bypasses task configuration while still leveraging the same execution pipeline.
"""
if provider_name not in self.providers:
raise ValueError(f"Provider '{provider_name}' is not registered with the router")

single_provider_config = TaskConfig(
preferred_providers=[provider_name],
fallback_providers=[],
routing_strategy=RoutingStrategy.COST_BASED
)
call_kwargs = dict(kwargs)
call_kwargs.pop("provider", None)
return await self._handle_single_provider(
task_type,
input_data,
single_provider_config,
use_adaptive_routing=False,
**call_kwargs
)

async def _handle_single_provider(
self,
task_type: TaskType,
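The provider override above lets a caller skip the task-configuration lookup entirely and target one registered provider. A minimal usage sketch, assuming an already-constructed router with a provider registered under the name "ollama"; the class name, import paths, and registration step are assumptions, only the provider/model keyword arguments come from this diff:

import asyncio
from multimind.core.router import Router, TaskType  # assumed import locations

async def main():
    router = Router()  # assumed constructor; provider registration not shown in this diff,
    # the only requirement route() checks is that "ollama" is a key in router.providers.

    # Explicit override: route() sees provider="ollama", skips task_configs,
    # and delegates to _route_specific_provider with a one-provider config.
    result = await router.route(
        TaskType.TEXT_GENERATION,
        "Summarize the latest release notes.",
        provider="ollama",
        model="mistral",
    )
    print(result)

asyncio.run(main())
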
128 changes: 100 additions & 28 deletions multimind/ensemble/advanced.py
@@ -127,23 +127,24 @@ async def _weighted_voting(
) -> EnsembleResult:
"""Combine results using weighted voting (adaptive if enabled)."""
if use_adaptive_weights or not weights:
providers = [result.provider for result in results]
providers = [self._get_provider_name(result) for result in results]
weights = self.performance_tracker.get_all_weights(providers)
# Normalize weights
total_weight = sum(weights.values())
normalized_weights = {k: v/total_weight for k, v in weights.items()}
# Calculate weighted scores for each result
weighted_scores = []
for result in results:
weight = normalized_weights.get(result.provider, 0.0)
provider_name = self._get_provider_name(result)
weight = normalized_weights.get(provider_name, 0.0)
weighted_scores.append((result, weight))
# Select result with highest weight
best_result, best_weight = max(weighted_scores, key=lambda x: x[1])
return EnsembleResult(
result=best_result,
confidence=ConfidenceScore(
score=best_weight,
explanation=f"Selected result from {best_result.provider} with adaptive weight {best_weight:.2f}"
explanation=f"Selected result from {self._get_provider_name(best_result)} with adaptive weight {best_weight:.2f}"
),
provider_votes=normalized_weights
)
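
Stripped of the tracker plumbing, the adaptive weighted vote is a normalization followed by an argmax over provider weights. A standalone sketch of just that selection step, with made-up weights standing in for the performance tracker's output:

raw_weights = {"openai": 0.9, "claude": 0.7, "ollama": 0.4}  # made-up tracker output

total = sum(raw_weights.values())
normalized = {name: w / total for name, w in raw_weights.items()}
# -> {"openai": 0.45, "claude": 0.35, "ollama": 0.2}

best_provider = max(normalized, key=normalized.get)  # provider with the top weight wins
print(best_provider, normalized[best_provider])      # openai 0.45
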
@@ -171,15 +172,15 @@ async def _confidence_cascade(
return EnsembleResult(
result=result,
confidence=confidence,
provider_votes={r.provider: c.score for r, c in confidence_scores}
provider_votes={self._get_provider_name(r): c.score for r, c in confidence_scores}
)

# If no result meets threshold, return highest confidence
best_result, best_confidence = confidence_scores[0]
return EnsembleResult(
result=best_result,
confidence=best_confidence,
provider_votes={r.provider: c.score for r, c in confidence_scores}
provider_votes={self._get_provider_name(r): c.score for r, c in confidence_scores}
)

async def _parallel_voting(
@@ -203,7 +204,7 @@ async def _parallel_voting(

# Normalize scores
total_score = sum(score for _, score in scores)
normalized_scores = {r.provider: s/total_score for r, s in scores}
normalized_scores = {self._get_provider_name(r): s/total_score for r, s in scores}

# Select best result
best_result, best_score = max(scores, key=lambda x: x[1])
@@ -212,7 +213,7 @@ async def _parallel_voting(
result=best_result,
confidence=ConfidenceScore(
score=best_score,
explanation=f"Selected result from {best_result.provider} with LLM evaluation score {best_score:.2f}"
explanation=f"Selected result from {self._get_provider_name(best_result)} with LLM evaluation score {best_score:.2f}"
),
provider_votes=normalized_scores
)
@@ -234,7 +235,7 @@ async def _majority_voting(
# Fallback to string equality if no embedder available
embedder = None

texts = [str(r.result) for r in results]
texts = [self._extract_result_content(r) for r in results]
if embedder is not None:
embeddings = embedder.encode(texts, convert_to_tensor=True)
import torch
@@ -264,19 +265,19 @@ def get_score(r):
vote_count = len(largest_group)
total_votes = len(results)
explanation = f"Selected result by semantic majority voting: {vote_count}/{total_votes} semantically similar."
provider_votes = {r.provider: 1.0 if idx in largest_group else 0.0 for idx, r in enumerate(results)}
provider_votes = {self._get_provider_name(r): 1.0 if idx in largest_group else 0.0 for idx, r in enumerate(results)}
else:
# Fallback: string equality
result_counts = {}
for result in results:
key = str(result.result)
key = self._extract_result_content(result)
if key not in result_counts:
result_counts[key] = (result, 0)
result_counts[key] = (result, result_counts[key][1] + 1)
best_result, vote_count = max(result_counts.values(), key=lambda x: x[1])
total_votes = len(results)
explanation = f"Selected result with {vote_count}/{total_votes} votes (string equality fallback)"
provider_votes = {r.provider: 1.0 for r in results}
provider_votes = {self._get_provider_name(r): 1.0 for r in results}
return EnsembleResult(
result=best_result,
confidence=ConfidenceScore(
@@ -303,15 +304,15 @@ async def _rank_based(
borda_scores = {}
for result, ranking in zip(results, rankings):
score = self._calculate_borda_score(ranking, len(results))
borda_scores[result.provider] = score
borda_scores[self._get_provider_name(result)] = score

# Normalize scores
total_score = sum(borda_scores.values())
normalized_scores = {k: v/total_score for k, v in borda_scores.items()}

# Select result with highest Borda score
best_provider = max(borda_scores.items(), key=lambda x: x[1])[0]
best_result = next(r for r in results if r.provider == best_provider)
best_result = next(r for r in results if self._get_provider_name(r) == best_provider)

return EnsembleResult(
result=best_result,
@@ -322,16 +323,40 @@ async def _rank_based(
provider_votes=normalized_scores
)

def _extract_result_content(self, result: Union[GenerationResult, EmbeddingResult, ImageAnalysisResult]) -> str:
"""Extract text content from any result type for evaluation."""
if isinstance(result, GenerationResult):
return result.text
elif isinstance(result, EmbeddingResult):
return f"Embedding vector of length {len(result.embedding)}"
elif isinstance(result, ImageAnalysisResult):
# Combine text, captions, and objects for evaluation
parts = []
if result.text:
parts.append(f"Text: {result.text}")
if result.captions:
parts.append(f"Captions: {', '.join(result.captions)}")
if result.objects:
parts.append(f"Objects: {len(result.objects)} detected")
return " | ".join(parts) if parts else "No content extracted"
else:
return str(result)

def _get_provider_name(self, result: Union[GenerationResult, EmbeddingResult, ImageAnalysisResult]) -> str:
"""Extract provider name from any result type."""
return getattr(result, 'provider', None) or getattr(result, 'provider_name', 'unknown')
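
These two helpers are what let the ensemble strategies treat heterogeneous result objects uniformly: some result types expose provider, others expose provider_name (the claude.py change below uses the latter). A small illustration of the fallback order, using throwaway stand-in classes rather than the real result types:

class OldStyleResult:          # stand-in for a result type that sets .provider
    provider = "openai"

class NewStyleResult:          # stand-in for a result type that sets .provider_name
    provider_name = "claude"

class BareResult:              # stand-in with neither attribute present
    pass

def get_provider_name(result):
    # Same fallback order as _get_provider_name above.
    return getattr(result, "provider", None) or getattr(result, "provider_name", "unknown")

print(get_provider_name(OldStyleResult()))   # openai
print(get_provider_name(NewStyleResult()))   # claude
print(get_provider_name(BareResult()))       # unknown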

async def _evaluate_confidence(
self,
result: Union[GenerationResult, EmbeddingResult, ImageAnalysisResult],
task_type: TaskType,
**kwargs
) -> ConfidenceScore:
"""Evaluate confidence in a result using LLM."""
content = self._extract_result_content(result)
prompt = f"""
Evaluate the confidence in this {task_type} result:
{result.result}
{content}

Consider:
1. Completeness of the response
@@ -342,21 +367,33 @@ async def _evaluate_confidence(
Provide a confidence score (0.0 to 1.0) and explanation.
"""

provider_name = self._get_provider_name(result)
evaluation_models = kwargs.get("evaluation_models", {})
evaluation_providers = kwargs.get("evaluation_providers", {})
default_model = kwargs.get("evaluation_model", "gpt-4")
eval_model = evaluation_models.get(provider_name, default_model)
eval_provider = evaluation_providers.get(provider_name, kwargs.get("evaluation_provider", provider_name))
route_kwargs = {
k: v for k, v in kwargs.items()
if k not in {"evaluation_models", "evaluation_providers", "evaluation_model", "evaluation_provider"}
}
evaluation = await self.router.route(
TaskType.TEXT_GENERATION,
prompt,
model="gpt-4",
**kwargs
provider=eval_provider,
model=eval_model,
**route_kwargs
)

# Parse confidence score from evaluation
score = self._parse_confidence_score(evaluation.result)
explanation = self._parse_confidence_explanation(evaluation.result)
eval_content = self._extract_result_content(evaluation)
score = self._parse_confidence_score(eval_content)
explanation = self._parse_confidence_explanation(eval_content)

return ConfidenceScore(
score=score,
explanation=explanation,
metadata={"raw_evaluation": evaluation.result}
metadata={"raw_evaluation": eval_content}
)
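
The judge for each result is now resolved per provider: an entry in evaluation_models / evaluation_providers wins, then the evaluation_model / evaluation_provider defaults, and finally the provider that produced the result itself. A sketch of the keyword arguments a caller might pass; the ensemble object and its combine method name are assumptions, but the evaluation_* keys are exactly the ones consumed above:

from multimind.core.router import TaskType  # assumed import location

async def judge_results(ensemble, results):
    # Hypothetical wiring; only the evaluation_* kwargs are grounded in this diff.
    return await ensemble.combine(
        results,
        task_type=TaskType.TEXT_GENERATION,
        evaluation_provider="openai",               # default judge provider
        evaluation_model="gpt-4",                   # default judge model
        evaluation_models={"ollama": "mistral"},    # per-producer model override
        evaluation_providers={"ollama": "ollama"},  # per-producer judge override
    )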

async def _evaluate_with_llm(
@@ -366,9 +403,10 @@ async def _evaluate_with_llm(
**kwargs
) -> str:
"""Evaluate a result using LLM."""
content = self._extract_result_content(result)
prompt = f"""
Evaluate this {task_type} result:
{result.result}
{content}

Consider:
1. Accuracy and correctness
@@ -379,14 +417,25 @@ async def _evaluate_with_llm(
Provide a detailed evaluation with a numerical score (0-100).
"""

provider_name = self._get_provider_name(result)
evaluation_models = kwargs.get("evaluation_models", {})
evaluation_providers = kwargs.get("evaluation_providers", {})
default_model = kwargs.get("evaluation_model", "gpt-4")
eval_model = evaluation_models.get(provider_name, default_model)
eval_provider = evaluation_providers.get(provider_name, kwargs.get("evaluation_provider", provider_name))
route_kwargs = {
k: v for k, v in kwargs.items()
if k not in {"evaluation_models", "evaluation_providers", "evaluation_model", "evaluation_provider"}
}
evaluation = await self.router.route(
TaskType.TEXT_GENERATION,
prompt,
model="gpt-4",
**kwargs
provider=eval_provider,
model=eval_model,
**route_kwargs
)

return evaluation.result
return self._extract_result_content(evaluation)

async def _get_provider_ranking(
self,
@@ -395,9 +444,10 @@ async def _get_provider_ranking(
**kwargs
) -> List[str]:
"""Get ranking of results from a provider."""
content = self._extract_result_content(result)
prompt = f"""
Rank the following {task_type} results from best to worst:
{result.result}
{content}

Consider:
1. Quality and accuracy
@@ -408,14 +458,36 @@ async def _get_provider_ranking(
Provide a ranked list of provider names.
"""

provider_name = self._get_provider_name(result)
evaluation_models = kwargs.get("evaluation_models", {})
evaluation_providers = kwargs.get("evaluation_providers", {})
default_model = kwargs.get("evaluation_model", "gpt-4")

# Use provider-specific model if available, otherwise use smart defaults
eval_model = evaluation_models.get(provider_name)
if not eval_model:
# Use provider-appropriate default models
if provider_name == "ollama":
eval_model = "mistral" # Ollama doesn't have gpt-4
elif provider_name == "anthropic" or provider_name == "claude":
eval_model = "claude-3-sonnet"
else:
eval_model = default_model # Use gpt-4 for OpenAI and others

eval_provider = evaluation_providers.get(provider_name, kwargs.get("evaluation_provider", provider_name))
route_kwargs = {
k: v for k, v in kwargs.items()
if k not in {"evaluation_models", "evaluation_providers", "evaluation_model", "evaluation_provider"}
}
ranking = await self.router.route(
TaskType.TEXT_GENERATION,
prompt,
model="gpt-4",
**kwargs
provider=eval_provider,
model=eval_model,
**route_kwargs
)

return self._parse_ranking(ranking.result)
return self._parse_ranking(self._extract_result_content(ranking))

def _parse_confidence_score(self, evaluation: str) -> float:
"""Parse confidence score from evaluation text."""
@@ -519,7 +591,7 @@ def tune_weights_with_optuna(self, results, task_type, eval_fn, n_trials=30):
if not OPTUNA_AVAILABLE:
raise ImportError("Optuna is required for hyperparameter tuning. Please install optuna.")

providers = [r.provider for r in results]
providers = [self._get_provider_name(r) for r in results]
def objective(trial):
weights = {p: trial.suggest_float(f"weight_{p}", 0.01, 1.0) for p in providers}
# Normalize
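The Optuna objective shown above samples one weight per provider, normalizes the weights, and lets a caller-supplied eval_fn score the combination. A compact, self-contained sketch of that pattern; the eval_fn here is a dummy stand-in, not the PR's evaluation logic:

import optuna  # requires: pip install optuna

providers = ["openai", "claude", "ollama"]

def eval_fn(weights):
    # Dummy objective: pretend that favouring "openai" scores best.
    return weights["openai"]

def objective(trial):
    weights = {p: trial.suggest_float(f"weight_{p}", 0.01, 1.0) for p in providers}
    total = sum(weights.values())
    weights = {p: w / total for p, w in weights.items()}  # normalize to sum to 1
    return eval_fn(weights)

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=30)
print(study.best_params)  # raw (un-normalized) best weights per provider
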
4 changes: 3 additions & 1 deletion multimind/providers/__init__.py
@@ -6,8 +6,10 @@

from .claude import ClaudeProvider
from .openai import OpenAIProvider
from .ollama import OllamaProvider

__all__ = [
"ClaudeProvider",
"OpenAIProvider"
"OpenAIProvider",
"OllamaProvider"
]
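
With OllamaProvider now exported from multimind.providers, it can be wired up next to the existing providers. A registration sketch; the constructor arguments and the exact registration mechanism are assumptions (neither appears in this diff), only the import and the requirement that the name match a key in the router's providers mapping are grounded here:

from multimind.providers import ClaudeProvider, OpenAIProvider, OllamaProvider

# Constructor arguments below are assumed for illustration only.
providers = {
    "claude": ClaudeProvider(api_key="..."),
    "openai": OpenAIProvider(api_key="..."),
    "ollama": OllamaProvider(base_url="http://localhost:11434"),
}
# However registration is done, these keys are the names that route()
# accepts as the provider= override and checks against self.providers.
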
4 changes: 2 additions & 2 deletions multimind/providers/claude.py
@@ -97,10 +97,10 @@ async def chat(
) / 1000 # Convert to USD

return GenerationResult(
text=result,
tokens_used=tokens_used,
provider_name="claude",
model_name=model,
result=result,
tokens_used=tokens_used,
latency_ms=latency_ms,
cost_estimate_usd=cost
)