Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ouroboros/auto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
before starting execution.
"""

from ouroboros.auto.answerer import AutoAnswer, AutoAnswerer, AutoAnswerSource
from ouroboros.auto.answerer import AutoAnswer, AutoAnswerer, AutoAnswerMetadata, AutoAnswerSource
from ouroboros.auto.grading import GradeGate, GradeResult, SeedGrade
from ouroboros.auto.interview_driver import AutoInterviewDriver, AutoInterviewResult, InterviewTurn
from ouroboros.auto.ledger import LedgerEntry, LedgerSection, SeedDraftLedger
Expand All @@ -17,6 +17,7 @@

__all__ = [
"AutoAnswer",
"AutoAnswerMetadata",
"AutoAnswerSource",
"AutoAnswerer",
"AutoInterviewDriver",
Expand Down
10 changes: 10 additions & 0 deletions src/ouroboros/auto/answerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ class AutoBlocker:
question: str


@dataclass(frozen=True, slots=True)
class AutoAnswerMetadata:
"""Structured provenance for auto answers that need audit context."""

risk: str | None = None
confidence: float | None = None
provenance: tuple[str, ...] = ()


@dataclass(frozen=True, slots=True)
class AutoAnswer:
"""Answer plus structured ledger updates."""
Expand All @@ -76,6 +85,7 @@ class AutoAnswer:
assumptions: list[str] = field(default_factory=list)
non_goals: list[str] = field(default_factory=list)
blocker: AutoBlocker | None = None
metadata: AutoAnswerMetadata = field(default_factory=AutoAnswerMetadata)

@property
def prefixed_text(self) -> str:
Expand Down
149 changes: 144 additions & 5 deletions src/ouroboros/auto/driver_answerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AutoAnswer,
AutoAnswerContext,
AutoAnswerer,
AutoAnswerMetadata,
AutoAnswerSource,
AutoBlocker,
)
Expand Down Expand Up @@ -66,6 +67,13 @@ async def answer(
source=AutoAnswerSource.BLOCKER,
confidence=1.0,
blocker=AutoBlocker(reason=reason, question=question),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=risk,
confidence=1.0,
scaffold=scaffold,
),
)

if self.adapter is None:
Expand Down Expand Up @@ -101,6 +109,13 @@ async def answer(
reason=f"selected driver {self.backend} failed to answer: {result.error}",
question=question,
),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk="driver answer unavailable",
confidence=1.0,
scaffold=scaffold,
),
)
text = _clean_driver_text(result.value.content)
if not text:
Expand All @@ -112,15 +127,40 @@ async def answer(
reason=f"selected driver {self.backend} returned an empty answer",
question=question,
),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk="empty driver answer",
confidence=1.0,
scaffold=scaffold,
),
)
answer_risk = classify_driver_answer_text_risk(text)
if answer_risk and self.brake == AutoBrakeMode.ON:
reason = f"brake on: risky selected-driver response requires approval ({answer_risk})"
return AutoAnswer(
text=f"Cannot send selected-driver answer automatically without approval: {answer_risk}",
source=AutoAnswerSource.BLOCKER,
confidence=1.0,
blocker=AutoBlocker(reason=reason, question=question),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=_combined_risk(risk, answer_risk),
confidence=1.0,
scaffold=scaffold,
answer_risk=answer_risk,
),
)

assumptions = list(scaffold.assumptions)
confidence = min(scaffold.confidence, 0.82)
if risk:
assumptions.append(f"brake off auto-sent risky driver answer: {risk}")
final_risk = _combined_risk(risk, answer_risk)
if final_risk:
assumptions.append(f"brake off auto-sent risky driver answer: {final_risk}")
confidence = min(confidence, 0.62)
tagged_text = _tag_driver_text(
text, backend=self.backend or "driver", brake=self.brake, risk=risk
text, backend=self.backend or "driver", brake=self.brake, risk=final_risk
)
return AutoAnswer(
text=tagged_text,
Expand All @@ -129,11 +169,20 @@ async def answer(
ledger_updates=_ledger_updates_for(
scaffold,
driver_text=tagged_text,
risk=risk,
risk=final_risk,
backend=self.backend or "driver",
answer_risk=answer_risk,
),
assumptions=assumptions,
non_goals=list(scaffold.non_goals),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=final_risk,
confidence=confidence,
scaffold=scaffold,
answer_risk=answer_risk,
),
)

def apply(self, answer: AutoAnswer, ledger: SeedDraftLedger, *, question: str) -> None:
Expand Down Expand Up @@ -177,6 +226,62 @@ def classify_interview_answer_risk(question: str, scaffold: AutoAnswer | None =
return None


def classify_driver_answer_text_risk(text: str) -> str | None:
    """Inspect selected-driver output and label it when it looks risky.

    Two labels can be produced. The credential label is returned when the
    text matches a known token shape, a ``name = value`` / ``name: value``
    credential assignment, an upper-case environment-style assignment, a URL
    with an embedded ``user:password@`` part, a long ``Bearer`` token, or
    three dot-separated segments starting with ``eyJ``. The destructive label
    is returned when the text names both a destructive verb and a
    production-like target, or contains ``rm -rf /`` aimed at the root.

    Args:
        text: Raw answer text produced by the selected driver.

    Returns:
        The risk label, or ``None`` when no pattern matches.
    """
    secret_label = "actual answer contains secret or credential"
    destructive_label = "actual answer recommends destructive production action"

    # (pattern, flags) pairs; only the generic assignment form is matched
    # case-insensitively, the remaining shapes are case-sensitive.
    credential_shapes = (
        (
            r"\b(password|passphrase|api[_ -]?key|access[_ -]?token|token|secret|credential)s?\b"
            r"\s*(=|:)\s*['\"]?[^\s'\"<>]+",
            re.IGNORECASE,
        ),
        (
            r"\b[A-Z][A-Z0-9_]*(?:API[_]?KEY|ACCESS[_]?KEY|SECRET|TOKEN|PASSWORD|CREDENTIAL)\b"
            r"\s*=\s*['\"]?[^\s'\"<>]+",
            0,
        ),
        (r"\b[A-Za-z][A-Za-z0-9+.-]*://[^\s/@:]+:[^\s/@]+@[^\s]+", 0),
        (r"\bBearer\s+[A-Za-z0-9._~+/-]{20,}={0,2}\b", 0),
        (r"\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b", 0),
    )

    if _contains_real_secret(text):
        return secret_label
    for shape, shape_flags in credential_shapes:
        if re.search(shape, text, flags=shape_flags):
            return secret_label

    # The verb/target pair is matched against the lower-cased text, so the
    # patterns themselves stay lower-case.
    verb_and_target = (
        r"\b(delete|destroy|drop|truncate|wipe|erase|purge|deprovision|terminate)\b",
        r"\b(production|prod|live|billing|database|db|credentials?)\b",
    )
    folded = text.lower()
    names_verb_and_target = all(re.search(part, folded) for part in verb_and_target)
    if names_verb_and_target or re.search(r"\brm\s+-rf\s+/(?:\s|$)", text):
        return destructive_label
    return None


def _contains_real_secret(text: str) -> bool:
secret_patterns = (
r"\bAKIA[0-9A-Z]{16}\b",
r"\bASIA[0-9A-Z]{16}\b",
r"\bgh[pousr]_[A-Za-z0-9_]{30,}\b",
r"\bgithub_pat_[A-Za-z0-9_]{40,}\b",
r"\bsk-[A-Za-z0-9_-]{20,}\b",
r"\bxox[baprs]-[A-Za-z0-9-]{20,}\b",
)
return any(re.search(pattern, text) for pattern in secret_patterns)


def _combined_risk(pre_response_risk: str | None, answer_risk: str | None) -> str | None:
if pre_response_risk and answer_risk:
return f"{pre_response_risk}; {answer_risk}"
return pre_response_risk or answer_risk


def _driver_prompt(
question: str,
ledger: SeedDraftLedger,
Expand Down Expand Up @@ -229,8 +334,37 @@ def _tag_driver_text(text: str, *, backend: str, brake: AutoBrakeMode, risk: str
return f"[{' ; '.join(tags)}] {text}"


def _answer_metadata(
    *,
    backend: str,
    brake: AutoBrakeMode,
    risk: str | None,
    confidence: float,
    scaffold: AutoAnswer,
    answer_risk: str | None = None,
) -> AutoAnswerMetadata:
    """Assemble the audit metadata attached to a selected-driver answer.

    Args:
        backend: Driver name recorded as the ``driver:`` provenance tag.
        brake: Brake mode; its ``value`` becomes the ``brake:`` tag.
        risk: Risk label stored on the metadata unchanged.
        confidence: Confidence to record; coerced to float and clamped to
            the range 0.0 to 1.0.
        scaffold: Scaffold answer; its ``source.value`` becomes the
            ``scaffold_source:`` tag.
        answer_risk: Optional answer-text risk; when non-empty it is appended
            as a trailing ``answer_risk:`` tag.

    Returns:
        An ``AutoAnswerMetadata`` holding the risk, the clamped confidence and
        the provenance tags as a tuple.
    """
    trail: tuple[str, ...] = (
        f"driver:{backend}",
        f"brake:{brake.value}",
        f"scaffold_source:{scaffold.source.value}",
    )
    if answer_risk:
        trail += (f"answer_risk:{answer_risk}",)

    # Clamp in two steps: cap at 1.0 first, then floor at 0.0.
    bounded = min(1.0, float(confidence))
    bounded = max(0.0, bounded)
    return AutoAnswerMetadata(risk=risk, confidence=bounded, provenance=trail)


def _ledger_updates_for(
scaffold: AutoAnswer, *, driver_text: str, risk: str | None, backend: str
scaffold: AutoAnswer,
*,
driver_text: str,
risk: str | None,
backend: str,
answer_risk: str | None = None,
) -> list[tuple[str, LedgerEntry]]:
updates = [
(
Expand Down Expand Up @@ -263,6 +397,11 @@ def _ledger_updates_for(
confidence=0.6,
status=LedgerStatus.INFERRED,
rationale="Risk was preserved as provenance for Seed-ready and A-grade review gates.",
evidence=(
[f"driver:{backend}", f"answer_risk:{answer_risk}"]
if answer_risk
else [f"driver:{backend}"]
),
),
)
)
Expand Down
Loading