Skip to content

Commit 0076fd9

Browse files
SonAIengineclaude
andcommitted
feat(eval): MetaCorpus combiner — Phase 1.4 multi-domain foundation
eval/build_metacorpus.py reads N domain-specific sqlite corpora and emits a single combined sqlite with every node tagged ``properties._domain_id`` so cross-domain queries can score per-domain coverage without changing the Node schema or any backend code. Why combine instead of federate at query time: - All current backends are single-DB; federation = a new code path. - Combining is one-shot, no runtime overhead, no risk on hot paths. - Once cross-domain queries actually work on the combined DB, that proves the value before we invest in true federation. End-to-end test on the real corpora: $ uv run python eval/build_metacorpus.py krra → 90,125 nodes / 292,429 edges (0 collisions) assort → 13,909 nodes / 20,450 edges (0 collisions) x2bee → 19,843 nodes / 18,744 edges (0 collisions) TOTAL: 123,877 nodes, 331,623 edges → eval/data/metacorpus.sqlite The phrase-hub collision risk we worried about (different domains hashing the same phrase to the same ``phrase_<md5>`` id) didn't materialize at this scale — likely because the corpora's phrase vocabularies don't overlap meaningfully (legal/admin vs ecommerce). The combiner counts phrase collisions separately from real ones so Phase 1.2 namespacing can be deferred until/unless they appear. Output (818 MB) gitignored under existing eval/data/*.sqlite rule. Tests +7 (every node tagged, pre-existing _domain_id preserved on nested combines, collisions counted not silent, phrase-hub collisions tracked separately, edges copied verbatim, rebuild is clean, per-domain count queryable end-to-end). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bd18b96 commit 0076fd9

2 files changed

Lines changed: 473 additions & 0 deletions

File tree

