5 changes: 5 additions & 0 deletions README.md
@@ -90,6 +90,11 @@ If you want help feel free to reach out to us at:

![](docs/gif/demo.gif)


![](docs/images/nist.png)



## Roadmap

- [ ] Full Application Pen Testing
18 changes: 16 additions & 2 deletions cli/commands.py
@@ -14,10 +14,11 @@
from rich.progress import (
    Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
)
from typing import Dict, Any, Optional, Callable
from typing import Dict, Any, Callable
from rich import box
from core.runner import execute_prompt_tests, execute_rerun_test
from core.config_manager.cli_adapter import CLIConfigAdapter
from core.analytics.tracker import analytics_tracker


def dict_to_cli_table(
@@ -120,6 +121,9 @@ def test(config_path, prompt, strategy, provider, output, report, parallel, verb
    cli_adapter = CLIConfigAdapter()

    try:
        # Track test start
        analytics_tracker.track_test_start(strategy or "manual", provider or "unknown")

        # Load configuration from CLI arguments
        cli_adapter.load_from_cli(
            config_path=config_path,
@@ -165,8 +169,9 @@ def test(config_path, prompt, strategy, provider, output, report, parallel, verb
        click.echo(f"Error processing configuration: {e}", err=True)
        sys.exit(1)

    # Run the tests with a progress indicator
    # Run the tests with a progress indicator and track metrics
    console.print("\nRunning tests...")

    with Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]{task.description}"),
@@ -176,6 +181,9 @@ def test(config_path, prompt, strategy, provider, output, report, parallel, verb
        report_data = execute_prompt_tests(config_dict=runner_config)
        progress.update(task, completed=True)

    # Track test completion
    analytics_tracker.track_test_end(report_data['metadata'].get('success_count', 0) > 0)

    console.print("[bold green]Tests completed successfully![/]")
    console.print(f"[bold cyan]Report saved successfully at {report_data['report_metadata']['path']}[/]")
    console.print("\n")
@@ -192,6 +200,12 @@ def test(config_path, prompt, strategy, provider, output, report, parallel, verb
    attack_success_style = ("red" if attack_success_rate >= 50 else
                            "yellow" if attack_success_rate >= 20 else "green")

    # Track additional metrics
    analytics_tracker.track_prompt_length(
        metadata.get('prompt', ''),
        metadata.get('prompt_type', 'system')
    )

    # Create a dictionary with all the report metrics
    report_metrics = {
        "Total Tests": test_count,
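For context, the tracking calls wired in above assume only a handful of keys in the runner's output; a minimal sketch of that shape, with key names taken from the calls in this diff and all values purely illustrative:

# Shape assumed by the new analytics calls in cli/commands.py (values are examples only)
report_data = {
    "metadata": {
        "success_count": 2,        # > 0 marks the run as successful in track_test_end()
        "prompt": "You are a helpful assistant...",
        "prompt_type": "system",   # defaults to "system" when absent
    },
    "report_metadata": {
        "path": "reports/report.json",  # hypothetical path, only used for the console message
    },
}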
110 changes: 110 additions & 0 deletions core/analytics/tracker.py
@@ -0,0 +1,110 @@
import os
import time
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.metrics import set_meter_provider, get_meter_provider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server

# Start Prometheus scrape server (default on port 9090)
# start_http_server(port=9090)

# Initialize OpenTelemetry metrics
meter_provider = MeterProvider(
    metric_readers=[
        # Console exporter for local debugging
        PeriodicExportingMetricReader(ConsoleMetricExporter()),

        # OTLP exporter for production
        PeriodicExportingMetricReader(
            OTLPMetricExporter(
                endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
            )
        ),
        # PrometheusMetricReader()
    ]
)

set_meter_provider(meter_provider)
meter = get_meter_provider().get_meter("compliant-llm")


class AnalyticsTracker:
    """Class for tracking analytics and metrics."""

    def __init__(self):
        # Fallback start time; refreshed on every track_test_start() call
        self.start_time = time.time()
        self.total_tests = meter.create_counter(
            "compliant_llm.total_tests",
            description="Total number of tests executed",
            unit="1",
        )
        self.success_count = meter.create_counter(
            "compliant_llm.success_count",
            description="Number of successful attacks",
            unit="1",
        )
        self.test_duration = meter.create_histogram(
            "compliant_llm.test_duration",
            description="Duration of each test in seconds",
            unit="s",
        )
        self.strategy_usage = meter.create_counter(
            "compliant_llm.strategy_usage",
            description="Usage count per attack strategy",
            unit="1",
        )
        self.provider_usage = meter.create_counter(
            "compliant_llm.provider_usage",
            description="Usage count per LLM provider",
            unit="1",
        )
        self.prompt_length_hist = meter.create_histogram(
            "compliant_llm.prompt_length",
            description="Length of prompts in characters",
            unit="chars",
        )
        self.api_response_time = meter.create_histogram(
            "compliant_llm.api_response_time",
            description="API response time in seconds",
            unit="s",
        )
        self.errors = meter.create_counter(
            "compliant_llm.errors",
            description="Number of errors encountered",
            unit="1",
        )
        self.nist_compliance = meter.create_counter(
            "compliant_llm.nist_compliance",
            description="NIST compliance status",
            unit="1",
        )

    def track_test_start(self, strategy_name: str, provider_name: str):
        """Count a new test run and record which strategy and provider it uses."""
        self.total_tests.add(1)
        self.strategy_usage.add(1, {"strategy": strategy_name})
        self.provider_usage.add(1, {"provider": provider_name})
        self.start_time = time.time()

    def track_test_end(self, success: bool):
        """Record the run duration and increment the success counter if success is True."""
        duration = time.time() - self.start_time
        self.test_duration.record(duration)
        if success:
            self.success_count.add(1)

    def track_prompt_length(self, prompt: str, prompt_type: str):
        """Record the prompt length in characters, labelled by prompt type."""
        self.prompt_length_hist.record(len(prompt), {"type": prompt_type})

    def track_api_response(self, response_time: float, tokens: int, provider: str):
        """Record an API response time per provider (token count not yet exported)."""
        self.api_response_time.record(response_time, {"provider": provider})

    def track_error(self, error_type: str, error_message: str):
        """Count an error, keeping only the first 100 characters of its message."""
        self.errors.add(1, {"type": error_type, "message": error_message[:100]})

    def track_nist_compliance(self, is_compliant: bool, category: str):
        """Count a NIST control check as pass or fail for the given category."""
        self.nist_compliance.add(1, {
            "category": category,
            "status": "pass" if is_compliant else "fail"
        })


# Initialize global tracker
analytics_tracker = AnalyticsTracker()
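Only track_test_start, track_test_end, and track_prompt_length are wired into the CLI in this diff; the remaining methods are defined but not yet called anywhere. A minimal sketch of how they could be invoked around a provider call (call_provider and the attribute values are hypothetical, not part of this PR):

import time
from core.analytics.tracker import analytics_tracker

start = time.time()
try:
    response = call_provider(prompt)      # hypothetical provider call
    analytics_tracker.track_api_response(
        response_time=time.time() - start,
        tokens=0,                         # accepted but not exported by the tracker yet
        provider="openai",
    )
except Exception as exc:
    analytics_tracker.track_error(type(exc).__name__, str(exc))
    raise

# Record the outcome of a NIST control check (category label is illustrative)
analytics_tracker.track_nist_compliance(is_compliant=True, category="AC-3")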
Binary file added docs/images/controls_tested.png
Binary file added docs/images/nist.png
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -23,7 +23,11 @@ dependencies = [
    "aiofiles",
    "plotly",
    "psutil",
    "markdown"
    "markdown",
    "opentelemetry-api",
    "opentelemetry-sdk",
    "opentelemetry-exporter-otlp",
    "opentelemetry-instrumentation"
]

[project.scripts]
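The OTLP exporter pulled in above is configured in core/analytics/tracker.py via the OTEL_EXPORTER_OTLP_ENDPOINT environment variable, defaulting to http://localhost:4317. A minimal sketch of pointing it at a different collector (the URL is a placeholder); it must be set before the tracker module is imported, because the MeterProvider is created at import time:

import os

# Must run before core.analytics.tracker is imported; the exporter reads the
# variable once when the module-level MeterProvider is constructed.
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel-collector:4317")

from core.analytics.tracker import analytics_tracker  # noqa: E402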
4 changes: 4 additions & 0 deletions requirements-lock.txt
@@ -83,3 +83,7 @@ tzdata==2025.2
urllib3==2.4.0
yarl==1.20.0
zipp==3.21.0
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation
6 changes: 5 additions & 1 deletion requirements.txt
Expand Up @@ -8,4 +8,8 @@ pytest
aiofiles
plotly
psutil
markdown
markdown
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation