diff --git a/README.md b/README.md index b91cdac..6681658 100644 --- a/README.md +++ b/README.md @@ -231,6 +231,54 @@ tscm sweep --kind all --client acme --site dc1 --room server_room Runs all enabled collectors (RF, Wi-Fi, BLE, GSM) in parallel where feasible. +### Baseline and Anomaly Detection + +Create a baseline from historical sweeps to detect anomalies: + +```bash +# Create baseline from past 30 days of sweeps +tscm baseline --client acme --site hq --output baseline.json + +# Compare a sweep to baseline (programmatically) +python -c " +from tscm.storage.store import SweepStore +from tscm.baseline import load_baseline, compare_to_baseline, store_anomalies + +store = SweepStore('~/.tscm/data/sweeps.db') +baseline = load_baseline('baseline.json') +anomalies = compare_to_baseline(store, sweep_id=1, baseline=baseline) +store_anomalies(store, sweep_id=1, anomalies=anomalies) +" +``` + +### Generating Reports + +Generate reports in multiple formats: + +```bash +# Text report (to console) +tscm report sweep_id_20241209_143000Z + +# Save as text file +tscm report sweep_id_20241209_143000Z --output report.txt --format text + +# Generate JSON report +tscm report sweep_id_20241209_143000Z --output report.json --format json + +# Generate HTML report +tscm report sweep_id_20241209_143000Z --output report.html --format html +``` + +### Web Dashboard + +Launch the interactive web dashboard: + +```bash +tscm dashboard +``` + +Then open http://127.0.0.1:5000 in your browser to view sweeps, events, and anomalies. + ### Viewing Results Results are stored in SQLite database (default: `~/.tscm/data/sweeps.db`). 
@@ -247,6 +295,9 @@ sweeps = store.get_sweeps(limit=10) # Get events from a sweep events = store.get_events(sweep_db_id=1, event_type="rf", limit=100) + +# Get anomalies +anomalies = store.get_anomalies(sweep_db_id=1, min_score=0.5) ``` --- @@ -323,14 +374,26 @@ SWEEPERZERO/ ├── src/tscm/ │ ├── cli.py # Typer CLI interface │ ├── config.py # Pydantic configuration models +│ ├── baseline.py # Baseline and anomaly detection +│ ├── report.py # Report generation │ ├── collectors/ │ │ ├── rf_parser.py # rtl_power CSV parser │ │ ├── hackrf.py # HackRF/RTL-SDR integration +│ │ ├── hackrf_native.py # Native HackRF parser +│ │ ├── wifi.py # Wi-Fi monitoring +│ │ ├── ble.py # BLE scanning +│ │ ├── gsm.py # GSM scanning │ │ └── orchestrator.py # Multi-collector orchestration -│ └── storage/ -│ ├── models.py # SQLAlchemy models -│ └── store.py # Storage API +│ ├── storage/ +│ │ ├── models.py # SQLAlchemy models +│ │ └── store.py # Storage API +│ ├── dashboard/ +│ │ └── app.py # Flask web dashboard +│ └── templates/ +│ └── sweep_report.html # HTML report template ├── tests/ +│ ├── test_rf_parser.py +│ └── test_rf_and_baseline.py ├── scripts/install.sh ├── deploy/ └── config.example.yaml @@ -340,16 +403,20 @@ SWEEPERZERO/ 1. CLI Command → Parse arguments, load config 2. Create Sweep → Initialize database record -3. Run Collectors → Execute tools +3. Run Collectors → Execute tools (RTL-SDR, HackRF, Wi-Fi, BLE, GSM) 4. Parse Output → Stream processing 5. Store Events → Bulk insert into SQLite 6. Update Sweep → Mark completion status +7. (Optional) Baseline → Compare to historical data +8. (Optional) Detect Anomalies → Flag suspicious signals +9. 
(Optional) Generate Report → Text, JSON, or HTML output ### Database Schema - **sweeps**: Session metadata (client, site, room, timestamps, GPS) - **events**: Individual RF/Wi-Fi/BLE/GSM observations - **artifacts**: References to raw capture files +- **anomalies**: Detected anomalies with scores and metadata --- @@ -376,9 +443,53 @@ pytest --cov=tscm 1. Create `src/tscm/collectors/your_collector.py` 2. Implement function: `run_your_sweep(config, store, sweep_db_id) -> bool` -3. Add to orchestrator -4. Write tests -5. Update config schema +3. Import and add to `orchestrator.py` +4. Write tests in `tests/` +5. Update config schema in `config.py` if needed + +Example collector structure: + +```python +def run_your_sweep(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> bool: + """Run your custom sweep.""" + if not config.your_config.enabled: + return False + + # 1. Run collection tool + # 2. Parse output + # 3. Store events: store.add_event(...) + # 4. Store artifacts: store.add_artifact(...) 
+ + return True +``` + +### Creating Baselines + +Baselines detect anomalies by comparing new sweeps to historical data: + +```python +from tscm.baseline import create_baseline, compare_to_baseline + +# Create baseline from multiple sweeps +baseline = create_baseline(store, sweep_ids=[1, 2, 3], freq_bin_mhz=1.0) + +# Compare new sweep +anomalies = compare_to_baseline(store, sweep_id=4, baseline=baseline) +``` + +### Generating Custom Reports + +Extend the reporting system: + +```python +from tscm.report import generate_json_report + +# Get sweep data +data = generate_json_report(store, sweep_id=1) + +# Process and format as needed +# data contains: sweep, events, anomalies, artifacts +``` --- @@ -400,10 +511,11 @@ For non-root operation: ### Known Limitations -- Wi-Fi: Requires monitor mode +- Wi-Fi: Requires monitor mode-capable adapter - BLE: Ubertooth detection is probabilistic - GSM: Scanning may be restricted in some jurisdictions - RF: May miss narrow-band or frequency-hopping transmitters +- Baseline: Requires multiple historical sweeps for accuracy --- @@ -415,6 +527,12 @@ For non-root operation: sudo apt install rtl-sdr ``` +### "Flask not found" (for dashboard) + +```bash +pip install flask +``` + ### "Permission denied" accessing USB Add udev rules and user to plugdev group, then log out/in. diff --git a/pyproject.toml b/pyproject.toml index 0c1a754..e935aaa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "sqlalchemy>=2.0.0", "python-dotenv>=1.0.0", "rich>=13.0.0", + "flask>=2.3.0", ] [project.optional-dependencies] diff --git a/scripts/install.sh b/scripts/install.sh index 5d390a7..7865b7a 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -101,10 +101,16 @@ apt-get install -y \ echo "" echo "Installing Python dependencies..." pip3 install --upgrade pip -pip3 install -e /opt/tscm || { + +# Install package from current directory or /opt/tscm +if [ -f "pyproject.toml" ]; then + pip3 install -e . 
+elif [ -d "/opt/tscm" ]; then + pip3 install -e /opt/tscm +else echo -e "${YELLOW} Could not install tscm package.${NC}" echo " Run 'pip3 install -e .' from the SWEEPERZERO directory" -} +fi echo "" echo -e "${GREEN}Installation complete!${NC}" diff --git a/src/tscm/baseline.py b/src/tscm/baseline.py new file mode 100644 index 0000000..9c3d03d --- /dev/null +++ b/src/tscm/baseline.py @@ -0,0 +1,275 @@ +"""Baseline and anomaly detection module. + +Provides baseline creation and comparison for RF sweeps to detect anomalies. +""" + +import json +import statistics +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Dict, List, Optional + +from tscm.storage.store import SweepStore + + +def create_baseline( + store: SweepStore, + sweep_ids: List[int], + freq_bin_mhz: float = 1.0, + output_path: Optional[Path] = None, +) -> Dict: + """ + Create RF baseline from multiple sweeps. + + Aggregates RF events from multiple sweeps to establish normal + RF environment characteristics (mean and stddev per frequency). 
+ + Args: + store: Storage instance + sweep_ids: List of sweep database IDs to include in baseline + freq_bin_mhz: Frequency binning resolution in MHz + output_path: Optional path to save baseline JSON + + Returns: + Baseline dictionary with frequency statistics + """ + print(f"Creating baseline from {len(sweep_ids)} sweeps...") + + freq_bins: Dict[float, List[float]] = {} + + # Collect RF events from all sweeps + for sweep_id in sweep_ids: + events = store.get_events(sweep_id, event_type="rf", limit=100000) + + for event in events: + if event["freq_hz"] is None or event["power_db"] is None: + continue + + # Bin frequency + freq_mhz = event["freq_hz"] / 1e6 + freq_bin = round(freq_mhz / freq_bin_mhz) * freq_bin_mhz + + if freq_bin not in freq_bins: + freq_bins[freq_bin] = [] + + freq_bins[freq_bin].append(event["power_db"]) + + # Calculate statistics per frequency bin + baseline = { + "created_at": datetime.now(timezone.utc).isoformat(), + "num_sweeps": len(sweep_ids), + "freq_bin_mhz": freq_bin_mhz, + "frequencies": {}, + } + + for freq_bin, power_values in freq_bins.items(): + if len(power_values) < 2: + continue # Need at least 2 samples for statistics + + baseline["frequencies"][freq_bin] = { + "mean_db": statistics.mean(power_values), + "stdev_db": statistics.stdev(power_values), + "min_db": min(power_values), + "max_db": max(power_values), + "count": len(power_values), + } + + print(f"Baseline created with {len(baseline['frequencies'])} frequency bins") + + # Save to file if requested + if output_path: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + json.dump(baseline, f, indent=2) + print(f"Baseline saved to {output_path}") + + return baseline + + +def load_baseline(baseline_path: Path) -> Dict: + """ + Load baseline from JSON file. 
+ + Args: + baseline_path: Path to baseline JSON file + + Returns: + Baseline dictionary + """ + with open(baseline_path) as f: + return json.load(f) + + +def compare_to_baseline( + store: SweepStore, + sweep_id: int, + baseline: Dict, + threshold_sigma: float = 3.0, + min_power_threshold_db: float = -80.0, +) -> List[Dict]: + """ + Compare sweep to baseline and detect anomalies. + + Identifies RF signals that deviate significantly from baseline. + + Args: + store: Storage instance + sweep_id: Database ID of sweep to compare + baseline: Baseline dictionary from create_baseline() + threshold_sigma: Number of standard deviations for anomaly threshold + min_power_threshold_db: Minimum power level to consider (filter noise) + + Returns: + List of anomaly records + """ + print(f"Comparing sweep {sweep_id} to baseline...") + + freq_bin_mhz = baseline.get("freq_bin_mhz", 1.0) + baseline_freqs = baseline.get("frequencies", {}) + + # Get RF events for this sweep + events = store.get_events(sweep_id, event_type="rf", limit=100000) + + anomalies = [] + + for event in events: + if event["freq_hz"] is None or event["power_db"] is None: + continue + + power_db = event["power_db"] + + # Filter out noise floor + if power_db < min_power_threshold_db: + continue + + # Bin frequency + freq_mhz = event["freq_hz"] / 1e6 + freq_bin = round(freq_mhz / freq_bin_mhz) * freq_bin_mhz + + # Check if we have baseline for this frequency + freq_key = freq_bin + if freq_key not in baseline_freqs: + # Unknown frequency - could be anomaly + anomalies.append({ + "freq_hz": event["freq_hz"], + "power_db": power_db, + "kind": "unknown_frequency", + "score": 0.5, # Medium confidence + "details": "No baseline data for this frequency", + "event_id": event["id"], + }) + continue + + baseline_data = baseline_freqs[freq_key] + mean_db = baseline_data["mean_db"] + stdev_db = baseline_data["stdev_db"] + + # Calculate deviation in standard deviations + if stdev_db > 0: + deviation = abs(power_db - mean_db) / 
stdev_db + + if deviation > threshold_sigma: + # Significant deviation from baseline + score = min(1.0, deviation / (threshold_sigma * 2)) # Normalize score + + anomalies.append({ + "freq_hz": event["freq_hz"], + "power_db": power_db, + "kind": "power_anomaly", + "score": score, + "details": f"Power {power_db:.1f} dB deviates {deviation:.1f}σ from baseline {mean_db:.1f}±{stdev_db:.1f} dB", + "event_id": event["id"], + "deviation_sigma": deviation, + }) + + print(f"Found {len(anomalies)} anomalies") + + return anomalies + + +def store_anomalies(store: SweepStore, sweep_id: int, anomalies: List[Dict]) -> int: + """ + Store detected anomalies in database. + + Args: + store: Storage instance + sweep_id: Database ID of sweep + anomalies: List of anomaly records from compare_to_baseline() + + Returns: + Number of anomalies stored + """ + count = 0 + + for anomaly in anomalies: + metadata = { + "freq_hz": anomaly.get("freq_hz"), + "power_db": anomaly.get("power_db"), + "details": anomaly.get("details"), + } + + # Add any extra fields + for key in ["deviation_sigma"]: + if key in anomaly: + metadata[key] = anomaly[key] + + store.insert_anomaly( + sweep_id=sweep_id, + event_id=anomaly.get("event_id"), + kind=anomaly["kind"], + score=anomaly["score"], + metadata=metadata, + ) + count += 1 + + return count + + +def get_baseline_sweeps( + store: SweepStore, + client_name: str, + site: Optional[str] = None, + room: Optional[str] = None, + days_back: int = 30, + min_sweeps: int = 3, +) -> List[int]: + """ + Get sweep IDs suitable for baseline creation. 
+ + Args: + store: Storage instance + client_name: Client name to filter + site: Optional site name + room: Optional room name + days_back: Number of days to look back + min_sweeps: Minimum number of sweeps required + + Returns: + List of sweep database IDs + """ + sweeps = store.get_sweeps( + client_name=client_name, + site=site, + room=room, + limit=100, + ) + + # Filter by date + cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back) + recent_sweeps = [ + s for s in sweeps + if s["start_time"] and ( + s["start_time"].astimezone(timezone.utc) if s["start_time"].tzinfo else s["start_time"].replace(tzinfo=timezone.utc) + ) >= cutoff_date + and s["status"] == "completed" + ] + + sweep_ids = [s["id"] for s in recent_sweeps] + + if len(sweep_ids) < min_sweeps: + print( + f"Warning: Only {len(sweep_ids)} sweeps found, " + f"minimum {min_sweeps} recommended for baseline" + ) + + return sweep_ids diff --git a/src/tscm/cli.py b/src/tscm/cli.py index 26aedba..bbc181e 100644 --- a/src/tscm/cli.py +++ b/src/tscm/cli.py @@ -405,5 +405,205 @@ def sweep( raise typer.Exit(1) +@app.command() +def baseline( + client: str = typer.Option( + ..., + "--client", + help="Client name for baseline", + ), + site: Optional[str] = typer.Option( + None, + "--site", + help="Site name", + ), + room: Optional[str] = typer.Option( + None, + "--room", + help="Room name", + ), + output: Optional[Path] = typer.Option( + None, + "--output", + "-o", + help="Output file for baseline JSON", + ), + days_back: int = typer.Option( + 30, + "--days", + help="Number of days to look back for sweeps", + ), + config_path: Optional[Path] = typer.Option( + None, + "--config", + "-c", + help="Path to config file", + ), +): + """Create RF baseline from historical sweeps.""" + try: + config = load_config(config_path) + except Exception as e: + rprint(f"[red]Error loading config: {e}[/red]") + raise typer.Exit(1) + + try: + from tscm.baseline import create_baseline, get_baseline_sweeps + + store = 
SweepStore(config.storage.database_path, config.storage.enable_wal) + + rprint(f"[bold]Creating baseline for {client}[/bold]") + if site: + rprint(f"Site: {site}") + if room: + rprint(f"Room: {room}") + rprint() + + # Get suitable sweeps + sweep_ids = get_baseline_sweeps( + store, + client_name=client, + site=site, + room=room, + days_back=days_back, + min_sweeps=3, + ) + + if not sweep_ids: + rprint("[yellow]No completed sweeps found for baseline[/yellow]") + raise typer.Exit(1) + + rprint(f"Found {len(sweep_ids)} sweeps for baseline") + + # Create baseline + baseline = create_baseline( + store, + sweep_ids, + freq_bin_mhz=1.0, + output_path=output, + ) + + rprint(f"[green]✓[/green] Baseline created with {len(baseline['frequencies'])} frequency bins") + + if output: + rprint(f"[green]✓[/green] Baseline saved to {output}") + + except ImportError as e: + rprint(f"[red]Error importing baseline module: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + rprint(f"[red]Error creating baseline: {e}[/red]") + raise typer.Exit(1) + + +@app.command() +def report( + sweep_id: str = typer.Argument(..., help="Sweep ID to generate report for"), + output: Optional[Path] = typer.Option( + None, + "--output", + "-o", + help="Output file path", + ), + format: str = typer.Option( + "text", + "--format", + "-f", + help="Report format: text, json, html", + ), + config_path: Optional[Path] = typer.Option( + None, + "--config", + "-c", + help="Path to config file", + ), +): + """Generate a report for a sweep.""" + try: + config = load_config(config_path) + except Exception as e: + rprint(f"[red]Error loading config: {e}[/red]") + raise typer.Exit(1) + + try: + from tscm.report import generate_html_report, generate_json_report, generate_text_report + + store = SweepStore(config.storage.database_path, config.storage.enable_wal) + + # Get sweep by ID + sweep = store.get_sweep_by_id(sweep_id) + if not sweep: + rprint(f"[red]Sweep {sweep_id} not found[/red]") + raise typer.Exit(1) + 
+ rprint(f"[bold]Generating {format} report for {sweep_id}[/bold]\n") + + # Generate report based on format + if format == "text": + report_text = generate_text_report(store, sweep["id"], output) + if not output: + rprint(report_text) + elif format == "json": + generate_json_report(store, sweep["id"], output) + elif format == "html": + generate_html_report(store, sweep["id"], output) + else: + rprint(f"[red]Unknown format: {format}[/red]") + rprint("Available formats: text, json, html") + raise typer.Exit(1) + + if output: + rprint(f"\n[green]✓[/green] Report saved to {output}") + + except ImportError as e: + rprint(f"[red]Error importing report module: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + rprint(f"[red]Error generating report: {e}[/red]") + raise typer.Exit(1) + + +@app.command() +def dashboard( + host: str = typer.Option( + "127.0.0.1", + "--host", + help="Host to bind to", + ), + port: int = typer.Option( + 5000, + "--port", + "-p", + help="Port to listen on", + ), + config_path: Optional[Path] = typer.Option( + None, + "--config", + "-c", + help="Path to config file", + ), +): + """Start the web dashboard.""" + try: + from tscm.dashboard.app import run_dashboard + + rprint("[bold]Starting TSCM Dashboard[/bold]") + rprint(f"URL: http://{host}:{port}") + rprint("\nPress Ctrl+C to stop") + rprint() + + run_dashboard(host=host, port=port, config_path=config_path) + + except ImportError as e: + rprint(f"[red]Error: {e}[/red]") + rprint("The dashboard requires Flask. 
Install with: pip install flask") + raise typer.Exit(1) + except KeyboardInterrupt: + rprint("\n[yellow]Dashboard stopped[/yellow]") + except Exception as e: + rprint(f"[red]Error starting dashboard: {e}[/red]") + raise typer.Exit(1) + + if __name__ == "__main__": app() diff --git a/src/tscm/collectors/ble.py b/src/tscm/collectors/ble.py new file mode 100644 index 0000000..e0be787 --- /dev/null +++ b/src/tscm/collectors/ble.py @@ -0,0 +1,288 @@ +"""BLE (Bluetooth Low Energy) scanning collector using Ubertooth or hcitool.""" + +import re +import shutil +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from tscm.config import TSCMConfig +from tscm.storage.store import SweepStore + + +def run_ble_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path] = None, +) -> bool: + """ + Run BLE scanning sweep. + + Attempts to use Ubertooth first, falls back to hcitool if available. 
+ + Args: + config: TSCM configuration + store: Storage instance + sweep_db_id: Database ID of sweep + output_dir: Optional directory to save capture logs + + Returns: + True if successful + """ + if not config.ble.enabled: + print("BLE collection is disabled in config") + return False + + interface = config.ble.interface + duration = config.durations.ble_duration + + print(f"BLE sweep using {interface} for {duration} seconds") + + # Try Ubertooth first + if interface.lower() == "ubertooth": + return _run_ubertooth_sweep(config, store, sweep_db_id, output_dir, duration) + else: + # Try hcitool on specified interface + return _run_hcitool_sweep(config, store, sweep_db_id, output_dir, duration, interface) + + +def _run_ubertooth_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path], + duration: int, +) -> bool: + """Run BLE sweep using Ubertooth.""" + # Check if ubertooth-btle is available + if not shutil.which("ubertooth-btle"): + print("Error: ubertooth-btle not found. 
Install with: apt install ubertooth") + return False + + # Prepare output directory + if output_dir is None: + output_dir = Path("/tmp/tscm_ble") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ") + log_file = output_dir / f"ble_ubertooth_{timestamp}.log" + + try: + print("Starting Ubertooth BLE capture...") + # Run ubertooth-btle -f (follow connections) -s (sniff advertisements) + cmd = ["ubertooth-btle", "-f", "-s"] + + with open(log_file, "w") as log: + process = subprocess.Popen( + cmd, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + ) + + # Wait for duration + time.sleep(duration) + + # Stop capture + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + print("Ubertooth BLE capture completed") + + # Parse log file for BLE advertisements + events_stored = _parse_ubertooth_log(log_file, store, sweep_db_id) + print(f"Stored {events_stored} BLE events") + + # Add artifact + store.add_artifact( + sweep_db_id, + artifact_type="ble_log", + file_path=str(log_file), + file_size_bytes=log_file.stat().st_size, + description="BLE Ubertooth capture log", + ) + + return True + + except Exception as e: + print(f"Error during Ubertooth BLE sweep: {e}") + return False + + +def _run_hcitool_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path], + duration: int, + interface: str, +) -> bool: + """Run BLE sweep using hcitool lescan.""" + # Check if hcitool is available + if not shutil.which("hcitool"): + print("Error: hcitool not found. 
Install with: apt install bluez") + return False + + # Prepare output directory + if output_dir is None: + output_dir = Path("/tmp/tscm_ble") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ") + log_file = output_dir / f"ble_hcitool_{timestamp}.log" + + try: + print(f"Starting hcitool BLE scan on {interface}...") + # Run hcitool lescan + cmd = ["hcitool", "-i", interface, "lescan"] + + with open(log_file, "w") as log: + process = subprocess.Popen( + cmd, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + ) + + # Wait for duration + time.sleep(duration) + + # Stop scan + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + print("hcitool BLE scan completed") + + # Parse log file + events_stored = _parse_hcitool_log(log_file, store, sweep_db_id) + print(f"Stored {events_stored} BLE events") + + # Add artifact + store.add_artifact( + sweep_db_id, + artifact_type="ble_log", + file_path=str(log_file), + file_size_bytes=log_file.stat().st_size, + description=f"BLE hcitool scan log {interface}", + ) + + return True + + except Exception as e: + print(f"Error during hcitool BLE sweep: {e}") + return False + + +def _parse_ubertooth_log(log_path: Path, store: SweepStore, sweep_db_id: int) -> int: + """ + Parse Ubertooth log for BLE advertisements. + + Look for lines containing MAC addresses and advertisement data. 
+ + Args: + log_path: Path to Ubertooth log file + store: Storage instance + sweep_db_id: Database ID of sweep + + Returns: + Number of events stored + """ + events_count = 0 + seen_macs = set() + + try: + with open(log_path, "r") as f: + for line in f: + # Look for MAC addresses in format XX:XX:XX:XX:XX:XX + mac_matches = re.findall(r"([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})", line) + + for mac in mac_matches: + mac_upper = mac.upper() + if mac_upper not in seen_macs: + seen_macs.add(mac_upper) + + # Extract RSSI if present (format: RSSI: -XX dBm) + rssi = None + rssi_match = re.search(r"RSSI[:\s]+(-?\d+)", line, re.IGNORECASE) + if rssi_match: + rssi = float(rssi_match.group(1)) + + # Store BLE event + store.add_event( + sweep_db_id=sweep_db_id, + event_type="ble", + timestamp=datetime.now(timezone.utc), + mac_address=mac_upper, + signal_strength=rssi, + ) + events_count += 1 + + except Exception as e: + print(f"Error parsing Ubertooth log: {e}") + + return events_count + + +def _parse_hcitool_log(log_path: Path, store: SweepStore, sweep_db_id: int) -> int: + """ + Parse hcitool lescan log. 
+ + Format: XX:XX:XX:XX:XX:XX Device Name + + Args: + log_path: Path to hcitool log file + store: Storage instance + sweep_db_id: Database ID of sweep + + Returns: + Number of events stored + """ + events_count = 0 + seen_macs = set() + + try: + with open(log_path, "r") as f: + for line in f: + line = line.strip() + if not line or line.startswith("LE Scan"): + continue + + # Parse format: XX:XX:XX:XX:XX:XX (Device Name) + match = re.match( + r"([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)", + line, + ) + if match: + mac = match.group(1).upper() + device_name = match.group(2).strip() if match.group(2) else None + + if mac not in seen_macs: + seen_macs.add(mac) + + # Store BLE event + metadata = {} + if device_name: + metadata["device_name"] = device_name + + store.add_event( + sweep_db_id=sweep_db_id, + event_type="ble", + timestamp=datetime.now(timezone.utc), + mac_address=mac, + metadata=metadata if metadata else None, + ) + events_count += 1 + + except Exception as e: + print(f"Error parsing hcitool log: {e}") + + return events_count diff --git a/src/tscm/collectors/gsm.py b/src/tscm/collectors/gsm.py new file mode 100644 index 0000000..1af98cb --- /dev/null +++ b/src/tscm/collectors/gsm.py @@ -0,0 +1,188 @@ +"""GSM scanning collector using gr-gsm.""" + +import re +import shutil +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from tscm.config import TSCMConfig +from tscm.storage.store import SweepStore + + +def run_gsm_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path] = None, +) -> bool: + """ + Run GSM scanning sweep using gr-gsm. + + Scans configured GSM bands and detects cell towers. Can identify + rogue base stations by comparing against allowed MCC-MNC pairs. 
+ + Args: + config: TSCM configuration + store: Storage instance + sweep_db_id: Database ID of sweep + output_dir: Optional directory to save scan logs + + Returns: + True if successful + """ + if not config.gsm.enabled: + print("GSM collection is disabled in config") + return False + + device_string = config.gsm.device_string + bands = config.gsm.bands + duration = config.durations.gsm_duration + + print(f"GSM sweep on device {device_string}, bands {bands}") + print(f"Duration: {duration} seconds") + + # Check if grgsm_scanner is available + if not shutil.which("grgsm_scanner"): + print("Error: grgsm_scanner not found") + print("Install gr-gsm from: https://github.com/ptrkrysik/gr-gsm") + return False + + # Prepare output directory + if output_dir is None: + output_dir = Path("/tmp/tscm_gsm") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ") + log_file = output_dir / f"gsm_scan_{timestamp}.log" + + try: + print("Starting GSM scan...") + # Run grgsm_scanner + # Format: grgsm_scanner -b BAND -d DEVICE + cmd = [ + "grgsm_scanner", + "-b", + bands, + "-d", + device_string, + ] + + with open(log_file, "w") as log: + process = subprocess.Popen( + cmd, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + ) + + # Wait for duration + time.sleep(duration) + + # Stop scan + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + print("GSM scan completed") + + # Parse log file + events_stored = _parse_grgsm_log(log_file, store, sweep_db_id, config) + print(f"Stored {events_stored} GSM events") + + # Add artifact + store.add_artifact( + sweep_db_id, + artifact_type="gsm_log", + file_path=str(log_file), + file_size_bytes=log_file.stat().st_size, + description=f"GSM scan log bands {bands}", + ) + + return True + + except Exception as e: + print(f"Error during GSM sweep: {e}") + return False + + +def _parse_grgsm_log( + log_path: Path, store: SweepStore, 
sweep_db_id: int, config: TSCMConfig +) -> int: + """ + Parse grgsm_scanner log output. + + Typical format includes: + ARFCN: XXX, Freq: XXX.X MHz, CID: XXXXX, LAC: XXXXX, MCC: XXX, MNC: XX, PWR: -XX dBm + + Args: + log_path: Path to gr-gsm log file + store: Storage instance + sweep_db_id: Database ID of sweep + config: TSCM configuration (for allowed MCC-MNC checking) + + Returns: + Number of events stored + """ + events_count = 0 + allowed_mcc_mnc = set(config.gsm.allowed_mcc_mnc) + + try: + with open(log_path, "r") as f: + for line in f: + # Parse GSM cell information + # Look for patterns like: + # ARFCN: 123, Freq: 935.4 MHz, CID: 12345, LAC: 1234, MCC: 655, MNC: 01, PWR: -75 + + arfcn_match = re.search(r"ARFCN[:\s]+(\d+)", line, re.IGNORECASE) + cid_match = re.search(r"CID[:\s]+(\d+)", line, re.IGNORECASE) + lac_match = re.search(r"LAC[:\s]+(\d+)", line, re.IGNORECASE) + mcc_match = re.search(r"MCC[:\s]+(\d+)", line, re.IGNORECASE) + mnc_match = re.search(r"MNC[:\s]+(\d+)", line, re.IGNORECASE) + power_match = re.search(r"PWR[:\s]+(-?\d+)", line, re.IGNORECASE) + + # Need at least ARFCN and one of MCC/MNC to be useful + if arfcn_match and (mcc_match or mnc_match): + arfcn = int(arfcn_match.group(1)) + mcc = int(mcc_match.group(1)) if mcc_match else None + mnc = int(mnc_match.group(1)) if mnc_match else None + lac = int(lac_match.group(1)) if lac_match else None + cid = int(cid_match.group(1)) if cid_match else None + power = float(power_match.group(1)) if power_match else None + + # Check if this is a potentially rogue cell + is_rogue = False + if mcc is not None and mnc is not None and allowed_mcc_mnc: + mcc_mnc_str = f"{mcc}-{mnc:02d}" + if mcc_mnc_str not in allowed_mcc_mnc: + is_rogue = True + print(f"⚠ Potential rogue cell detected: MCC-MNC {mcc_mnc_str}") + + # Prepare metadata + metadata = {} + if is_rogue: + metadata["is_rogue"] = True + + # Store GSM event + store.add_event( + sweep_db_id=sweep_db_id, + event_type="gsm", + 
timestamp=datetime.now(timezone.utc), + mcc=mcc, + mnc=mnc, + lac=lac, + cid=cid, + arfcn=arfcn, + signal_strength=power, + metadata=metadata if metadata else None, + ) + events_count += 1 + + except Exception as e: + print(f"Error parsing GSM log: {e}") + + return events_count diff --git a/src/tscm/collectors/hackrf_native.py b/src/tscm/collectors/hackrf_native.py new file mode 100644 index 0000000..05db294 --- /dev/null +++ b/src/tscm/collectors/hackrf_native.py @@ -0,0 +1,168 @@ +"""Native HackRF sweep collector with custom parser. + +hackrf_sweep output format: +date, time, hz_low, hz_high, hz_bin_width, num_samples, dB, dB, dB, ... + +Example: +2024-12-09, 14:30:00, 2400000000, 2500000000, 1000000, 100, -45.2, -46.3, -44.1, ... +""" + +import signal +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from tscm.collectors.rf_parser import RTLPowerParser +from tscm.config import TSCMConfig +from tscm.storage.store import SweepStore + + +def run_hackrf_native_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path] = None, +) -> bool: + """ + Run native hackrf_sweep and parse results. 
+ + Args: + config: TSCM configuration + store: Storage instance + sweep_db_id: Database ID of sweep + output_dir: Optional directory to save raw CSV + + Returns: + True if successful + """ + if not config.hackrf.enabled: + print("HackRF is disabled in config") + return False + + print("Running HackRF native sweeps...") + + all_success = True + for band in config.hackrf.bands: + print(f"\nBand: {band.label} ({band.freq_start_mhz}-{band.freq_end_mhz} MHz)") + + freq_start_hz = int(band.freq_start_mhz * 1e6) + freq_end_hz = int(band.freq_end_mhz * 1e6) + step_hz = int(band.step_mhz * 1e6) + duration_sec = config.durations.rf_duration + + # Build hackrf_sweep command + # hackrf_sweep -f start:end -w bin_width -N num_sweeps + # We'll use -1 for continuous and use timeout for duration + cmd = [ + "hackrf_sweep", + "-f", + f"{freq_start_hz//1000000}:{freq_end_hz//1000000}", # MHz format + "-w", + str(step_hz), + ] + + print(f"Running: {' '.join(cmd)}") + print(f"Duration: {duration_sec} seconds") + + # Create output file if requested + output_file = None + if output_dir: + output_dir.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ") + output_file = output_dir / f"hackrf_sweep_{band.label}_{timestamp}.csv" + + try: + # Run hackrf_sweep with timeout + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + + # Use RTLPowerParser (hackrf_sweep has similar format) + parser = RTLPowerParser(strict=False) + events_batch = [] + batch_size = 100 + + file_handle = None + if output_file: + file_handle = open(output_file, "w") + + try: + start_time = time.time() + + # Process output line by line with timeout + for line in process.stdout: + # Check timeout + if time.time() - start_time > duration_sec: + process.send_signal(signal.SIGINT) + break + + # Save to file if requested + if file_handle: + file_handle.write(line) + + # Parse line (similar to rtl_power format) + 
event = parser.parse_line(line) + if event is None: + continue + + # Convert to database events + for freq_hz, power_db in event.freq_bins: + events_batch.append({ + "event_type": "rf", + "timestamp": event.timestamp, + "freq_hz": freq_hz, + "power_db": power_db, + "bandwidth_hz": event.freq_step_hz, + }) + + # Bulk insert when batch is full + if len(events_batch) >= batch_size: + store.add_events_bulk(sweep_db_id, events_batch) + print(f"Stored {len(events_batch)} RF events") + events_batch = [] + + finally: + if file_handle: + file_handle.close() + + # Terminate process + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + # Store remaining events + if events_batch: + store.add_events_bulk(sweep_db_id, events_batch) + print(f"Stored {len(events_batch)} RF events") + + # Add artifact record + if output_file and output_file.exists(): + store.add_artifact( + sweep_db_id, + artifact_type="hackrf_sweep_csv", + file_path=str(output_file), + file_size_bytes=output_file.stat().st_size, + description=f"HackRF sweep {band.label} {freq_start_hz/1e6:.0f}-{freq_end_hz/1e6:.0f} MHz", + ) + + print( + f"HackRF band {band.label} completed. " + f"Parsed {parser.lines_parsed} lines, skipped {parser.lines_skipped}" + ) + + except FileNotFoundError: + print("Error: hackrf_sweep not found. Install with: apt install hackrf") + all_success = False + except Exception as e: + print(f"Error during HackRF sweep: {e}") + all_success = False + + return all_success diff --git a/src/tscm/collectors/orchestrator.py b/src/tscm/collectors/orchestrator.py index ce37645..5b58957 100644 --- a/src/tscm/collectors/orchestrator.py +++ b/src/tscm/collectors/orchestrator.py @@ -7,114 +7,6 @@ from tscm.storage.store import SweepStore -def run_wifi_sweep(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> bool: - """ - Run Wi-Fi sweep (stub implementation). - - TODO: Implement Wi-Fi capture using aircrack-ng suite. 
- - Use airmon-ng to enable monitor mode - - Use airodump-ng to capture Wi-Fi packets - - Parse output and store events - - Disable monitor mode after capture - - Args: - config: TSCM configuration - store: Storage instance - sweep_db_id: Database ID of sweep - - Returns: - True if successful - """ - if not config.wifi.enabled: - print("Wi-Fi collection is disabled in config") - return False - - print("Wi-Fi sweep: TODO - Not yet implemented") - print(f" Would capture on interface: {config.wifi.interface}") - print(f" Duration: {config.durations.wifi_duration} seconds") - - # TODO: Implement Wi-Fi capture - # 1. Check if airmon-ng and airodump-ng are available - # 2. Enable monitor mode on the interface - # 3. Run airodump-ng for the specified duration - # 4. Parse CSV output and store Wi-Fi events (AP, clients, etc.) - # 5. Disable monitor mode - # 6. Store artifact reference to pcap file - - return True - - -def run_ble_sweep(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> bool: - """ - Run BLE sweep (stub implementation). - - TODO: Implement BLE capture using Ubertooth. - - Use ubertooth-btle to capture BLE advertisements - - Parse output and extract MAC addresses - - Store BLE events - - Args: - config: TSCM configuration - store: Storage instance - sweep_db_id: Database ID of sweep - - Returns: - True if successful - """ - if not config.ble.enabled: - print("BLE collection is disabled in config") - return False - - print("BLE sweep: TODO - Not yet implemented") - print(f" Would use device: {config.ble.interface}") - print(f" Duration: {config.durations.ble_duration} seconds") - - # TODO: Implement BLE capture - # 1. Check if ubertooth-btle or hcitool is available - # 2. Run ubertooth-btle -f -s for specified duration - # 3. Parse output to extract BLE advertisements and MAC addresses - # 4. Store BLE events with MAC, RSSI, advertisement data - # 5. 
Store artifact reference to raw log file - - return True - - -def run_gsm_sweep(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> bool: - """ - Run GSM sweep (stub implementation). - - TODO: Implement GSM scanning using gr-gsm. - - Use grgsm_scanner to scan GSM bands - - Parse cell tower information - - Store GSM events with MCC, MNC, LAC, CID, ARFCN - - Args: - config: TSCM configuration - store: Storage instance - sweep_db_id: Database ID of sweep - - Returns: - True if successful - """ - if not config.gsm.enabled: - print("GSM collection is disabled in config") - return False - - print("GSM sweep: TODO - Not yet implemented") - print(f" Would use device: {config.gsm.device_string}") - print(f" Bands: {config.gsm.bands}") - print(f" Duration: {config.durations.gsm_duration} seconds") - - # TODO: Implement GSM scanning - # 1. Check if grgsm_scanner is available - # 2. Run grgsm_scanner with configured bands - # 3. Parse output to extract cell information - # 4. Store GSM events with MCC, MNC, LAC, CID, ARFCN, power - # 5. Store artifact reference to raw scanner output - - return True - - def run_all_sweeps(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> bool: """ Orchestrate all enabled sweep types. 
@@ -132,6 +24,9 @@ def run_all_sweeps(config: TSCMConfig, store: SweepStore, sweep_db_id: int) -> b True if all sweeps completed successfully """ from tscm.collectors.hackrf import run_hackrf_sweep + from tscm.collectors.wifi import run_wifi_sweep + from tscm.collectors.ble import run_ble_sweep + from tscm.collectors.gsm import run_gsm_sweep # Determine which sweeps to run sweep_tasks: List[tuple] = [] diff --git a/src/tscm/collectors/wifi.py b/src/tscm/collectors/wifi.py new file mode 100644 index 0000000..270acff --- /dev/null +++ b/src/tscm/collectors/wifi.py @@ -0,0 +1,257 @@ +"""Wi-Fi monitoring collector using aircrack-ng suite.""" + +import re +import shutil +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from tscm.config import TSCMConfig +from tscm.storage.store import SweepStore + + +def run_wifi_sweep( + config: TSCMConfig, + store: SweepStore, + sweep_db_id: int, + output_dir: Optional[Path] = None, +) -> bool: + """ + Run Wi-Fi monitoring sweep using aircrack-ng. + + Uses airmon-ng to enable monitor mode and airodump-ng to capture + Wi-Fi packets. Parses CSV output to extract AP and client information. + + Args: + config: TSCM configuration + store: Storage instance + sweep_db_id: Database ID of sweep + output_dir: Optional directory to save capture files + + Returns: + True if successful + """ + if not config.wifi.enabled: + print("Wi-Fi collection is disabled in config") + return False + + interface = config.wifi.interface + duration = config.durations.wifi_duration + + print(f"Wi-Fi sweep on interface {interface} for {duration} seconds") + + # Check if tools are available + if not shutil.which("airmon-ng"): + print("Error: airmon-ng not found. 
Please install the aircrack-ng suite for your operating system.") + return False + + # Prepare output directory + if output_dir is None: + output_dir = Path("/tmp/tscm_wifi") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ") + capture_prefix = output_dir / f"wifi_capture_{timestamp}" + + monitor_interface = None + + try: + # Enable monitor mode + print(f"Enabling monitor mode on {interface}...") + result = subprocess.run( + ["airmon-ng", "start", interface], + capture_output=True, + text=True, + timeout=10, + ) + + # Parse output to find monitor interface name (e.g., wlan0mon) + for line in result.stdout.split("\n"): + if "monitor mode" in line.lower() and "enabled" in line.lower(): + # Extract interface name (usually ends with 'mon') + match = re.search(r"(\w+mon)", line) + if match: + monitor_interface = match.group(1) + break + + if not monitor_interface: + # Try common pattern + monitor_interface = f"{interface}mon" + + print(f"Monitor interface: {monitor_interface}") + + # Run airodump-ng + print(f"Capturing Wi-Fi traffic for {duration} seconds...") + cmd = [ + "airodump-ng", + monitor_interface, + "-w", + str(capture_prefix), + "--output-format", + "csv", + ] + + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + # Wait for duration + time.sleep(duration) + + # Stop capture + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + print("Wi-Fi capture completed") + + # Parse CSV output + csv_file = Path(f"{capture_prefix}-01.csv") + if csv_file.exists(): + events_stored = _parse_airodump_csv(csv_file, store, sweep_db_id) + print(f"Stored {events_stored} Wi-Fi events") + + # Add artifact + store.add_artifact( + sweep_db_id, + artifact_type="wifi_csv", + file_path=str(csv_file), + file_size_bytes=csv_file.stat().st_size, + description=f"Wi-Fi capture CSV {interface}", + ) + + # Also 
track pcap if exists + pcap_file = Path(f"{capture_prefix}-01.cap") + if pcap_file.exists(): + store.add_artifact( + sweep_db_id, + artifact_type="wifi_pcap", + file_path=str(pcap_file), + file_size_bytes=pcap_file.stat().st_size, + description=f"Wi-Fi packet capture {interface}", + ) + + return True + + except FileNotFoundError as e: + print(f"Error: Required tool not found: {e}") + return False + except Exception as e: + print(f"Error during Wi-Fi sweep: {e}") + return False + finally: + # Disable monitor mode + if monitor_interface: + print(f"Disabling monitor mode on {monitor_interface}...") + try: + subprocess.run( + ["airmon-ng", "stop", monitor_interface], + capture_output=True, + timeout=10, + ) + except Exception as e: + print(f"Warning: Could not disable monitor mode: {e}") + + +def _parse_airodump_csv(csv_path: Path, store: SweepStore, sweep_db_id: int) -> int: + """ + Parse airodump-ng CSV output. + + Format: + - First section: Access Points (BSSID, First time seen, Last time seen, channel, Speed, Privacy, Cipher, Authentication, Power, # beacons, # IV, LAN IP, ID-length, ESSID, Key) + - Second section: Stations (Station MAC, First time seen, Last time seen, Power, # packets, BSSID, Probed ESSIDs) + + Args: + csv_path: Path to airodump CSV file + store: Storage instance + sweep_db_id: Database ID of sweep + + Returns: + Number of events stored + """ + events_count = 0 + + try: + with open(csv_path, "r", encoding="utf-8", errors="ignore") as f: + content = f.read() + + # Split into AP and Station sections + sections = content.split("\r\n\r\n") + + # Parse APs (first section) + if sections: + ap_lines = sections[0].split("\n") + # Find header and data rows + for i, line in enumerate(ap_lines): + if line.strip().startswith("BSSID"): + # Parse AP entries + for ap_line in ap_lines[i + 1 :]: + if not ap_line.strip(): + continue + try: + parts = [p.strip() for p in ap_line.split(",")] + if len(parts) >= 14: + bssid = parts[0] + power = parts[8] + essid = 
parts[13] + + # Convert power to float + power_db = float(power) if power and power != "-1" else None + + # Store event + store.add_event( + sweep_db_id=sweep_db_id, + event_type="wifi", + timestamp=datetime.now(timezone.utc), + mac_address=bssid, + ssid=essid if essid else None, + signal_strength=power_db, + ) + events_count += 1 + except (ValueError, IndexError): + continue + break + + # Parse Stations (second section if exists) + if len(sections) > 1: + station_lines = sections[1].split("\n") + for i, line in enumerate(station_lines): + if "Station MAC" in line: + # Parse station entries + for station_line in station_lines[i + 1 :]: + if not station_line.strip(): + continue + try: + parts = [p.strip() for p in station_line.split(",")] + if len(parts) >= 6: + station_mac = parts[0] + power = parts[3] + bssid = parts[5] + + power_db = float(power) if power and power != "-1" else None + + # Store station event + store.add_event( + sweep_db_id=sweep_db_id, + event_type="wifi", + timestamp=datetime.now(timezone.utc), + mac_address=station_mac, + signal_strength=power_db, + metadata={"associated_bssid": bssid}, + ) + events_count += 1 + except (ValueError, IndexError): + continue + break + + except Exception as e: + print(f"Error parsing airodump CSV: {e}") + + return events_count diff --git a/src/tscm/dashboard/app.py b/src/tscm/dashboard/app.py new file mode 100644 index 0000000..5fe02bb --- /dev/null +++ b/src/tscm/dashboard/app.py @@ -0,0 +1,271 @@ +"""Web dashboard for TSCM sweep visualization. + +Simple Flask-based dashboard for viewing sweep results. +""" + +from pathlib import Path +from typing import Optional + +from tscm.config import load_config +from tscm.storage.store import SweepStore + + +def create_app(config_path: Optional[Path] = None): + """ + Create Flask application. 
+
+    Args:
+        config_path: Optional path to config file
+
+    Returns:
+        Flask application instance
+    """
+    try:
+        from flask import Flask, jsonify, render_template, request
+    except ImportError:
+        raise ImportError("Flask is required for dashboard. Install with: pip install flask")
+
+    app = Flask(__name__, template_folder=str(Path(__file__).parent.parent / "templates"))
+
+    # Load TSCM config
+    config = load_config(config_path)
+    store = SweepStore(config.storage.database_path, config.storage.enable_wal)
+
+    @app.route("/")
+    def index():
+        """Dashboard home page."""
+        return render_template("dashboard.html")
+
+    @app.route("/api/sweeps")
+    def get_sweeps():
+        """API endpoint to list sweeps."""
+        client = request.args.get("client")
+        site = request.args.get("site")
+        room = request.args.get("room")
+        try:
+            limit = int(request.args.get("limit", 50))
+            limit = min(max(limit, 1), 1000)  # Clamp between 1 and 1000
+        except (ValueError, TypeError):
+            limit = 50
+
+        sweeps = store.get_sweeps(
+            client_name=client,
+            site=site,
+            room=room,
+            limit=limit,
+        )
+
+        return jsonify(sweeps)
+
+    @app.route("/api/sweeps/<int:sweep_id>")
+    def get_sweep(sweep_id):
+        """API endpoint to get sweep details."""
+        sweep = store.get_sweep_by_id(str(sweep_id))
+        if not sweep:
+            return jsonify({"error": "Sweep not found"}), 404
+
+        # Get events summary
+        events_summary = {}
+        for event_type in ["rf", "wifi", "ble", "gsm"]:
+            events = store.get_events(sweep["id"], event_type=event_type, limit=1000)
+            events_summary[event_type] = {
+                "count": len(events),
+                "events": events[:100],  # Limit for performance
+            }
+
+        # Get anomalies
+        anomalies = store.get_anomalies(sweep["id"])
+
+        # Get artifacts
+        artifacts = store.get_artifacts(sweep["id"])
+
+        return jsonify({
+            "sweep": sweep,
+            "events": events_summary,
+            "anomalies": anomalies,
+            "artifacts": artifacts,
+        })
+
+    @app.route("/api/anomalies/<int:sweep_id>")
+    def get_anomalies(sweep_id):
+        """API endpoint to get anomalies for a sweep."""
+        # Validate and clamp
min_score parameter + try: + min_score = float(request.args.get("min_score", 0.0)) + min_score = max(0.0, min(min_score, 1.0)) # Clamp between 0.0 and 1.0 + except (ValueError, TypeError): + min_score = 0.0 + + kind = request.args.get("kind") + + anomalies = store.get_anomalies(sweep_id, kind=kind, min_score=min_score) + return jsonify(anomalies) + + return app + + +def run_dashboard(host: str = "127.0.0.1", port: int = 5000, config_path: Optional[Path] = None): + """ + Run the dashboard web server. + + Args: + host: Host to bind to + port: Port to listen on + config_path: Optional path to config file + """ + app = create_app(config_path) + print(f"Starting TSCM dashboard on http://{host}:{port}") + app.run(host=host, port=port, debug=True) + + +def _get_default_dashboard_template() -> str: + """Get default dashboard HTML template.""" + return """ + + + TSCM Dashboard + + + + + +
+

🔍 SWEEPERZERO Dashboard

+

Technical Surveillance Counter-Measures Monitoring

+
+ +
+
+

Recent Sweeps

+
Loading sweeps...
+
+
+ + + +""" + + +if __name__ == "__main__": + run_dashboard() diff --git a/src/tscm/report.py b/src/tscm/report.py new file mode 100644 index 0000000..349cd3e --- /dev/null +++ b/src/tscm/report.py @@ -0,0 +1,290 @@ +"""Report generation module for sweep results.""" + +import json +from datetime import datetime +from pathlib import Path +from typing import Dict, Optional + +from tscm.storage.store import SweepStore + + +def generate_text_report( + store: SweepStore, + sweep_id: int, + output_path: Optional[Path] = None, +) -> str: + """ + Generate a text report for a sweep. + + Args: + store: Storage instance + sweep_id: Database ID of sweep + output_path: Optional path to save report + + Returns: + Report text + """ + # Get sweep details + sweep = store.get_sweep_by_id(str(sweep_id)) + if not sweep: + raise ValueError(f"Sweep {sweep_id} not found") + + # Build report + lines = [] + lines.append("=" * 80) + lines.append("TSCM SWEEP REPORT") + lines.append("=" * 80) + lines.append("") + lines.append(f"Sweep ID: {sweep['sweep_id']}") + lines.append(f"Client: {sweep['client_name']}") + if sweep["site"]: + lines.append(f"Site: {sweep['site']}") + if sweep["room"]: + lines.append(f"Room: {sweep['room']}") + lines.append(f"Status: {sweep['status']}") + lines.append(f"Start Time: {sweep['start_time']}") + if sweep["end_time"]: + lines.append(f"End Time: {sweep['end_time']}") + if sweep["gps_lat"] and sweep["gps_lon"]: + lines.append(f"GPS: {sweep['gps_lat']:.6f}, {sweep['gps_lon']:.6f}") + lines.append("") + + # Get events summary + lines.append("-" * 80) + lines.append("EVENTS SUMMARY") + lines.append("-" * 80) + + event_types = ["rf", "wifi", "ble", "gsm"] + for event_type in event_types: + events = store.get_events(sweep["id"], event_type=event_type, limit=10000) + lines.append(f"{event_type.upper()}: {len(events)} events") + + if event_type == "rf" and events: + # RF statistics + powers = [e["power_db"] for e in events if e["power_db"] is not None] + if powers: + 
lines.append(f" Power range: {min(powers):.1f} to {max(powers):.1f} dB") + lines.append(f" Mean power: {sum(powers)/len(powers):.1f} dB") + + elif event_type == "wifi" and events: + # Wi-Fi statistics + unique_bssids = set(e["mac_address"] for e in events if e["mac_address"]) + lines.append(f" Unique BSSIDs: {len(unique_bssids)}") + ssids = [e["ssid"] for e in events if e["ssid"]] + if ssids: + lines.append(f" SSIDs detected: {len(set(ssids))}") + + elif event_type == "ble" and events: + # BLE statistics + unique_macs = set(e["mac_address"] for e in events if e["mac_address"]) + lines.append(f" Unique BLE devices: {len(unique_macs)}") + + elif event_type == "gsm" and events: + # GSM statistics + unique_cells = set( + (e["mcc"], e["mnc"], e["lac"], e["cid"]) + for e in events + if e["mcc"] is not None + ) + lines.append(f" Unique cells: {len(unique_cells)}") + + lines.append("") + + # Get anomalies + anomalies = store.get_anomalies(sweep["id"], min_score=0.3) + lines.append("-" * 80) + lines.append(f"ANOMALIES ({len(anomalies)})") + lines.append("-" * 80) + + if anomalies: + for anomaly in anomalies[:20]: # Top 20 + lines.append(f"[{anomaly['score']:.2f}] {anomaly['kind']}") + if anomaly["metadata"]: + details = anomaly["metadata"].get("details", "") + if details: + lines.append(f" {details}") + else: + lines.append("No significant anomalies detected") + + lines.append("") + + # Get artifacts + artifacts = store.get_artifacts(sweep["id"]) + lines.append("-" * 80) + lines.append(f"ARTIFACTS ({len(artifacts)})") + lines.append("-" * 80) + + for artifact in artifacts: + size_mb = artifact["file_size_bytes"] / (1024 * 1024) if artifact["file_size_bytes"] else 0 + lines.append( + f"{artifact['artifact_type']}: {artifact['file_path']} ({size_mb:.2f} MB)" + ) + + lines.append("") + lines.append("=" * 80) + lines.append(f"Report generated: {datetime.utcnow().isoformat()}") + lines.append("=" * 80) + + report_text = "\n".join(lines) + + # Save to file if requested + if 
output_path: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + f.write(report_text) + print(f"Report saved to {output_path}") + + return report_text + + +def generate_json_report( + store: SweepStore, + sweep_id: int, + output_path: Optional[Path] = None, +) -> Dict: + """ + Generate a JSON report for a sweep. + + Args: + store: Storage instance + sweep_id: Database ID of sweep + output_path: Optional path to save report + + Returns: + Report dictionary + """ + # Get sweep details + sweep = store.get_sweep_by_id(str(sweep_id)) + if not sweep: + raise ValueError(f"Sweep {sweep_id} not found") + + # Build report + report = { + "sweep": sweep, + "events": {}, + "anomalies": store.get_anomalies(sweep["id"]), + "artifacts": store.get_artifacts(sweep["id"]), + "generated_at": datetime.utcnow().isoformat(), + } + + # Get events by type + event_types = ["rf", "wifi", "ble", "gsm"] + for event_type in event_types: + events = store.get_events(sweep["id"], event_type=event_type, limit=1000) + report["events"][event_type] = events + + # Save to file if requested + if output_path: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + json.dump(report, f, indent=2, default=str) + print(f"JSON report saved to {output_path}") + + return report + + +def generate_html_report( + store: SweepStore, + sweep_id: int, + output_path: Optional[Path] = None, + template_path: Optional[Path] = None, +) -> str: + """ + Generate an HTML report for a sweep. 
+ + Args: + store: Storage instance + sweep_id: Database ID of sweep + output_path: Optional path to save report + template_path: Optional path to HTML template + + Returns: + HTML report string + """ + # Get data + json_data = generate_json_report(store, sweep_id) + + # Load template + if template_path and template_path.exists(): + with open(template_path) as f: + template = f.read() + else: + # Use default template + template = _get_default_html_template() + + # Simple template substitution + html = template.replace("{{SWEEP_DATA}}", json.dumps(json_data, default=str)) + + # Save to file if requested + if output_path: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + f.write(html) + print(f"HTML report saved to {output_path}") + + return html + + +def _get_default_html_template() -> str: + """Get default HTML template.""" + return """ + + + TSCM Sweep Report + + + +

TSCM Sweep Report

+
+

Loading report data...

+
+ + +""" diff --git a/src/tscm/storage/models.py b/src/tscm/storage/models.py index 7b0b573..1b3c65e 100644 --- a/src/tscm/storage/models.py +++ b/src/tscm/storage/models.py @@ -124,6 +124,28 @@ class Artifact(Base): __table_args__ = (Index("idx_artifact_sweep_type", "sweep_id", "artifact_type"),) +class Anomaly(Base): + """Represents a detected anomaly during a sweep.""" + + __tablename__ = "anomalies" + + id = Column(Integer, primary_key=True, autoincrement=True) + sweep_id = Column(Integer, ForeignKey("sweeps.id", ondelete="CASCADE"), nullable=False) + event_ref = Column(Integer, nullable=True) # Optional reference to specific event ID + score = Column(Float, nullable=False) # Anomaly score (0.0 - 1.0) + kind = Column(String(50), nullable=False, index=True) # Type: freq_anomaly, rogue_ap, unknown_ble, etc. + anomaly_metadata = Column(Text, nullable=True) # JSON string for additional details (renamed from metadata to avoid conflict) + created_at = Column(DateTime, default=utcnow, nullable=False) + + # Relationships + sweep = relationship("Sweep") + + __table_args__ = ( + Index("idx_anomaly_sweep_kind", "sweep_id", "kind"), + Index("idx_anomaly_score", "score"), + ) + + def create_engine_with_wal(database_url: str, enable_wal: bool = True, echo: bool = False): """ Create SQLAlchemy engine with WAL mode enabled. 
diff --git a/src/tscm/storage/store.py b/src/tscm/storage/store.py index 24b2a75..339de67 100644 --- a/src/tscm/storage/store.py +++ b/src/tscm/storage/store.py @@ -8,7 +8,7 @@ from sqlalchemy import desc, select from sqlalchemy.orm import sessionmaker -from .models import Artifact, Event, Sweep, create_engine_with_wal, init_db +from .models import Anomaly, Artifact, Event, Sweep, create_engine_with_wal, init_db class SweepStore: @@ -344,3 +344,76 @@ def get_artifacts(self, sweep_db_id: int) -> List[Dict]: } for a in artifacts ] + + def insert_anomaly( + self, + sweep_id: int, + event_id: Optional[int], + kind: str, + score: float, + metadata: Optional[Dict] = None, + ) -> int: + """ + Insert an anomaly record. + + Args: + sweep_id: Database ID of sweep + event_id: Optional reference to specific event ID + kind: Type of anomaly (freq_anomaly, rogue_ap, unknown_ble, etc.) + score: Anomaly score (0.0 - 1.0) + metadata: Additional metadata as dict + + Returns: + Database ID of created anomaly + """ + with self.SessionLocal() as session: + anomaly = Anomaly( + sweep_id=sweep_id, + event_ref=event_id, + kind=kind, + score=score, + anomaly_metadata=json.dumps(metadata) if metadata else None, + ) + session.add(anomaly) + session.commit() + session.refresh(anomaly) + return anomaly.id + + def get_anomalies( + self, sweep_db_id: int, kind: Optional[str] = None, min_score: float = 0.0 + ) -> List[Dict]: + """ + Get anomalies for a sweep. 
+ + Args: + sweep_db_id: Database ID of sweep + kind: Optional filter by anomaly kind + min_score: Minimum anomaly score to return + + Returns: + List of anomaly records + """ + with self.SessionLocal() as session: + stmt = ( + select(Anomaly) + .where(Anomaly.sweep_id == sweep_db_id) + .where(Anomaly.score >= min_score) + .order_by(desc(Anomaly.score)) + ) + + if kind: + stmt = stmt.where(Anomaly.kind == kind) + + anomalies = session.scalars(stmt).all() + + return [ + { + "id": a.id, + "event_ref": a.event_ref, + "kind": a.kind, + "score": a.score, + "metadata": json.loads(a.anomaly_metadata) if a.anomaly_metadata else None, + "created_at": a.created_at, + } + for a in anomalies + ] diff --git a/src/tscm/templates/dashboard.html b/src/tscm/templates/dashboard.html new file mode 100644 index 0000000..66044e6 --- /dev/null +++ b/src/tscm/templates/dashboard.html @@ -0,0 +1,154 @@ + + + + TSCM Dashboard + + + + + +
+

