-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathppr.py
More file actions
322 lines (268 loc) · 11.8 KB
/
ppr.py
File metadata and controls
322 lines (268 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""Personalized PageRank (PPR) engine — zero-dependency, dict-based sparse implementation.
Power iteration:
r(t+1) = (1 - damping) * personalization + damping * A_norm * r(t)
Where A_norm is a column-normalized adjacency matrix built from the graph edges.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from synaptic.models import EdgeKind, NodeKind
if TYPE_CHECKING:
from synaptic.extensions.chunk_entity_index import ChunkEntityIndex
from synaptic.protocols import StorageBackend
# PPR propagation weight per edge type — the more semantically meaningful
# the relation, the more strongly rank spreads across it.
_EDGE_TYPE_WEIGHTS: dict[EdgeKind, float] = {
    EdgeKind.CAUSED: 1.0,  # causal link — strong propagation
    EdgeKind.RESULTED_IN: 1.0,  # outcome — strong propagation
    EdgeKind.DEPENDS_ON: 0.9,  # dependency — strong propagation
    EdgeKind.LEARNED_FROM: 0.8,  # lesson learned — medium
    EdgeKind.PRODUCED: 0.8,  # produced artifact — medium
    EdgeKind.PART_OF: 0.7,  # part-whole — medium
    EdgeKind.CONTAINS: 0.6,  # containment (phrase) — weak propagation
    EdgeKind.RELATED: 0.4,  # generic relatedness — weak (noise control)
    EdgeKind.CONTRADICTS: 0.2,  # contradiction — minimal propagation
    EdgeKind.SUPERSEDES: 0.3,  # supersession — weak
    EdgeKind.IS_A: 0.5,  # type hierarchy — medium
    EdgeKind.INVOKED: 0.6,  # invocation — medium
    EdgeKind.FOLLOWED_BY: 0.7,  # ordering — medium
    # v1.0: chunk-entity graph
    EdgeKind.MENTIONS: 0.8,  # chunk → entity — strong
    EdgeKind.EXTRACTED_FROM: 0.8,  # entity → chunk — strong
    EdgeKind.NEXT_CHUNK: 0.3,  # chunk → chunk ordering — weak (limits direct chunk-to-chunk spread)
}
async def personalized_pagerank(
    backend: StorageBackend,
    seed_scores: dict[str, float],
    *,
    damping: float = 0.85,
    max_iter: int = 50,
    tol: float = 1e-6,
    top_k: int = 20,
) -> list[tuple[str, float]]:
    """Perform PPR and return top-k (node_id, score) pairs.

    The graph is discovered incrementally via BFS from seed nodes so that
    only the reachable subgraph is materialized — no need to enumerate all
    nodes/edges in the backend.

    Args:
        backend: Storage backend implementing the StorageBackend protocol.
        seed_scores: {node_id: weight} — search result scores as personalization.
        damping: Probability of following an edge (vs teleporting back to seeds).
        max_iter: Maximum power-iteration steps.
        tol: Convergence threshold (L1 norm of rank change).
        top_k: Number of top-ranked nodes to return.

    Returns:
        List of (node_id, ppr_score) sorted descending by score.
    """
    if not seed_scores:
        return []

    # --- 1. BFS to discover the reachable subgraph (depth 2 from seeds) ---
    # adjacency: source -> [(target, weight), ...]
    adj: dict[str, list[tuple[str, float]]] = {}
    visited: set[str] = set()
    # BUGFIX: get_edges(direction="both") returns the same physical edge from
    # BOTH of its endpoints. Previously an edge between two expanded nodes was
    # inserted twice in each direction, while an edge to a never-expanded
    # frontier leaf was inserted once — skewing the column normalization.
    # Dedupe on the edge's identity triple.
    seen_edges: set[tuple[str, str, object]] = set()
    frontier = set(seed_scores.keys())
    bfs_depth = 2
    for _ in range(bfs_depth):
        if not frontier:
            break
        next_frontier: set[str] = set()
        for nid in frontier:
            if nid in visited:
                continue
            visited.add(nid)
            adj.setdefault(nid, [])
            edges = await backend.get_edges(nid, direction="both")
            for edge in edges:
                edge_key = (edge.source_id, edge.target_id, edge.kind)
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)
                # Determine the neighbor (the edge may point either way).
                neighbor_id = edge.target_id if edge.source_id == nid else edge.source_id
                # Edge type weighting: meaningful relations spread more.
                edge_type_weight = _EDGE_TYPE_WEIGHTS.get(edge.kind, 0.5)
                effective_weight = edge.weight * edge_type_weight
                # Add the edge in both directions (undirected for PPR spreading).
                adj[nid].append((neighbor_id, effective_weight))
                adj.setdefault(neighbor_id, []).append((nid, effective_weight))
                if neighbor_id not in visited:
                    next_frontier.add(neighbor_id)
        frontier = next_frontier

    # Mark remaining frontier nodes as visited (leaf nodes, never expanded).
    visited.update(frontier)
    for nid in frontier:
        adj.setdefault(nid, [])
    all_nodes = set(adj.keys()) | set(seed_scores.keys())

    # No edges at all — return seed scores directly (sorted).
    if not any(adj.values()):
        sorted_seeds = sorted(seed_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_seeds[:top_k]

    # --- 2. Build column-normalized adjacency (as sparse dicts) ---
    # out_weight[node] = sum of weights of outgoing edges (1.0 guards div-by-zero).
    out_weight: dict[str, float] = {}
    for src, neighbors in adj.items():
        total = sum(w for _, w in neighbors)
        out_weight[src] = total if total > 0 else 1.0

    # --- 3. Normalize personalization vector ---
    total_seed = sum(seed_scores.values())
    if total_seed == 0:
        return []
    personalization: dict[str, float] = {nid: s / total_seed for nid, s in seed_scores.items()}

    # --- 4. Power iteration ---
    # r(t+1) = (1 - damping) * personalization + damping * A_norm * r(t)
    rank: dict[str, float] = {nid: personalization.get(nid, 0.0) for nid in all_nodes}
    teleport_coeff = 1.0 - damping
    for _ in range(max_iter):
        # Start each step from the teleport (personalization) component.
        new_rank: dict[str, float] = {
            nid: teleport_coeff * personalization.get(nid, 0.0) for nid in all_nodes
        }
        # Distribute rank along edges.
        for src, neighbors in adj.items():
            if not neighbors:
                continue
            src_rank = rank[src]
            src_out = out_weight[src]
            for tgt, w in neighbors:
                # Column-normalized: edge_weight / total_out_weight * src_rank
                new_rank[tgt] = new_rank.get(tgt, 0.0) + damping * src_rank * w / src_out
        # Check convergence (L1 norm of the rank change).
        diff = sum(abs(new_rank.get(nid, 0.0) - rank.get(nid, 0.0)) for nid in all_nodes)
        rank = new_rank
        if diff < tol:
            break

    # --- 5. Return top-k ---
    sorted_results = sorted(rank.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:top_k]
async def personalized_pagerank_v2(
    backend: StorageBackend,
    seed_scores: dict[str, float],
    *,
    chunk_entity_index: ChunkEntityIndex | None = None,
    damping: float = 0.85,
    max_iter: int = 50,
    tol: float = 1e-6,
    top_k: int = 20,
    edge_weight_floor: float = 0.15,
    passage_boost: float = 1.5,
) -> list[tuple[str, float]]:
    """HippoRAG2-inspired PPR v2 with noise reduction.

    Key improvements over v1:
    1. CHUNK seed boost: passage nodes get higher teleport weight (more grounded)
    2. Entity-mediated spreading: CHUNK→CHUNK direct propagation blocked,
       spreading only through ENTITY nodes (reduces noise)
    3. Weak edge zeroing: edges with weight < edge_weight_floor are ignored

    Args:
        backend: Storage backend.
        seed_scores: {node_id: weight} — search result scores.
        chunk_entity_index: Bidirectional index (enables chunk/entity awareness).
        damping: Edge-follow probability.
        max_iter: Max iterations.
        tol: Convergence threshold.
        top_k: Top results to return.
        edge_weight_floor: Zero out edges below this weight.
        passage_boost: Teleport weight multiplier for CHUNK nodes.

    Returns:
        List of (node_id, ppr_score) sorted descending.
    """
    if not seed_scores:
        return []

    # --- 1. BFS subgraph discovery (depth 2) ---
    adj: dict[str, list[tuple[str, float]]] = {}
    node_kinds: dict[str, str] = {}  # node_id → kind (stringified)
    visited: set[str] = set()
    # BUGFIX: get_edges(direction="both") yields the same physical edge from
    # both endpoints; without dedup, edges between two expanded nodes were
    # double-counted in both adjacency lists, skewing normalization.
    seen_edges: set[tuple[str, str, object]] = set()
    frontier = set(seed_scores.keys())
    bfs_depth = 2
    for _ in range(bfs_depth):
        if not frontier:
            break
        next_frontier: set[str] = set()
        for nid in frontier:
            if nid in visited:
                continue
            visited.add(nid)
            adj.setdefault(nid, [])
            # Track node kind for chunk/entity awareness.
            if nid not in node_kinds:
                node = await backend.get_node(nid)
                if node:
                    node_kinds[nid] = str(node.kind)
            edges = await backend.get_edges(nid, direction="both")
            for edge in edges:
                edge_key = (edge.source_id, edge.target_id, edge.kind)
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)
                neighbor_id = edge.target_id if edge.source_id == nid else edge.source_id
                # Track neighbor kind before the chunk/chunk filter below.
                if neighbor_id not in node_kinds:
                    neighbor_node = await backend.get_node(neighbor_id)
                    if neighbor_node:
                        node_kinds[neighbor_id] = str(neighbor_node.kind)
                edge_type_weight = _EDGE_TYPE_WEIGHTS.get(edge.kind, 0.5)
                effective_weight = edge.weight * edge_type_weight
                # Weak edge zeroing: drop edges below the floor entirely.
                if effective_weight < edge_weight_floor:
                    continue
                # Entity-mediated spreading: block CHUNK→CHUNK direct propagation.
                # NOTE(review): compares str(node.kind) to NodeKind members —
                # only valid if NodeKind is a str-based enum; confirm in models.
                src_kind = node_kinds.get(nid, "")
                dst_kind = node_kinds.get(neighbor_id, "")
                if src_kind == NodeKind.CHUNK and dst_kind == NodeKind.CHUNK:
                    # Only allow NEXT_CHUNK (sequential reading); its low
                    # edge-type weight (0.3) keeps it dampened.
                    if edge.kind != EdgeKind.NEXT_CHUNK:
                        continue
                adj[nid].append((neighbor_id, effective_weight))
                adj.setdefault(neighbor_id, []).append((nid, effective_weight))
                if neighbor_id not in visited:
                    next_frontier.add(neighbor_id)
        frontier = next_frontier

    # Remaining frontier nodes are leaves that were never expanded.
    visited.update(frontier)
    for nid in frontier:
        adj.setdefault(nid, [])
    all_nodes = set(adj.keys()) | set(seed_scores.keys())

    # No edges at all — fall back to the raw seed ranking.
    if not any(adj.values()):
        sorted_seeds = sorted(seed_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_seeds[:top_k]

    # --- 2. Column-normalized adjacency ---
    out_weight: dict[str, float] = {}
    for src, neighbors in adj.items():
        total = sum(w for _, w in neighbors)
        out_weight[src] = total if total > 0 else 1.0  # guard div-by-zero

    # --- 3. Personalization with passage boost ---
    boosted_seeds: dict[str, float] = {}
    for nid, score in seed_scores.items():
        if node_kinds.get(nid) == NodeKind.CHUNK:
            boosted_seeds[nid] = score * passage_boost
        else:
            boosted_seeds[nid] = score
    total_seed = sum(boosted_seeds.values())
    if total_seed == 0:
        return []
    personalization = {nid: s / total_seed for nid, s in boosted_seeds.items()}

    # --- 4. Power iteration ---
    # r(t+1) = (1 - damping) * personalization + damping * A_norm * r(t)
    rank: dict[str, float] = {nid: personalization.get(nid, 0.0) for nid in all_nodes}
    teleport_coeff = 1.0 - damping
    for _ in range(max_iter):
        new_rank = {nid: teleport_coeff * personalization.get(nid, 0.0) for nid in all_nodes}
        for src, neighbors in adj.items():
            if not neighbors:
                continue
            src_rank = rank[src]
            src_out = out_weight[src]
            for tgt, w in neighbors:
                # Column-normalized contribution from src to tgt.
                new_rank[tgt] = new_rank.get(tgt, 0.0) + damping * src_rank * w / src_out
        # Convergence check: L1 norm of the rank change.
        diff = sum(abs(new_rank.get(nid, 0.0) - rank.get(nid, 0.0)) for nid in all_nodes)
        rank = new_rank
        if diff < tol:
            break

    # --- 5. Return top-k ---
    sorted_results = sorted(rank.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:top_k]