Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ouroboros/auto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
before starting execution.
"""

from ouroboros.auto.answerer import AutoAnswer, AutoAnswerer, AutoAnswerSource
from ouroboros.auto.answerer import AutoAnswer, AutoAnswerer, AutoAnswerMetadata, AutoAnswerSource
from ouroboros.auto.grading import GradeGate, GradeResult, SeedGrade
from ouroboros.auto.interview_driver import AutoInterviewDriver, AutoInterviewResult, InterviewTurn
from ouroboros.auto.ledger import LedgerEntry, LedgerSection, SeedDraftLedger
Expand All @@ -17,6 +17,7 @@

__all__ = [
"AutoAnswer",
"AutoAnswerMetadata",
"AutoAnswerSource",
"AutoAnswerer",
"AutoInterviewDriver",
Expand Down
10 changes: 10 additions & 0 deletions src/ouroboros/auto/answerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ class AutoBlocker:
question: str


@dataclass(frozen=True, slots=True)
class AutoAnswerMetadata:
    """Structured provenance for auto answers that need audit context.

    The record is frozen, and every field has a default, so an empty
    instance can be attached to answers that carry no audit details.
    """

    # Risk label associated with the answer; ``None`` when none was recorded.
    risk: str | None = None
    # Confidence value for the answer; ``None`` when none was recorded.
    confidence: float | None = None
    # Ordered provenance tags; an immutable tuple that is empty by default.
    provenance: tuple[str, ...] = ()


@dataclass(frozen=True, slots=True)
class AutoAnswer:
"""Answer plus structured ledger updates."""
Expand All @@ -76,6 +85,7 @@ class AutoAnswer:
assumptions: list[str] = field(default_factory=list)
non_goals: list[str] = field(default_factory=list)
blocker: AutoBlocker | None = None
metadata: AutoAnswerMetadata = field(default_factory=AutoAnswerMetadata)

@property
def prefixed_text(self) -> str:
Expand Down
149 changes: 144 additions & 5 deletions src/ouroboros/auto/driver_answerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AutoAnswer,
AutoAnswerContext,
AutoAnswerer,
AutoAnswerMetadata,
AutoAnswerSource,
AutoBlocker,
)
Expand Down Expand Up @@ -66,6 +67,13 @@ async def answer(
source=AutoAnswerSource.BLOCKER,
confidence=1.0,
blocker=AutoBlocker(reason=reason, question=question),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=risk,
confidence=1.0,
scaffold=scaffold,
),
)

if self.adapter is None:
Expand Down Expand Up @@ -101,6 +109,13 @@ async def answer(
reason=f"selected driver {self.backend} failed to answer: {result.error}",
question=question,
),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk="driver answer unavailable",
confidence=1.0,
scaffold=scaffold,
),
)
text = _clean_driver_text(result.value.content)
if not text:
Expand All @@ -112,15 +127,40 @@ async def answer(
reason=f"selected driver {self.backend} returned an empty answer",
question=question,
),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk="empty driver answer",
confidence=1.0,
scaffold=scaffold,
),
)
answer_risk = classify_driver_answer_text_risk(text)
if answer_risk and self.brake == AutoBrakeMode.ON:
reason = f"brake on: risky selected-driver response requires approval ({answer_risk})"
return AutoAnswer(
text=f"Cannot send selected-driver answer automatically without approval: {answer_risk}",
source=AutoAnswerSource.BLOCKER,
confidence=1.0,
blocker=AutoBlocker(reason=reason, question=question),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=_combined_risk(risk, answer_risk),
confidence=1.0,
scaffold=scaffold,
answer_risk=answer_risk,
),
)

assumptions = list(scaffold.assumptions)
confidence = min(scaffold.confidence, 0.82)
if risk:
assumptions.append(f"brake off auto-sent risky driver answer: {risk}")
final_risk = _combined_risk(risk, answer_risk)
if final_risk:
assumptions.append(f"brake off auto-sent risky driver answer: {final_risk}")
confidence = min(confidence, 0.62)
tagged_text = _tag_driver_text(
text, backend=self.backend or "driver", brake=self.brake, risk=risk
text, backend=self.backend or "driver", brake=self.brake, risk=final_risk
)
return AutoAnswer(
text=tagged_text,
Expand All @@ -129,11 +169,20 @@ async def answer(
ledger_updates=_ledger_updates_for(
scaffold,
driver_text=tagged_text,
risk=risk,
risk=final_risk,
backend=self.backend or "driver",
answer_risk=answer_risk,
),
assumptions=assumptions,
non_goals=list(scaffold.non_goals),
metadata=_answer_metadata(
backend=self.backend or "driver",
brake=self.brake,
risk=final_risk,
confidence=confidence,
scaffold=scaffold,
answer_risk=answer_risk,
),
)

def apply(self, answer: AutoAnswer, ledger: SeedDraftLedger, *, question: str) -> None:
Expand Down Expand Up @@ -177,6 +226,62 @@ def classify_interview_answer_risk(question: str, scaffold: AutoAnswer | None =
return None


def classify_driver_answer_text_risk(text: str) -> str | None:
    """Return a risk label for risky selected-driver output text.

    The credential checks run first, on the text as given; the destructive
    action checks follow.

    Args:
        text: The selected driver's answer text.

    Returns:
        ``"actual answer contains secret or credential"`` when the text holds
        a known token shape, a credential-style assignment, a URL with inline
        ``user:password@``, a ``Bearer`` token, or a three-segment ``eyJ...``
        token; ``"actual answer recommends destructive production action"``
        when the lowercased text names both a destructive verb and a
        production-like target, or contains a literal ``rm -rf /``;
        otherwise ``None``.
    """
    secret_label = "actual answer contains secret or credential"
    destructive_label = "actual answer recommends destructive production action"

    # (pattern, flags) pairs; a hit on any of them means an embedded credential.
    credential_checks = (
        (
            # name = value / name: value, matched case-insensitively.
            r"\b(password|passphrase|api[_ -]?key|access[_ -]?token|token|secret|credential)s?\b"
            r"\s*(=|:)\s*['\"]?[^\s'\"<>]{12,}",
            re.IGNORECASE,
        ),
        (
            # Upper-case, environment-variable style assignment.
            r"\b[A-Z][A-Z0-9_]*(?:API[_]?KEY|ACCESS[_]?KEY|SECRET|TOKEN|PASSWORD|CREDENTIAL)\b"
            r"\s*=\s*['\"]?[^\s'\"<>]{12,}",
            0,
        ),
        # scheme://user:password@host
        (r"\b[A-Za-z][A-Za-z0-9+.-]*://[^\s/@:]+:[^\s/@]{8,}@[^\s]+", 0),
        (r"\bBearer\s+[A-Za-z0-9._~+/-]{20,}={0,2}\b", 0),
        (r"\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b", 0),
    )

    if _contains_real_secret(text):
        return secret_label
    for expression, flags in credential_checks:
        if re.search(expression, text, flags=flags):
            return secret_label

    # Destructive wording is matched against a lowercased copy of the text.
    folded = text.lower()
    verb_expression = (
        r"\b(delete|destroy|drop|truncate|wipe|erase|purge|deprovision|terminate)\b"
    )
    target_expression = r"\b(production|prod|live|billing|database|db|credentials?)\b"
    if re.search(verb_expression, folded) and re.search(target_expression, folded):
        return destructive_label

    # A literal root wipe is flagged even without a production keyword.
    if re.search(r"\brm\s+-rf\s+/(?:\s|$)", text):
        return destructive_label
    return None


def _contains_real_secret(text: str) -> bool:
secret_patterns = (
r"\bAKIA[0-9A-Z]{16}\b",
r"\bASIA[0-9A-Z]{16}\b",
r"\bgh[pousr]_[A-Za-z0-9_]{30,}\b",
r"\bgithub_pat_[A-Za-z0-9_]{40,}\b",
r"\bsk-[A-Za-z0-9_-]{20,}\b",
r"\bxox[baprs]-[A-Za-z0-9-]{20,}\b",
)
return any(re.search(pattern, text) for pattern in secret_patterns)


def _combined_risk(pre_response_risk: str | None, answer_risk: str | None) -> str | None:
if pre_response_risk and answer_risk:
return f"{pre_response_risk}; {answer_risk}"
return pre_response_risk or answer_risk


def _driver_prompt(
question: str,
ledger: SeedDraftLedger,
Expand Down Expand Up @@ -229,8 +334,37 @@ def _tag_driver_text(text: str, *, backend: str, brake: AutoBrakeMode, risk: str
return f"[{' ; '.join(tags)}] {text}"


def _answer_metadata(
    *,
    backend: str,
    brake: AutoBrakeMode,
    risk: str | None,
    confidence: float,
    scaffold: AutoAnswer,
    answer_risk: str | None = None,
) -> AutoAnswerMetadata:
    """Assemble the audit metadata attached to a selected-driver answer.

    Args:
        backend: Driver name, recorded as the ``driver:`` provenance tag.
        brake: Brake mode; its ``value`` becomes the ``brake:`` tag.
        risk: Risk label stored unchanged on the metadata.
        confidence: Confidence, converted to ``float`` and clamped to
            ``[0.0, 1.0]``.
        scaffold: Scaffold answer; ``scaffold.source.value`` becomes the
            ``scaffold_source:`` tag.
        answer_risk: Optional answer-text risk; adds an ``answer_risk:``
            tag when truthy.

    Returns:
        An ``AutoAnswerMetadata`` holding the risk, the clamped confidence,
        and the provenance tags as a tuple.
    """
    tags: tuple[str, ...] = (
        f"driver:{backend}",
        f"brake:{brake.value}",
        f"scaffold_source:{scaffold.source.value}",
    )
    if answer_risk:
        tags += (f"answer_risk:{answer_risk}",)

    # Clamp in two steps: cap at 1.0 first, then floor at 0.0.
    capped = min(1.0, float(confidence))
    clamped = max(0.0, capped)
    return AutoAnswerMetadata(risk=risk, confidence=clamped, provenance=tags)


def _ledger_updates_for(
scaffold: AutoAnswer, *, driver_text: str, risk: str | None, backend: str
scaffold: AutoAnswer,
*,
driver_text: str,
risk: str | None,
backend: str,
answer_risk: str | None = None,
) -> list[tuple[str, LedgerEntry]]:
updates = [
(
Expand Down Expand Up @@ -263,6 +397,11 @@ def _ledger_updates_for(
confidence=0.6,
status=LedgerStatus.INFERRED,
rationale="Risk was preserved as provenance for Seed-ready and A-grade review gates.",
evidence=(
[f"driver:{backend}", f"answer_risk:{answer_risk}"]
if answer_risk
else [f"driver:{backend}"]
),
),
)
)
Expand Down
Loading