🔍 SWEEPERZERO Dashboard

+

Technical Surveillance Counter-Measures Monitoring

+
+ +
+
+

Recent Sweeps

+
Loading sweeps...
+
+
+ + + + diff --git a/src/tscm/templates/sweep_report.html b/src/tscm/templates/sweep_report.html new file mode 100644 index 0000000..6e97b30 --- /dev/null +++ b/src/tscm/templates/sweep_report.html @@ -0,0 +1,418 @@ + + + + + + TSCM Sweep Report + + + +
+

🔍 TSCM Sweep Report

+

Technical Surveillance Counter-Measures Analysis

+
+ +
+ +
+

Sweep Information

+
Loading...
+
+ + +
+

Event Statistics

+
Loading...
+
+ + +
+

Detected Anomalies

+
Loading...
+
+ + +
+

Capture Artifacts

+
Loading...
+
+ + +
+ + + + diff --git a/tests/test_rf_and_baseline.py b/tests/test_rf_and_baseline.py new file mode 100644 index 0000000..1d626ad --- /dev/null +++ b/tests/test_rf_and_baseline.py @@ -0,0 +1,368 @@ +"""Tests for RF collection and baseline detection.""" + +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from tscm.baseline import ( + compare_to_baseline, + create_baseline, + get_baseline_sweeps, + store_anomalies, +) +from tscm.storage.store import SweepStore + + +@pytest.fixture +def temp_db(): + """Create a temporary database for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = f.name + + store = SweepStore(db_path, enable_wal=True) + yield store + + # Cleanup + Path(db_path).unlink(missing_ok=True) + + +@pytest.fixture +def sample_sweep(temp_db): + """Create a sample sweep with RF events.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_sweep_001", + client_name="test_client", + site="test_site", + room="test_room", + ) + + # Add some RF events + events = [] + base_time = datetime.now(timezone.utc) + + # Normal background at 100 MHz: -50 dB + for i in range(10): + events.append({ + "event_type": "rf", + "timestamp": base_time, + "freq_hz": 100e6, + "power_db": -50.0 + (i - 5) * 0.5, # -52.5 to -47.5 dB + }) + + # Normal background at 200 MHz: -55 dB + for i in range(10): + events.append({ + "event_type": "rf", + "timestamp": base_time, + "freq_hz": 200e6, + "power_db": -55.0 + (i - 5) * 0.3, + }) + + # Anomalous signal at 433 MHz: -30 dB (strong) + events.append({ + "event_type": "rf", + "timestamp": base_time, + "freq_hz": 433e6, + "power_db": -30.0, + }) + + temp_db.add_events_bulk(sweep_id, events) + + yield sweep_id, temp_db + + +class TestRFStorage: + """Test RF event storage.""" + + def test_create_sweep(self, temp_db): + """Test creating a sweep.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_001", + client_name="acme_corp", + site="hq", + 
room="boardroom", + ) + + assert sweep_id > 0 + + # Retrieve sweep + sweep = temp_db.get_sweep_by_id("test_001") + assert sweep is not None + assert sweep["client_name"] == "acme_corp" + assert sweep["site"] == "hq" + assert sweep["room"] == "boardroom" + + def test_add_rf_events(self, temp_db): + """Test adding RF events.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_002", + client_name="test", + ) + + # Add single event + event_id = temp_db.add_event( + sweep_db_id=sweep_id, + event_type="rf", + timestamp=datetime.now(timezone.utc), + freq_hz=100e6, + power_db=-45.5, + ) + + assert event_id > 0 + + # Retrieve events + events = temp_db.get_events(sweep_id, event_type="rf") + assert len(events) == 1 + assert events[0]["freq_hz"] == 100e6 + assert events[0]["power_db"] == -45.5 + + def test_add_events_bulk(self, temp_db): + """Test bulk event insertion.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_003", + client_name="test", + ) + + events = [ + { + "event_type": "rf", + "timestamp": datetime.now(timezone.utc), + "freq_hz": 100e6 + i * 1e6, + "power_db": -50.0 - i, + } + for i in range(100) + ] + + count = temp_db.add_events_bulk(sweep_id, events) + assert count == 100 + + # Retrieve events + stored_events = temp_db.get_events(sweep_id, event_type="rf", limit=200) + assert len(stored_events) == 100 + + +class TestAnomalyStorage: + """Test anomaly storage.""" + + def test_insert_anomaly(self, temp_db): + """Test inserting an anomaly.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_004", + client_name="test", + ) + + anomaly_id = temp_db.insert_anomaly( + sweep_id=sweep_id, + event_id=None, + kind="freq_anomaly", + score=0.85, + metadata={"details": "Unusual signal detected"}, + ) + + assert anomaly_id > 0 + + # Retrieve anomalies + anomalies = temp_db.get_anomalies(sweep_id) + assert len(anomalies) == 1 + assert anomalies[0]["kind"] == "freq_anomaly" + assert anomalies[0]["score"] == 0.85 + assert anomalies[0]["metadata"]["details"] == 
"Unusual signal detected" + + def test_get_anomalies_filtered(self, temp_db): + """Test filtering anomalies.""" + sweep_id = temp_db.create_sweep( + sweep_id="test_005", + client_name="test", + ) + + # Add multiple anomalies + temp_db.insert_anomaly(sweep_id, None, "type_a", 0.9) + temp_db.insert_anomaly(sweep_id, None, "type_b", 0.7) + temp_db.insert_anomaly(sweep_id, None, "type_a", 0.5) + temp_db.insert_anomaly(sweep_id, None, "type_c", 0.3) + + # Filter by kind + type_a_anomalies = temp_db.get_anomalies(sweep_id, kind="type_a") + assert len(type_a_anomalies) == 2 + + # Filter by score + high_score = temp_db.get_anomalies(sweep_id, min_score=0.6) + assert len(high_score) == 2 + + +class TestBaseline: + """Test baseline creation and comparison.""" + + def test_create_baseline(self, sample_sweep): + """Test baseline creation from sweeps.""" + sweep_id, store = sample_sweep + + baseline = create_baseline(store, [sweep_id], freq_bin_mhz=1.0) + + assert "frequencies" in baseline + assert baseline["num_sweeps"] == 1 + assert baseline["freq_bin_mhz"] == 1.0 + + # Check that we have statistics for known frequencies + freqs = baseline["frequencies"] + assert "100.0" in freqs or 100.0 in freqs + assert "200.0" in freqs or 200.0 in freqs + + def test_baseline_statistics(self, sample_sweep): + """Test baseline statistics calculation.""" + sweep_id, store = sample_sweep + + baseline = create_baseline(store, [sweep_id], freq_bin_mhz=1.0) + + # Check 100 MHz baseline (should have mean around -50 dB) + freq_100 = baseline["frequencies"].get("100.0") or baseline["frequencies"].get(100.0) + assert freq_100 is not None + assert -52 < freq_100["mean_db"] < -48 + assert freq_100["stdev_db"] > 0 + assert freq_100["count"] == 10 + + def test_compare_to_baseline_no_anomalies(self, sample_sweep): + """Test comparison with no anomalies.""" + sweep_id, store = sample_sweep + + # Create baseline from same sweep + baseline = create_baseline(store, [sweep_id], freq_bin_mhz=1.0) + + # Compare 
to itself (should find the anomalous 433 MHz signal) + anomalies = compare_to_baseline( + store, + sweep_id, + baseline, + threshold_sigma=3.0, + min_power_threshold_db=-80.0, + ) + + # Should detect the 433 MHz anomaly (unknown frequency) + assert len(anomalies) > 0 + freq_433_anomalies = [a for a in anomalies if a["freq_hz"] == 433e6] + assert len(freq_433_anomalies) > 0 + + def test_store_anomalies(self, sample_sweep): + """Test storing detected anomalies.""" + sweep_id, store = sample_sweep + + baseline = create_baseline(store, [sweep_id], freq_bin_mhz=1.0) + anomalies = compare_to_baseline(store, sweep_id, baseline) + + # Store anomalies + count = store_anomalies(store, sweep_id, anomalies) + assert count == len(anomalies) + + # Retrieve from database + stored = store.get_anomalies(sweep_id) + assert len(stored) == count + + def test_get_baseline_sweeps(self, temp_db): + """Test getting sweeps for baseline.""" + # Create multiple completed sweeps + sweep_ids = [] + for i in range(5): + sid = temp_db.create_sweep( + sweep_id=f"baseline_sweep_{i}", + client_name="baseline_client", + site="baseline_site", + ) + temp_db.update_sweep(sid, status="completed") + sweep_ids.append(sid) + + # Get baseline sweeps + result = get_baseline_sweeps( + temp_db, + client_name="baseline_client", + site="baseline_site", + days_back=30, + min_sweeps=3, + ) + + assert len(result) == 5 + + def test_baseline_save_load(self, sample_sweep, tmp_path): + """Test saving and loading baseline.""" + from tscm.baseline import load_baseline + + sweep_id, store = sample_sweep + + baseline_file = tmp_path / "baseline.json" + baseline = create_baseline( + store, [sweep_id], freq_bin_mhz=1.0, output_path=baseline_file + ) + + assert baseline_file.exists() + + # Load baseline + loaded = load_baseline(baseline_file) + assert loaded["num_sweeps"] == baseline["num_sweeps"] + assert loaded["freq_bin_mhz"] == baseline["freq_bin_mhz"] + assert len(loaded["frequencies"]) == len(baseline["frequencies"]) + 


