-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdraft_revisions.py
More file actions
358 lines (291 loc) · 10.9 KB
/
Copy pathdraft_revisions.py
File metadata and controls
358 lines (291 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
"""Draft revision tracking for logical podcast episodes.
A logical episode represents one conceptual podcast topic. When
the same topic is regenerated, the new draft becomes a numbered
revision (v2, v3, ...) of the same logical episode rather than an
unrelated draft.
The grouping key is derived from the episode title via
normalize_episode_key(). Source URLs are intended as a secondary match
signal for multi-paper episodes; find_logical_episode() accepts them
but currently matches on the title-derived key only.
"""
import json
import re
import unicodedata
# ── Episode key normalization ──────────────────────────────────
def normalize_episode_key(title):
    """Derive a stable grouping key from an episode title.

    Strips a leading "Episode: " prefix (case-insensitive, tolerating
    whitespace in front of it), applies NFKD unicode normalization,
    lowercases, removes every character that is neither a word
    character nor whitespace, and collapses whitespace runs to single
    spaces. The result is a lowercase slug that groups drafts for the
    same logical episode.

    Args:
        title: The episode title string, or None.

    Returns empty string for empty/None titles.
    """
    if not title:
        return ""
    # Allow whitespace before the prefix; otherwise "  Episode: Foo" and
    # "Episode: Foo" would be filed under two different keys.
    t = re.sub(r"^\s*Episode:\s*", "", title, flags=re.IGNORECASE)
    t = unicodedata.normalize("NFKD", t)
    t = t.lower().strip()
    # NFKD splits accented letters into base letter + combining mark;
    # the marks are not word characters, so they are dropped here too.
    t = re.sub(r"[^\w\s]", "", t)
    t = re.sub(r"\s+", " ", t).strip()
    return t
def _urls_to_arxiv_set(source_urls_json):
"""Extract arXiv IDs from a JSON-encoded list of URLs."""
if not source_urls_json:
return set()
try:
urls = json.loads(source_urls_json)
except (json.JSONDecodeError, TypeError):
return set()
ids = set()
for url in urls:
m = re.search(r'(\d{4}\.\d{4,5})', url)
if m:
ids.add(m.group(1))
return ids
# ── Revision detection and assignment ──────────────────────────
def find_logical_episode(conn, title, source_urls=None, exclude_id=None):
    """Find existing episodes that share the same logical identity.

    Matches by episode_key (normalized title). If no row with a stored
    episode_key matches, every row's title is normalized and compared
    instead, so rows whose episode_key was never backfilled are still
    found.

    Args:
        conn: Database connection; rows must support lookup by column
            name and conversion via dict().
        title: Title of the draft to match.
        source_urls: Accepted for call compatibility. It is not
            consulted here: matching relies on the normalized title
            only.
        exclude_id: Optional podcast ID to exclude from results
            (typically the just-inserted row that triggered this
            detection).

    Returns a list of matching podcast rows (as dicts), ordered by
    revision descending, then creation time descending (latest first).
    Empty list if there is no match or the title normalizes to an
    empty key.
    """
    key = normalize_episode_key(title)
    if not key:
        return []

    rows = conn.execute(
        "SELECT * FROM podcasts WHERE episode_key = ? "
        "ORDER BY revision DESC, created_at DESC",
        (key,)
    ).fetchall()
    result = [dict(r) for r in rows if r["id"] != exclude_id]
    if result:
        return result

    # Fallback: match by normalized title text even if episode_key
    # was never backfilled (handles pre-migration rows). The ordering
    # mirrors the keyed query so both paths honour the same contract.
    all_rows = conn.execute(
        "SELECT * FROM podcasts "
        "ORDER BY revision DESC, created_at DESC"
    ).fetchall()
    return [
        dict(r) for r in all_rows
        if r["id"] != exclude_id
        and normalize_episode_key(r["title"]) == key
    ]
def detect_revision(conn, title, source_urls=None, exclude_id=None):
    """Decide whether a draft continues an existing logical episode.

    Args:
        conn: Database connection passed through to
            find_logical_episode().
        title: Title of the new draft.
        source_urls: Passed through to find_logical_episode().
        exclude_id: Optional podcast ID to leave out of matching
            (the just-inserted row that triggered detection).

    Returns (episode_key, revision_number, superseded_ids) where:
      - episode_key: the normalized grouping key
      - revision_number: 1 when nothing matched, otherwise the highest
        existing revision plus one (a missing revision counts as 1)
      - superseded_ids: IDs of matched rows whose revision_state is
        'active' or unset
    """
    episode_key = normalize_episode_key(title)
    prior = find_logical_episode(
        conn, title, source_urls, exclude_id=exclude_id
    )
    if not prior:
        return episode_key, 1, []

    revisions = [row.get("revision") or 1 for row in prior]

    to_supersede = []
    for row in prior:
        # Rows without a recorded state are treated as active.
        state = row.get("revision_state") or "active"
        if state == "active":
            to_supersede.append(row["id"])

    return episode_key, max(revisions) + 1, to_supersede
def assign_revision(conn, podcast_id, episode_key, revision,
                    superseded_ids=None):
    """Stamp a podcast row with its episode_key and revision, mark it
    active, and flag the given older rows as superseded.

    Args:
        conn: SQLite connection.
        podcast_id: The newly inserted podcast ID.
        episode_key: Normalized grouping key.
        revision: Revision number to assign.
        superseded_ids: Older podcast IDs to mark superseded; an entry
            equal to podcast_id is ignored.

    Commits the connection before returning. Returns None.
    """
    activate_sql = (
        "UPDATE podcasts SET episode_key = ?, revision = ?, "
        "revision_state = 'active' WHERE id = ?"
    )
    supersede_sql = (
        "UPDATE podcasts SET revision_state = 'superseded' "
        "WHERE id = ?"
    )

    conn.execute(activate_sql, (episode_key, revision, podcast_id))

    for stale_id in superseded_ids or ():
        if stale_id == podcast_id:
            # Never supersede the row that was just activated.
            continue
        conn.execute(supersede_sql, (stale_id,))

    conn.commit()
# ── Approval / rejection ───────────────────────────────────────
def approve_revision(conn, podcast_id):
    """Approve one revision and supersede its active siblings.

    The row's stored episode_key is used when present; otherwise the
    key is derived from the row's title and written back to the row.
    Other rows with the same episode_key whose revision_state is
    'active' become 'superseded'. The connection is committed.

    Args:
        conn: Database connection whose rows support lookup by column
            name.
        podcast_id: ID of the revision to approve.

    Returns the episode_key, or None if the podcast_id was not found.
    """
    found = conn.execute(
        "SELECT * FROM podcasts WHERE id = ?", (podcast_id,)
    ).fetchone()
    if not found:
        return None

    episode_key = found["episode_key"] or normalize_episode_key(
        found["title"]
    )

    # Record the approval (and the key, in case it was just derived).
    approve_sql = (
        "UPDATE podcasts SET revision_state = 'approved', "
        "episode_key = ? WHERE id = ?"
    )
    conn.execute(approve_sql, (episode_key, podcast_id))

    # Without a key there is no group of siblings to supersede.
    if episode_key:
        supersede_sql = (
            "UPDATE podcasts SET revision_state = 'superseded' "
            "WHERE episode_key = ? AND id != ? "
            "AND revision_state = 'active'"
        )
        conn.execute(supersede_sql, (episode_key, podcast_id))

    conn.commit()
    return episode_key
def reject_episode_drafts(conn, episode_key):
    """Reject the unpublished drafts of one logical episode.

    Considers rows with the given episode_key whose revision_state is
    'active' or 'superseded'. Rows with a truthy published_at are left
    untouched; the rest are set to 'rejected'. The connection is
    committed.

    Args:
        conn: Database connection whose rows support lookup by column
            name.
        episode_key: Grouping key of the logical episode.

    Returns the list of rejected podcast IDs (empty when episode_key
    is empty).
    """
    if not episode_key:
        return []

    candidates = conn.execute(
        "SELECT id, published_at FROM podcasts "
        "WHERE episode_key = ? AND revision_state IN ('active', 'superseded')",
        (episode_key,),
    ).fetchall()

    # Published episodes are kept as they are.
    rejected_ids = [
        row["id"] for row in candidates if not row["published_at"]
    ]
    for draft_id in rejected_ids:
        conn.execute(
            "UPDATE podcasts SET revision_state = 'rejected' WHERE id = ?",
            (draft_id,),
        )

    conn.commit()
    return rejected_ids
def mark_published(conn, podcast_id):
    """Flag one revision as published and supersede its siblings.

    The row's stored episode_key is used when present; otherwise the
    key is derived from the row's title and written back to the row.
    Other rows with the same episode_key whose revision_state is
    'active' or 'approved' become 'superseded'. Only revision_state
    and episode_key are written. The connection is committed.

    Args:
        conn: Database connection whose rows support lookup by column
            name.
        podcast_id: ID of the revision being published.

    Returns the episode_key, or None if not found.
    """
    found = conn.execute(
        "SELECT * FROM podcasts WHERE id = ?", (podcast_id,)
    ).fetchone()
    if not found:
        return None

    episode_key = found["episode_key"] or normalize_episode_key(
        found["title"]
    )

    publish_sql = (
        "UPDATE podcasts SET revision_state = 'published', "
        "episode_key = ? WHERE id = ?"
    )
    conn.execute(publish_sql, (episode_key, podcast_id))

    # Without a key there is no group of siblings to supersede.
    if episode_key:
        supersede_sql = (
            "UPDATE podcasts SET revision_state = 'superseded' "
            "WHERE episode_key = ? AND id != ? "
            "AND revision_state IN ('active', 'approved')"
        )
        conn.execute(supersede_sql, (episode_key, podcast_id))

    conn.commit()
    return episode_key
# ── Stale draft cleanup ───────────────────────────────────────
def find_stale_published_drafts(conn):
    """List published rows whose revision_state was never updated.

    Selects rows that have a non-empty published_at while their
    revision_state is still NULL or 'active' (leftovers from before
    the revision model was introduced).

    Args:
        conn: Database connection whose rows can be converted with
            dict().

    Returns a list of podcast row dicts.
    """
    query = (
        "SELECT * FROM podcasts "
        "WHERE published_at IS NOT NULL AND published_at != '' "
        "AND (revision_state IS NULL OR revision_state = 'active')"
    )
    stale_rows = conn.execute(query).fetchall()
    return list(map(dict, stale_rows))
def cleanup_stale_published_drafts(conn):
    """Set revision_state to 'published' on every stale published row
    and fill in its episode_key when missing.

    The rows come from find_stale_published_drafts(). A missing
    episode_key is derived from the row's title. The connection is
    committed.

    Returns the number of rows cleaned up.
    """
    stale_rows = find_stale_published_drafts(conn)

    for entry in stale_rows:
        # Keep an existing key; derive one from the title otherwise.
        key = entry.get("episode_key") or normalize_episode_key(
            entry["title"]
        )
        conn.execute(
            "UPDATE podcasts SET revision_state = 'published', "
            "episode_key = ? WHERE id = ?",
            (key, entry["id"]),
        )

    conn.commit()
    return len(stale_rows)
def backfill_episode_keys(conn):
    """Populate episode_key on rows where it is NULL or empty.

    The key is derived from each row's title. Rows whose title
    normalizes to an empty key are left unchanged and not counted.
    The connection is committed.

    Returns the number of rows updated.
    """
    missing = conn.execute(
        "SELECT id, title FROM podcasts "
        "WHERE episode_key IS NULL OR episode_key = ''"
    ).fetchall()

    updated = 0
    for row in missing:
        new_key = normalize_episode_key(row["title"])
        if not new_key:
            # Nothing usable in the title; leave the row as it is.
            continue
        conn.execute(
            "UPDATE podcasts SET episode_key = ? WHERE id = ?",
            (new_key, row["id"]),
        )
        updated += 1

    conn.commit()
    return updated
# ── Query helpers ──────────────────────────────────────────────
def get_active_drafts(conn):
    """Get all active (non-superseded, non-rejected) draft episodes.

    Returns rows that have no published_at and are in 'active' state,
    one per logical episode. Rows are read newest first (by
    created_at), so the most recently created draft of each episode is
    the one kept.

    A draft is grouped by its stored episode_key, or by the key
    derived from its title when none is stored. Drafts for which no
    key can be determined have no logical identity and are therefore
    never deduplicated against each other; each one is returned.

    Args:
        conn: Database connection whose rows can be converted with
            dict().

    Returns a list of podcast row dicts, newest first.
    """
    rows = conn.execute(
        "SELECT * FROM podcasts "
        "WHERE (published_at IS NULL OR published_at = '') "
        "AND revision_state = 'active' "
        "ORDER BY created_at DESC"
    ).fetchall()

    # Deduplicate by episode_key, keeping only the latest
    seen_keys = set()
    result = []
    for r in rows:
        d = dict(r)
        key = d.get("episode_key")
        title = d["title"]
        if not key and title:
            key = normalize_episode_key(title)
        if key:
            if key in seen_keys:
                continue
            seen_keys.add(key)
        # An empty key means "no logical identity": unrelated untitled
        # drafts must not hide one another, so they bypass the dedup.
        result.append(d)
    return result
def get_revision_history(conn, episode_key):
    """Return every revision stored for one logical episode.

    Rows are ordered by revision ascending, then by created_at
    ascending.

    Args:
        conn: Database connection whose rows can be converted with
            dict().
        episode_key: Grouping key of the logical episode.

    Returns a list of podcast row dicts (empty when episode_key is
    empty).
    """
    if episode_key:
        history = conn.execute(
            "SELECT * FROM podcasts WHERE episode_key = ? "
            "ORDER BY revision ASC, created_at ASC",
            (episode_key,),
        ).fetchall()
        return [dict(entry) for entry in history]
    return []