11#!/usr/bin/env python3
22"""SecID Server — self-hosted resolver with pluggable storage.
33
4- Usage:
5- python secid_server.py --registry /path/to/SecID/registry
6- python secid_server.py --registry /data/public/registry --registry /data/private/registry
7- python secid_server.py --storage redis --redis-url redis://localhost:6379
8- python secid_server.py --load bulk # pre-load all entries at startup
9- python secid_server.py --load lazy # load on first request (default)
4+ Two ways to use this module:
5+
6+ 1. CLI: python secid_server.py --registry /path/to/SecID/registry [...]
7+ 2. Library: from secid_server import create_app, ServerConfig
8+ config = ServerConfig(registry_dirs=["./registry"])
9+ app = create_app(config)
10+
11+ The factory function lets tests build a fully-configured app without
12+ running the CLI bootstrap, and lets ASGI deployments (uvicorn, gunicorn)
13+ construct the app from environment variables instead of argparse.
1014
1115Serves:
1216 GET /api/v1/resolve?secid=... — REST API (same as secid.cloudsecurityalliance.org)
13- /mcp — MCP endpoint (same three tools)
17+ GET /health — health check
18+ POST /admin/reload — reload registry data after git pull
19+ /mcp — MCP endpoint (when `mcp` package is installed)
1420"""
1521
22+ from __future__ import annotations
23+
1624import argparse
1725import json
1826import logging
1927import os
2028import sys
29+ from dataclasses import dataclass , field
2130from typing import Optional
2231
23- from fastapi import FastAPI , Query , Request
32+ from fastapi import FastAPI , Query
2433from fastapi .middleware .cors import CORSMiddleware
2534from fastapi .responses import JSONResponse
2635
2736from storage import create_store
2837from registry_loader import bulk_load , SECID_TYPES
2938from resolver import resolve
3039
31- # --- CLI arguments ---
32-
33- parser = argparse .ArgumentParser (description = "SecID Self-Hosted Server" )
34- parser .add_argument (
35- "--registry" , action = "append" , default = [],
36- help = "Path to registry directory (can specify multiple for overlay). Default: ./registry" ,
37- )
38- parser .add_argument ("--storage" , default = "memory" , choices = ["memory" , "redis" , "memcached" , "sqlite" ])
39- parser .add_argument ("--redis-url" , default = "redis://localhost:6379" )
40- parser .add_argument ("--memcached-url" , default = "localhost:11211" )
41- parser .add_argument ("--sqlite-path" , default = ":memory:" )
42- parser .add_argument ("--load" , default = "lazy" , choices = ["lazy" , "bulk" ])
43- parser .add_argument ("--host" , default = "0.0.0.0" )
44- parser .add_argument ("--port" , type = int , default = 8000 )
45- parser .add_argument ("--log-level" , default = "INFO" , choices = ["DEBUG" , "INFO" , "WARNING" , "ERROR" ])
46-
47- args , _ = parser .parse_known_args ()
48-
49- # Default registry path
50- if not args .registry :
51- # Try common locations
52- for candidate in ["./registry" , "../SecID/registry" , os .path .expanduser ("~/GitHub/CloudSecurityAlliance/SecID/registry" )]:
53- if os .path .isdir (candidate ):
54- args .registry = [candidate ]
55- break
56- if not args .registry :
57- print ("Error: No registry directory found. Use --registry /path/to/SecID/registry" , file = sys .stderr )
58- sys .exit (1 )
59-
60- logging .basicConfig (level = getattr (logging , args .log_level ), format = "%(asctime)s %(levelname)s %(message)s" )
6140logger = logging .getLogger (__name__ )
6241
63- # --- Storage + loading ---
6442
65- storage_kwargs = {}
66- if args .storage == "redis" :
67- storage_kwargs ["url" ] = args .redis_url
68- elif args .storage == "memcached" :
69- storage_kwargs ["url" ] = args .memcached_url
70- elif args .storage == "sqlite" :
71- storage_kwargs ["path" ] = args .sqlite_path
43+ # ---------------------------------------------------------------------------
44+ # Configuration
45+ # ---------------------------------------------------------------------------
7246
73- store = create_store (args .storage , ** storage_kwargs )
7447
75- if args .load == "bulk" :
76- count = bulk_load (store , args .registry )
77- logger .info (f"Bulk loaded { count } namespaces into { args .storage } store" )
78- else :
79- logger .info (f"Lazy loading from { args .registry } with { args .storage } store" )
48+ @dataclass
49+ class ServerConfig :
50+ """Configuration for the SecID server. Pass to create_app().
8051
81- # --- FastAPI app ---
52+ Attributes:
53+ registry_dirs: List of paths to registry directories. Later
54+ directories override earlier ones for the same namespace+type
55+ (overlay support).
56+ storage_type: One of "memory", "redis", "memcached", "sqlite".
57+ storage_kwargs: Backend-specific kwargs forwarded to create_store().
58+ For redis/memcached: {"url": "..."}. For sqlite: {"path": "..."}.
59+ load_mode: "lazy" (load on first request, default) or "bulk"
60+ (load everything at startup).
61+ """
8262
83- app = FastAPI (
84- title = "SecID Server" ,
85- description = "Self-hosted SecID resolver" ,
86- version = "0.1.0" ,
87- )
63+ registry_dirs : list [str ]
64+ storage_type : str = "memory"
65+ storage_kwargs : dict = field (default_factory = dict )
66+ load_mode : str = "lazy"
8867
89- app .add_middleware (
90- CORSMiddleware ,
91- allow_origins = ["*" ],
92- allow_methods = ["GET" , "POST" ],
93- allow_headers = ["*" ],
94- )
9568
69+ # ---------------------------------------------------------------------------
70+ # App factory
71+ # ---------------------------------------------------------------------------
9672
97- @app .get ("/api/v1/resolve" )
98- async def api_resolve (
99- secid : str = Query (..., description = "SecID string to resolve" ),
100- parsability : Optional [str ] = Query (None , description = "Filter results by parsability: 'structured' or 'scraped'" ),
101- ):
102- """Resolve a SecID string to URLs and registry data."""
103- result = resolve (store , secid , registry_dirs = args .registry )
104- # Filter by parsability if requested
105- if parsability and "results" in result :
106- result ["results" ] = [
107- r for r in result ["results" ]
108- if "url" not in r or r .get ("parsability" ) == parsability
109- ]
110- return JSONResponse (content = result )
11173
74+ def create_app (config : ServerConfig ) -> FastAPI :
75+ """Create a configured SecID server FastAPI app.
11276
113- @app .post ("/admin/reload" )
114- async def admin_reload ():
115- """Reload registry data (after git pull)."""
116- from registry_loader import update_load
117- count = update_load (store , args .registry )
118- return {"reloaded" : count }
77+ Used by the CLI (see main()) and by tests (via fastapi.testclient.TestClient).
78+ Has no module-level side effects, so importing this file is safe.
79+ """
80+ store = create_store (config .storage_type , ** config .storage_kwargs )
11981
82+ if config .load_mode == "bulk" :
83+ count = bulk_load (store , config .registry_dirs )
84+ logger .info (f"Bulk loaded { count } namespaces into { config .storage_type } store" )
85+ else :
86+ logger .info (f"Lazy loading from { config .registry_dirs } with { config .storage_type } store" )
12087
121- @app .get ("/health" )
122- async def health ():
123- """Health check."""
124- key_count = len (store .keys ())
125- return {"status" : "ok" , "store" : args .storage , "keys" : key_count }
126-
88+ app = FastAPI (
89+ title = "SecID Server" ,
90+ description = "Self-hosted SecID resolver" ,
91+ version = "0.1.0" ,
92+ )
12793
128- # --- MCP Server (same three tools as SecID-Service) ---
94+ app .add_middleware (
95+ CORSMiddleware ,
96+ allow_origins = ["*" ],
97+ allow_methods = ["GET" , "POST" ],
98+ allow_headers = ["*" ],
99+ )
129100
130- try :
131- from mcp .server .fastmcp import FastMCP
101+ @app .get ("/api/v1/resolve" )
102+ async def api_resolve (
103+ secid : str = Query (..., description = "SecID string to resolve" ),
104+ parsability : Optional [str ] = Query (
105+ None ,
106+ description = "Filter results by parsability: 'structured' or 'scraped'" ,
107+ ),
108+ ):
109+ """Resolve a SecID string to URLs and registry data."""
110+ result = resolve (store , secid , registry_dirs = config .registry_dirs )
111+ if parsability and "results" in result :
112+ result ["results" ] = [
113+ r for r in result ["results" ]
114+ if "url" not in r or r .get ("parsability" ) == parsability
115+ ]
116+ return JSONResponse (content = result )
117+
118+ @app .post ("/admin/reload" )
119+ async def admin_reload ():
120+ """Reload registry data (after git pull)."""
121+ from registry_loader import update_load
122+ count = update_load (store , config .registry_dirs )
123+ return {"reloaded" : count }
124+
125+ @app .get ("/health" )
126+ async def health ():
127+ """Health check — returns store type and current key count."""
128+ key_count = len (store .keys ())
129+ return {"status" : "ok" , "store" : config .storage_type , "keys" : key_count }
130+
131+ _try_mount_mcp (app , store , config )
132+ return app
133+
134+
135+ def _try_mount_mcp (app : FastAPI , store , config : ServerConfig ) -> None :
136+ """Mount /mcp endpoint if the `mcp` package is available.
137+
138+ Same three tools as SecID-Service (resolve, lookup, describe). Optional
139+ dependency so users who only need the REST API don't have to install MCP.
140+ """
141+ try :
142+ from mcp .server .fastmcp import FastMCP
143+ except ImportError :
144+ logger .info ("MCP SDK not installed — /mcp endpoint disabled. Install with: pip install mcp" )
145+ return
132146
133147 mcp = FastMCP (
134148 "SecID" ,
135- instructions = "Self-hosted SecID resolver. Resolve, look up, and describe security knowledge identifiers." ,
149+ instructions = (
150+ "Self-hosted SecID resolver. Resolve, look up, and describe "
151+ "security knowledge identifiers."
152+ ),
136153 )
137154
138155 @mcp .tool ()
@@ -145,7 +162,7 @@ def mcp_resolve(secid: str) -> str:
145162 secid:ttp/mitre.org/attack#T1059.003 → ATT&CK technique URL
146163 secid:methodology/first.org/cvss@4.0 → CVSS v4.0 specification
147164 """
148- return json .dumps (resolve (store , secid , registry_dirs = args . registry ), indent = 2 )
165+ return json .dumps (resolve (store , secid , registry_dirs = config . registry_dirs ), indent = 2 )
149166
150167 @mcp .tool ()
151168 def mcp_lookup (type : str , identifier : str ) -> str :
@@ -157,7 +174,7 @@ def mcp_lookup(type: str, identifier: str) -> str:
157174 identifier: The identifier to search for (e.g., CVE-2021-44228, CWE-79)
158175 """
159176 secid = f"secid:{ type } /{ identifier } "
160- return json .dumps (resolve (store , secid , registry_dirs = args . registry ), indent = 2 )
177+ return json .dumps (resolve (store , secid , registry_dirs = config . registry_dirs ), indent = 2 )
161178
162179 @mcp .tool ()
163180 def mcp_describe (secid : str ) -> str :
@@ -171,19 +188,95 @@ def mcp_describe(secid: str) -> str:
171188 hash_idx = secid .find ("#" )
172189 if hash_idx != - 1 :
173190 secid = secid [:hash_idx ]
174- return json .dumps (resolve (store , secid , registry_dirs = args . registry ), indent = 2 )
191+ return json .dumps (resolve (store , secid , registry_dirs = config . registry_dirs ), indent = 2 )
175192
176- # Mount MCP at /mcp
177193 app .mount ("/mcp" , mcp .streamable_http_app ())
178194 logger .info ("MCP endpoint available at /mcp" )
179195
180- except ImportError :
181- logger .info ("MCP SDK not installed — /mcp endpoint disabled. Install with: pip install mcp" )
182196
197+ # ---------------------------------------------------------------------------
198+ # CLI entry point
199+ # ---------------------------------------------------------------------------
200+
201+
202+ def _parse_args (argv : Optional [list [str ]] = None ) -> argparse .Namespace :
203+ parser = argparse .ArgumentParser (description = "SecID Self-Hosted Server" )
204+ parser .add_argument (
205+ "--registry" , action = "append" , default = [],
206+ help = "Path to registry directory (can specify multiple for overlay). Default: ./registry" ,
207+ )
208+ parser .add_argument ("--storage" , default = "memory" , choices = ["memory" , "redis" , "memcached" , "sqlite" ])
209+ parser .add_argument ("--redis-url" , default = "redis://localhost:6379" )
210+ parser .add_argument ("--memcached-url" , default = "localhost:11211" )
211+ parser .add_argument ("--sqlite-path" , default = ":memory:" )
212+ parser .add_argument ("--load" , default = "lazy" , choices = ["lazy" , "bulk" ])
213+ parser .add_argument ("--host" , default = "0.0.0.0" )
214+ parser .add_argument ("--port" , type = int , default = 8000 )
215+ parser .add_argument ("--log-level" , default = "INFO" , choices = ["DEBUG" , "INFO" , "WARNING" , "ERROR" ])
216+ return parser .parse_args (argv )
217+
218+
219+ def _resolve_registry_dirs (provided : list [str ]) -> list [str ]:
220+ """If no --registry was passed, search common host-local locations.
221+
222+ Returns the list to use (provided as-is if non-empty, or a single
223+ auto-discovered path, or empty list if nothing found).
224+ """
225+ if provided :
226+ return provided
227+ for candidate in [
228+ "./registry" ,
229+ "../SecID/registry" ,
230+ os .path .expanduser ("~/GitHub/CloudSecurityAlliance/SecID/registry" ),
231+ ]:
232+ if os .path .isdir (candidate ):
233+ return [candidate ]
234+ return []
235+
236+
237+ def _build_storage_kwargs (args : argparse .Namespace ) -> dict :
238+ if args .storage == "redis" :
239+ return {"url" : args .redis_url }
240+ if args .storage == "memcached" :
241+ return {"url" : args .memcached_url }
242+ if args .storage == "sqlite" :
243+ return {"path" : args .sqlite_path }
244+ return {}
245+
246+
247+ def main (argv : Optional [list [str ]] = None ) -> int :
248+ """CLI entry point. Returns exit code."""
249+ args = _parse_args (argv )
250+
251+ registry_dirs = _resolve_registry_dirs (args .registry )
252+ if not registry_dirs :
253+ print (
254+ "Error: No registry directory found. Use --registry /path/to/SecID/registry" ,
255+ file = sys .stderr ,
256+ )
257+ return 1
258+
259+ logging .basicConfig (
260+ level = getattr (logging , args .log_level ),
261+ format = "%(asctime)s %(levelname)s %(message)s" ,
262+ )
263+
264+ config = ServerConfig (
265+ registry_dirs = registry_dirs ,
266+ storage_type = args .storage ,
267+ storage_kwargs = _build_storage_kwargs (args ),
268+ load_mode = args .load ,
269+ )
270+
271+ app = create_app (config )
183272
184- if __name__ == "__main__" :
185273 import uvicorn
186274 logger .info (f"Starting SecID server on { args .host } :{ args .port } " )
187- logger .info (f"Registry: { args . registry } " )
275+ logger .info (f"Registry: { registry_dirs } " )
188276 logger .info (f"Storage: { args .storage } , Loading: { args .load } " )
189277 uvicorn .run (app , host = args .host , port = args .port )
278+ return 0
279+
280+
281+ if __name__ == "__main__" :
282+ sys .exit (main ())
0 commit comments