class TestMultipleEventTypes:
    """Test storage of different event types.

    Each test creates its own sweep record so the subsequent
    get_events() query is isolated to that sweep, then round-trips one
    event of the given type through the store.
    """

    def test_wifi_events(self, temp_db):
        """Test Wi-Fi event storage."""
        sweep_id = temp_db.create_sweep(
            sweep_id="wifi_test",
            client_name="test",
        )

        # Timezone-aware timestamp; store presumably expects UTC.
        temp_db.add_event(
            sweep_db_id=sweep_id,
            event_type="wifi",
            timestamp=datetime.now(timezone.utc),
            mac_address="AA:BB:CC:DD:EE:FF",
            ssid="TestNetwork",
            signal_strength=-65.0,
        )

        # Round-trip: event comes back with its Wi-Fi fields intact.
        events = temp_db.get_events(sweep_id, event_type="wifi")
        assert len(events) == 1
        assert events[0]["mac_address"] == "AA:BB:CC:DD:EE:FF"
        assert events[0]["ssid"] == "TestNetwork"

    def test_ble_events(self, temp_db):
        """Test BLE event storage."""
        sweep_id = temp_db.create_sweep(
            sweep_id="ble_test",
            client_name="test",
        )

        temp_db.add_event(
            sweep_db_id=sweep_id,
            event_type="ble",
            timestamp=datetime.now(timezone.utc),
            mac_address="11:22:33:44:55:66",
            signal_strength=-75.0,
        )

        # Round-trip: BLE event is filtered by type and keeps its MAC.
        events = temp_db.get_events(sweep_id, event_type="ble")
        assert len(events) == 1
        assert events[0]["mac_address"] == "11:22:33:44:55:66"

    def test_gsm_events(self, temp_db):
        """Test GSM event storage."""
        sweep_id = temp_db.create_sweep(
            sweep_id="gsm_test",
            client_name="test",
        )

        # GSM cell identity fields: MCC/MNC identify the network,
        # LAC/CID the cell, ARFCN the radio channel.
        temp_db.add_event(
            sweep_db_id=sweep_id,
            event_type="gsm",
            timestamp=datetime.now(timezone.utc),
            mcc=310,
            mnc=260,
            lac=12345,
            cid=67890,
            arfcn=123,
        )

        events = temp_db.get_events(sweep_id, event_type="gsm")
        assert len(events) == 1