eval/build_metacorpus.py

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
"""Combine N domain-specific sqlite corpora into one MetaCorpus.
2+
3+
Phase 1.4 of the v0.20+ track. Output: one sqlite file containing every
4+
node + every edge from each source corpus, with each node tagged
5+
``properties._domain_id = <domain>`` so cross-domain queries can score
6+
per-domain coverage without changing the Node schema.
7+
8+
Why combine instead of federate at query time?
9+
- Existing backends are single-DB. Federated query = new code path.
10+
- Combining is one-time, side-effect-free, doesn't touch runtime.
11+
- Once cross-domain queries actually work on the combined DB, that
12+
proves the value before we invest in federation.
13+
14+
Node ID collision risk: source IDs are mostly 16-char MD5 hashes
15+
(`doc_<hash>` / `chunk_<hash>` / `phrase_<hash>`). Across all 3
16+
corpora (~150K nodes total), birthday-paradox collision probability
17+
is ~10⁻¹². If a collision DOES happen at runtime, the second insert
18+
will overwrite the first — loud rather than silent because the
19+
combiner checks via INSERT OR IGNORE and reports duplicates.
20+
21+
Phrase hub IDs (``phrase_<md5>``) are the highest collision risk
22+
because the same word ("operations") in two domains produces the same
23+
hash. Phase 1.2 (deferred) namespaces phrase hubs as
24+
``phrase_{domain}_{md5}``; until then, the combiner reports phrase
25+
collisions so we know the magnitude.
26+
27+
CLI
28+
===
29+
uv run python eval/build_metacorpus.py
30+
Default: combine krra + assort + x2bee → eval/data/metacorpus.sqlite
31+
32+
uv run python eval/build_metacorpus.py --out custom.sqlite \\
33+
--source krra=eval/data/krra_graph.sqlite \\
34+
--source assort=eval/data/assort_graph.sqlite
35+
"""
36+
37+
from __future__ import annotations
38+
39+
import argparse
40+
import json
41+
import sqlite3
42+
import sys
43+
from dataclasses import dataclass
44+
from pathlib import Path
45+
46+
EVAL_DIR = Path(__file__).parent
47+
DATA_DIR = EVAL_DIR / "data"
48+
49+
DEFAULT_SOURCES: dict[str, Path] = {
50+
"krra": DATA_DIR / "krra_graph.sqlite",
51+
"assort": DATA_DIR / "assort_graph.sqlite",
52+
"x2bee": DATA_DIR / "x2bee_graph.sqlite",
53+
}
54+
55+
DEFAULT_OUT = DATA_DIR / "metacorpus.sqlite"
56+
57+
58+
@dataclass(slots=True)
59+
class MergeStats:
60+
"""Per-domain merge accounting — surfaced so collisions are visible."""
61+
62+
domain: str
63+
nodes_read: int = 0
64+
nodes_inserted: int = 0
65+
nodes_skipped: int = 0
66+
edges_read: int = 0
67+
edges_inserted: int = 0
68+
edges_skipped: int = 0
69+
phrase_collisions: int = 0
70+
other_collisions: int = 0
71+
72+
73+
def _ensure_schema(conn: sqlite3.Connection) -> None:
74+
"""Create the same syn_nodes / syn_edges schema used by SqliteGraphBackend.
75+
76+
Mirrors ``src/synaptic/backends/sqlite.py`` so this combined file
77+
drops in as a normal SqliteGraphBackend corpus. Indexes match what
78+
the FTS / vector index would otherwise rebuild on first open.
79+
"""
80+
cur = conn.cursor()
81+
cur.executescript(
82+
"""
83+
CREATE TABLE IF NOT EXISTS syn_nodes (
84+
id TEXT PRIMARY KEY,
85+
kind TEXT NOT NULL DEFAULT 'concept',
86+
title TEXT NOT NULL DEFAULT '',
87+
content TEXT NOT NULL DEFAULT '',
88+
tags_json TEXT NOT NULL DEFAULT '[]',
89+
level TEXT NOT NULL DEFAULT 'L0',
90+
vitality REAL NOT NULL DEFAULT 1.0,
91+
access_count INTEGER NOT NULL DEFAULT 0,
92+
success_count INTEGER NOT NULL DEFAULT 0,
93+
failure_count INTEGER NOT NULL DEFAULT 0,
94+
source TEXT NOT NULL DEFAULT '',
95+
properties_json TEXT NOT NULL DEFAULT '{}',
96+
embedding_json TEXT NOT NULL DEFAULT '[]',
97+
created_at REAL,
98+
updated_at REAL
99+
);
100+
CREATE TABLE IF NOT EXISTS syn_edges (
101+
id TEXT PRIMARY KEY,
102+
source_id TEXT NOT NULL,
103+
target_id TEXT NOT NULL,
104+
kind TEXT NOT NULL DEFAULT 'related',
105+
weight REAL NOT NULL DEFAULT 1.0,
106+
created_at REAL
107+
);
108+
CREATE INDEX IF NOT EXISTS idx_nodes_kind ON syn_nodes(kind);
109+
CREATE INDEX IF NOT EXISTS idx_edges_source ON syn_edges(source_id);
110+
CREATE INDEX IF NOT EXISTS idx_edges_target ON syn_edges(target_id);
111+
"""
112+
)
113+
conn.commit()
114+
115+
116+
def _merge_one(
117+
src: Path,
118+
domain: str,
119+
dst: sqlite3.Connection,
120+
) -> MergeStats:
121+
"""Merge one source sqlite into the destination connection.
122+
123+
Every node gets ``properties_json._domain_id = domain`` injected
124+
before insert. Pre-existing _domain_id (e.g. user already partitioned
125+
a single corpus) is preserved.
126+
127+
Edges copied verbatim. Cross-corpus edges don't exist yet by
128+
construction — sources are independent before the combiner runs.
129+
"""
130+
stats = MergeStats(domain=domain)
131+
src_conn = sqlite3.connect(src)
132+
src_conn.row_factory = sqlite3.Row
133+
src_cur = src_conn.cursor()
134+
dst_cur = dst.cursor()
135+
136+
# Nodes — read full row, mutate properties_json, insert.
137+
for row in src_cur.execute("SELECT * FROM syn_nodes"):
138+
stats.nodes_read += 1
139+
try:
140+
props = json.loads(row["properties_json"] or "{}")
141+
except json.JSONDecodeError:
142+
props = {}
143+
if not isinstance(props, dict):
144+
props = {}
145+
# Don't clobber an explicit _domain_id (allow nested combines)
146+
props.setdefault("_domain_id", domain)
147+
new_props_json = json.dumps(props, ensure_ascii=False)
148+
149+
try:
150+
dst_cur.execute(
151+
"""
152+
INSERT INTO syn_nodes (
153+
id, kind, title, content, tags_json, level, vitality,
154+
access_count, success_count, failure_count, source,
155+
properties_json, embedding_json, created_at, updated_at
156+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
157+
""",
158+
(
159+
row["id"],
160+
row["kind"],
161+
row["title"],
162+
row["content"],
163+
row["tags_json"],
164+
row["level"],
165+
row["vitality"],
166+
row["access_count"],
167+
row["success_count"],
168+
row["failure_count"],
169+
row["source"],
170+
new_props_json,
171+
row["embedding_json"],
172+
row["created_at"],
173+
row["updated_at"],
174+
),
175+
)
176+
stats.nodes_inserted += 1
177+
except sqlite3.IntegrityError:
178+
# Collision — same node id already inserted by an earlier
179+
# source. Track which kind so we know if it's a phrase-hub
180+
# collision (expected, fixed by Phase 1.2) or something
181+
# actually concerning.
182+
stats.nodes_skipped += 1
183+
if str(row["id"]).startswith("phrase_"):
184+
stats.phrase_collisions += 1
185+
else:
186+
stats.other_collisions += 1
187+
188+
# Edges — copy as-is.
189+
for row in src_cur.execute("SELECT * FROM syn_edges"):
190+
stats.edges_read += 1
191+
try:
192+
dst_cur.execute(
193+
"""
194+
INSERT INTO syn_edges (
195+
id, source_id, target_id, kind, weight, created_at
196+
) VALUES (?, ?, ?, ?, ?, ?)
197+
""",
198+
(
199+
row["id"],
200+
row["source_id"],
201+
row["target_id"],
202+
row["kind"],
203+
row["weight"],
204+
row["created_at"],
205+
),
206+
)
207+
stats.edges_inserted += 1
208+
except sqlite3.IntegrityError:
209+
stats.edges_skipped += 1
210+
211+
src_conn.close()
212+
dst.commit()
213+
return stats
214+
215+
216+
def build(sources: dict[str, Path], out: Path) -> list[MergeStats]:
217+
"""Build a MetaCorpus at ``out`` from each (domain → sqlite) source."""
218+
if out.exists():
219+
out.unlink() # always start clean — partial combines are confusing
220+
out.parent.mkdir(parents=True, exist_ok=True)
221+
dst = sqlite3.connect(out)
222+
_ensure_schema(dst)
223+
all_stats: list[MergeStats] = []
224+
for domain, src in sources.items():
225+
if not src.exists():
226+
print(f"!! source missing: {domain}{src}", file=sys.stderr)
227+
continue
228+
all_stats.append(_merge_one(src, domain, dst))
229+
dst.close()
230+
return all_stats
231+
232+
233+
def _format(stats: list[MergeStats], out: Path) -> str:
234+
lines = ["", "MetaCorpus build report", "=" * 50, ""]
235+
total_n = total_e = 0
236+
for s in stats:
237+
lines.append(
238+
f" {s.domain:<10s} → nodes {s.nodes_inserted}/{s.nodes_read}"
239+
f" (+{s.nodes_skipped} skipped) edges {s.edges_inserted}/{s.edges_read}"
240+
f" (+{s.edges_skipped} skipped)"
241+
)
242+
if s.phrase_collisions:
243+
lines.append(
244+
f" phrase hub collisions: {s.phrase_collisions}"
245+
f" (Phase 1.2 will namespace these)"
246+
)
247+
if s.other_collisions:
248+
lines.append(f" ⚠ OTHER collisions: {s.other_collisions} (investigate)")
249+
total_n += s.nodes_inserted
250+
total_e += s.edges_inserted
251+
lines.append("")
252+
lines.append(f" TOTAL: {total_n} nodes, {total_e} edges → {out}")
253+
lines.append("")
254+
return "\n".join(lines)
255+
256+
257+
def _parse_args() -> argparse.Namespace:
258+
p = argparse.ArgumentParser(description=__doc__)
259+
p.add_argument(
260+
"--out",
261+
type=Path,
262+
default=DEFAULT_OUT,
263+
help=f"Output sqlite path (default: {DEFAULT_OUT})",
264+
)
265+
p.add_argument(
266+
"--source",
267+
action="append",
268+
default=[],
269+
help="Source as 'domain=path/to/sqlite'. Repeatable. "
270+
"If omitted, defaults to krra+assort+x2bee from eval/data/.",
271+
)
272+
return p.parse_args()
273+
274+
275+
def main() -> int:
276+
args = _parse_args()
277+
if args.source:
278+
sources: dict[str, Path] = {}
279+
for spec in args.source:
280+
if "=" not in spec:
281+
print(f"!! bad --source: {spec!r} (need domain=path)", file=sys.stderr)
282+
return 2
283+
domain, path = spec.split("=", 1)
284+
sources[domain.strip()] = Path(path.strip())
285+
else:
286+
sources = DEFAULT_SOURCES
287+
stats = build(sources, args.out)
288+
print(_format(stats, args.out))
289+
return 0
290+
291+
292+
if __name__ == "__main__":
293+
raise SystemExit(main())

0 commit comments

Comments
 (0)