Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/api-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"better-sqlite3": "^12.8.0",
"dotenv": "^17.2.2",
"openai": "^6.32.0",
"vectorlite": "^0.2.0",
"zod": "^4.3.6"
}
}
2 changes: 1 addition & 1 deletion packages/api-core/src/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ test('clusterRepository emits timed progress updates while identifying similarit
});

assert.ok(result.edges > 0);
assert.ok(messages.some((message) => /identifying similarity edges/.test(message)));
assert.ok(messages.some((message) => /\[cluster\] (building|querying) /.test(message)));
} finally {
Date.now = originalDateNow;
service.close();
Expand Down
244 changes: 138 additions & 106 deletions packages/api-core/src/service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import http from 'node:http';
import crypto from 'node:crypto';
import { existsSync } from 'node:fs';
import fs from 'node:fs';
import { createRequire } from 'node:module';
import os from 'node:os';
import { fileURLToPath } from 'node:url';
import { Worker } from 'node:worker_threads';
import path from 'node:path';

import { IterableMapper } from '@shutterstock/p-map-iterable';
import {
Expand Down Expand Up @@ -46,7 +46,6 @@ import {
} from '@ghcrawl/api-contract';

import { buildClusters } from './cluster/build.js';
import { buildSourceKindEdges } from './cluster/exact-edges.js';
import {
ensureRuntimeDirs,
isLikelyGitHubToken,
Expand All @@ -62,7 +61,7 @@ import { openDb, type SqliteDatabase } from './db/sqlite.js';
import { buildCanonicalDocument, isBotLikeAuthor } from './documents/normalize.js';
import { makeGitHubClient, type GitHubClient } from './github/client.js';
import { OpenAiProvider, type AiProvider } from './openai/provider.js';
import { cosineSimilarity, normalizeEmbedding, rankNearestNeighbors } from './search/exact.js';
import { cosineSimilarity, normalizeEmbedding, rankNearestNeighbors, rankNearestNeighborsByScore } from './search/exact.js';

type RunTable = 'sync_runs' | 'summary_runs' | 'embedding_runs' | 'cluster_runs';

Expand Down Expand Up @@ -257,10 +256,10 @@ const SYNC_BATCH_SIZE = 100;
const SYNC_BATCH_DELAY_MS = 5000;
const STALE_CLOSED_SWEEP_LIMIT = 1000;
const CLUSTER_PROGRESS_INTERVAL_MS = 5000;
const CLUSTER_PARALLEL_MIN_EMBEDDINGS = 5000;
const EMBED_ESTIMATED_CHARS_PER_TOKEN = 3;
const EMBED_MAX_ITEM_TOKENS = 7000;
const EMBED_MAX_BATCH_TOKENS = 250000;
const requireFromHere = createRequire(import.meta.url);
const EMBED_TRUNCATION_MARKER = '\n\n[truncated for embedding]';
const EMBED_CONTEXT_RETRY_ATTEMPTS = 5;
const EMBED_CONTEXT_RETRY_FALLBACK_SHRINK_RATIO = 0.9;
Expand Down Expand Up @@ -1084,15 +1083,17 @@ export class GHCrawlService {
const runId = this.startRun('cluster_runs', repository.id, repository.fullName);
const minScore = params.minScore ?? 0.82;
const k = params.k ?? 6;
const candidateK = Math.max(k, Math.max(k * 16, 64));

try {
const { items, sourceKinds } = this.loadClusterableThreadMeta(repository.id);

params.onProgress?.(
`[cluster] loaded ${items.length} embedded thread(s) across ${sourceKinds.length} source kind(s) for ${repository.fullName} k=${k} minScore=${minScore}`,
`[cluster] loaded ${items.length} embedded thread(s) across ${sourceKinds.length} source kind(s) for ${repository.fullName} backend=vectorlite k=${k} candidateK=${candidateK} minScore=${minScore}`,
);
const aggregatedEdges = await this.aggregateRepositoryEdges(repository.id, sourceKinds, {
limit: k,
candidateK,
minScore,
onProgress: params.onProgress,
});
Expand All @@ -1108,7 +1109,7 @@ export class GHCrawlService {
items.map((item) => ({ threadId: item.id, number: item.number, title: item.title })),
edges,
);
this.persistClusterRun(repository.id, runId, aggregatedEdges, clusters);
this.persistClusterRun(repository.id, runId, aggregatedEdges, clusters, 'vectorlite_hnsw_cosine');
this.pruneOldClusterRuns(repository.id, runId);

params.onProgress?.(`[cluster] persisted ${clusters.length} cluster(s) and pruned older cluster runs`);
Expand Down Expand Up @@ -2847,10 +2848,37 @@ export class GHCrawlService {
.iterate(repoId, this.config.embedModel) as IterableIterator<StoredEmbeddingRow>;
}

private loadNormalizedEmbeddingsForSourceKind(
private loadNormalizedEmbeddingForSourceKindHead(
repoId: number,
sourceKind: EmbeddingSourceKind,
): Array<{ id: number; normalizedEmbedding: number[] }> {
): { id: number; normalizedEmbedding: number[] } | null {
const row = this.db
.prepare(
`select t.id, e.embedding_json
from threads t
join document_embeddings e on e.thread_id = t.id
where t.repo_id = ?
and t.state = 'open'
and t.closed_at_local is null
and e.model = ?
and e.source_kind = ?
order by t.number asc
limit 1`,
)
.get(repoId, this.config.embedModel, sourceKind) as { id: number; embedding_json: string } | undefined;
if (!row) {
return null;
}
return {
id: row.id,
normalizedEmbedding: normalizeEmbedding(JSON.parse(row.embedding_json) as number[]).normalized,
};
}

private *iterateNormalizedEmbeddingsForSourceKind(
repoId: number,
sourceKind: EmbeddingSourceKind,
): IterableIterator<{ id: number; normalizedEmbedding: number[] }> {
const rows = this.db
.prepare(
`select t.id, e.embedding_json
Expand All @@ -2863,12 +2891,22 @@ export class GHCrawlService {
and e.source_kind = ?
order by t.number asc`,
)
.all(repoId, this.config.embedModel, sourceKind) as Array<{ id: number; embedding_json: string }>;
.iterate(repoId, this.config.embedModel, sourceKind) as IterableIterator<{ id: number; embedding_json: string }>;

return rows.map((row) => ({
id: row.id,
normalizedEmbedding: normalizeEmbedding(JSON.parse(row.embedding_json) as number[]).normalized,
}));
for (const row of rows) {
yield {
id: row.id,
normalizedEmbedding: normalizeEmbedding(JSON.parse(row.embedding_json) as number[]).normalized,
};
}
}

/**
 * Packs an embedding into a binary blob of 32-bit floats.
 *
 * @param values Embedding components; each is narrowed to float32 precision.
 * @returns A Buffer over the ArrayBuffer of a freshly allocated Float32Array
 *   (4 bytes per component).
 */
private normalizedEmbeddingBuffer(values: number[]): Buffer {
  const packed = new Float32Array(values);
  return Buffer.from(packed.buffer);
}

/**
 * Maps a nearest-neighbour distance onto a similarity score using
 * `1 - distance / 2`.
 *
 * NOTE(review): this equals cosine similarity only when the index reports
 * squared L2 distance between unit-length vectors — confirm against the
 * distance type of the vector index that produces `distance`.
 *
 * @param distance Distance value reported for a candidate neighbour.
 * @returns The derived score; smaller distances give larger scores.
 */
private normalizedDistanceToScore(distance: number): number {
  const halfDistance = distance / 2;
  return 1 - halfDistance;
}

private loadClusterableThreadMeta(repoId: number): {
Expand Down Expand Up @@ -3052,101 +3090,103 @@ export class GHCrawlService {
private async aggregateRepositoryEdges(
repoId: number,
sourceKinds: EmbeddingSourceKind[],
params: { limit: number; minScore: number; onProgress?: (message: string) => void },
params: { limit: number; candidateK: number; minScore: number; onProgress?: (message: string) => void },
): Promise<Map<string, { leftThreadId: number; rightThreadId: number; score: number; sourceKinds: Set<EmbeddingSourceKind> }>> {
const aggregated = new Map<string, { leftThreadId: number; rightThreadId: number; score: number; sourceKinds: Set<EmbeddingSourceKind> }>();
const totalItems = sourceKinds.reduce((sum, sourceKind) => sum + this.countEmbeddingsForSourceKind(repoId, sourceKind), 0);

if (sourceKinds.length === 0 || totalItems === 0) {
if (sourceKinds.length === 0) {
return aggregated;
}

const workerRuntime = this.resolveEdgeWorkerRuntime();
const shouldParallelize = workerRuntime !== null && sourceKinds.length > 1 && totalItems >= CLUSTER_PARALLEL_MIN_EMBEDDINGS && os.availableParallelism() > 1;
if (!shouldParallelize) {
let processedItems = 0;
let tempDb: SqliteDatabase | null = null;
let tempDir: string | null = null;

try {
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'ghcrawl-vectorlite-'));
tempDb = openDb(path.join(tempDir, 'cluster.db'));
tempDb.pragma('journal_mode = MEMORY');
tempDb.pragma('synchronous = OFF');
tempDb.pragma('temp_store = MEMORY');
const vectorlite = requireFromHere('vectorlite') as { vectorlitePath: () => string };
(tempDb as SqliteDatabase & { loadExtension: (extensionPath: string) => void }).loadExtension(vectorlite.vectorlitePath());

for (const sourceKind of sourceKinds) {
const items = this.loadNormalizedEmbeddingsForSourceKind(repoId, sourceKind);
const edges = buildSourceKindEdges(items, {
limit: params.limit,
minScore: params.minScore,
progressIntervalMs: CLUSTER_PROGRESS_INTERVAL_MS,
onProgress: (progress) => {
if (!params.onProgress) return;
params.onProgress(
`[cluster] identifying similarity edges ${processedItems + progress.processedItems}/${totalItems} source embeddings processed current_edges~=${aggregated.size + progress.currentEdgeEstimate}`,
);
},
});
processedItems += items.length;
this.mergeSourceKindEdges(aggregated, edges, sourceKind);
}
const sourceRowCount = this.countEmbeddingsForSourceKind(repoId, sourceKind);
if (sourceRowCount === 0) {
continue;
}

return aggregated;
}
const firstRow = this.loadNormalizedEmbeddingForSourceKindHead(repoId, sourceKind);
if (!firstRow) {
continue;
}

const progressBySource = new Map<EmbeddingSourceKind, { processedItems: number; totalItems: number; currentEdgeEstimate: number }>();

const edgeSets = await Promise.all(
sourceKinds.map(
(sourceKind) =>
new Promise<Array<{ leftThreadId: number; rightThreadId: number; score: number }>>((resolve, reject) => {
const worker = new Worker(workerRuntime.url, {
workerData: {
dbPath: this.config.dbPath,
repoId,
sourceKind,
limit: params.limit,
minScore: params.minScore,
},
});
const tableName = `vector_${sourceKind}`;
const dimension = firstRow.normalizedEmbedding.length;
const safeCandidateK = Math.min(params.candidateK, Math.max(1, sourceRowCount - 1));

worker.on('message', (message: unknown) => {
if (!message || typeof message !== 'object') {
return;
}
const typed = message as
| {
type: 'progress';
sourceKind: EmbeddingSourceKind;
processedItems: number;
totalItems: number;
currentEdgeEstimate: number;
}
| { type: 'result'; sourceKind: EmbeddingSourceKind; edges: Array<{ leftThreadId: number; rightThreadId: number; score: number }> };
if (typed.type === 'progress') {
progressBySource.set(typed.sourceKind, {
processedItems: typed.processedItems,
totalItems: typed.totalItems,
currentEdgeEstimate: typed.currentEdgeEstimate,
});
if (params.onProgress) {
const processedItems = Array.from(progressBySource.values()).reduce((sum, value) => sum + value.processedItems, 0);
const currentEdgeEstimate = Array.from(progressBySource.values()).reduce((sum, value) => sum + value.currentEdgeEstimate, 0);
params.onProgress(
`[cluster] identifying similarity edges ${processedItems}/${totalItems} source embeddings processed current_edges~=${aggregated.size + currentEdgeEstimate}`,
);
}
return;
}
resolve(typed.edges);
});
params.onProgress?.(`[cluster] building ${sourceKind} HNSW index ${sourceRowCount} vector(s)`);
tempDb.exec(
`create virtual table ${tableName} using vectorlite(vec float32[${dimension}], hnsw(max_elements=${sourceRowCount}));`,
);
const insert = tempDb.prepare(`insert into ${tableName}(rowid, vec) values (?, ?)`);
tempDb.transaction(() => {
for (const row of this.iterateNormalizedEmbeddingsForSourceKind(repoId, sourceKind)) {
insert.run(row.id, this.normalizedEmbeddingBuffer(row.normalizedEmbedding));
}
})();

worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`edge worker for ${sourceKind} exited with code ${code}`));
const query = tempDb.prepare(
`select rowid, distance from ${tableName} where knn_search(vec, knn_param(?, ${safeCandidateK + 1}))`,
);
let processed = 0;
let lastProgressAt = Date.now();
for (const row of this.iterateNormalizedEmbeddingsForSourceKind(repoId, sourceKind)) {
const candidates = query.all(this.normalizedEmbeddingBuffer(row.normalizedEmbedding)) as Array<{
rowid: number;
distance: number;
}>;
const ranked = rankNearestNeighborsByScore(candidates, {
limit: params.limit,
minScore: params.minScore,
score: (candidate) => {
if (candidate.rowid === row.id) {
return -1;
}
return this.normalizedDistanceToScore(candidate.distance);
},
});
for (const candidate of ranked) {
const key = this.edgeKey(row.id, candidate.item.rowid);
const existing = aggregated.get(key);
if (existing) {
existing.score = Math.max(existing.score, candidate.score);
existing.sourceKinds.add(sourceKind);
continue;
}
aggregated.set(key, {
leftThreadId: Math.min(row.id, candidate.item.rowid),
rightThreadId: Math.max(row.id, candidate.item.rowid),
score: candidate.score,
sourceKinds: new Set([sourceKind]),
});
}),
),
);
}
processed += 1;
const now = Date.now();
if (params.onProgress && now - lastProgressAt >= CLUSTER_PROGRESS_INTERVAL_MS) {
params.onProgress(`[cluster] querying ${sourceKind} index ${processed}/${sourceRowCount} current_edges=${aggregated.size}`);
lastProgressAt = now;
}
}
tempDb.exec(`drop table ${tableName}`);
}

for (const [index, edges] of edgeSets.entries()) {
this.mergeSourceKindEdges(aggregated, edges, sourceKinds[index] as EmbeddingSourceKind);
return aggregated;
} finally {
tempDb?.close();
if (tempDir) {
fs.rmSync(tempDir, { recursive: true, force: true });
}
}

return aggregated;
}

private mergeSourceKindEdges(
Expand Down Expand Up @@ -3186,20 +3226,12 @@ export class GHCrawlService {
return row.count;
}

/**
 * Looks for the compiled edge-worker entrypoint next to this module.
 *
 * @returns `{ url }` pointing at `./cluster/edge-worker.js` when that file
 *   exists on disk, otherwise `null`.
 */
private resolveEdgeWorkerRuntime(): { url: URL } | null {
  const workerUrl = new URL('./cluster/edge-worker.js', import.meta.url);
  const workerPath = fileURLToPath(workerUrl);
  // Source-mode runs do not have a compiled worker entrypoint, so keep clustering in-process.
  return existsSync(workerPath) ? { url: workerUrl } : null;
}

private persistClusterRun(
repoId: number,
runId: number,
aggregatedEdges: Map<string, { leftThreadId: number; rightThreadId: number; score: number; sourceKinds: Set<EmbeddingSourceKind> }>,
clusters: Array<{ representativeThreadId: number; members: number[] }>,
method: string,
): void {
const insertEdge = this.db.prepare(
`insert into similarity_edges (repo_id, cluster_run_id, left_thread_id, right_thread_id, method, score, explanation_json, created_at)
Expand All @@ -3224,7 +3256,7 @@ export class GHCrawlService {
runId,
edge.leftThreadId,
edge.rightThreadId,
'exact_cosine',
method,
edge.score,
asJson({ sources: Array.from(edge.sourceKinds).sort(), model: this.config.embedModel }),
createdAt,
Expand Down
Loading