Skip to content

Commit 8b6527f

Browse files
authored
Merge pull request #21 from codefortulsa/change-from-s3-to-dynamoDB
feat: switch registry storage to DynamoDB via Dyntastic, add local fallback
2 parents c179a15 + 2b29c47 commit 8b6527f

File tree

8 files changed

+93
-48
lines changed

8 files changed

+93
-48
lines changed

README.md

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ python -m ipykernel install --user --name=tgov-scraper --display-name="TGOV Scra
3333
jupyter notebook
3434
```
3535

36-
### Prefect flows
36+
### Prefect workflows
37+
We use prefect to organize code into workflows of data tasks.
38+
3739
See https://docs.prefect.io/get-started
3840

3941
```bash
@@ -42,6 +44,42 @@ prefect server start # to start the persistent server
4244
python -m flows.translate_meetings # to run a specific flow
4345
```
4446

47+
#### Data "registry" for workflows
48+
The prefect workflows use a "registry" to track meetings and urls to their data artifacts.
49+
E.g., `get_new_meetings` task adds each meeting to the registry with a `video_url`, `transcribe_videos` transcribes the video and adds `transcription_url`, etc.
50+
51+
```mermaid
52+
gitGraph
53+
branch registry
54+
checkout registry
55+
commit id: " "
56+
57+
branch get_new_meetings
58+
checkout get_new_meetings
59+
commit id: "get_new_meetings()"
60+
checkout registry
61+
merge get_new_meetings id: "video_url"
62+
63+
branch transcribe_videos
64+
checkout transcribe_videos
65+
commit id: "transcribe_videos()"
66+
checkout registry
67+
merge transcribe_videos id: "transcription_url"
68+
69+
branch create_subtitled_video_pages
70+
checkout create_subtitled_video_pages
71+
commit id: "create_subtitled_video_pages()"
72+
checkout registry
73+
merge create_subtitled_video_pages id: "subtitled_video_url"
74+
75+
branch translate_transcriptions
76+
checkout translate_transcriptions
77+
commit id: "translate_transcriptions(es)"
78+
commit id: "translate_transcriptions(mya)"
79+
checkout registry
80+
merge translate_transcriptions id: "translated_transcription_urls"
81+
```
82+
4583
### Tests
4684

4785
```bash

db/create_tables.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from src.models.meeting import Meeting
2+
3+
def create_tables():
4+
print("Creating DynamoDB tables if they don't exist...")
5+
Meeting.create_table(wait=True, billing_mode="PAY_PER_REQUEST")
6+
print("All tables created!")
7+
8+
if __name__ == "__main__":
9+
create_tables()

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ aiofiles = "^24.1.0"
2727
faster-whisper = "^1.1.1"
2828
prefect = "^3.3.0"
2929
boto3 = "^1.37.24"
30+
dyntastic = "^0.18.0"
3031

3132

3233
[tool.poetry.group.dev.dependencies]

src/aws.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
import boto3
44
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
55

6+
67
def is_aws_configured():
7-
required_vars = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']
8+
required_vars = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION']
89
return all(var in os.environ for var in required_vars)
910

1011

@@ -37,4 +38,3 @@ def upload_to_s3(file_path, bucket_name, s3_path):
3738
except (NoCredentialsError, PartialCredentialsError) as e:
3839
print(f"Failed to upload to S3: {str(e)}")
3940
return False
40-

src/local_store.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import json
2+
import os
3+
from typing import Sequence
4+
from src.models.meeting import Meeting
5+
6+
LOCAL_STORE_PATH = "data/meetings.json"
7+
8+
def read_meetings() -> Sequence[Meeting]:
9+
if not os.path.exists(LOCAL_STORE_PATH):
10+
return []
11+
12+
with open(LOCAL_STORE_PATH, 'r') as f:
13+
data = json.load(f)
14+
return [Meeting(**meeting) for meeting in data]
15+
16+
def write_meetings(meetings: Sequence[Meeting]):
17+
os.makedirs(os.path.dirname(LOCAL_STORE_PATH), exist_ok=True)
18+
with open(LOCAL_STORE_PATH, 'w') as f:
19+
json_data = [meeting.model_dump_json() for meeting in meetings]
20+
json.dump(json_data, f)

src/meetings.py

Lines changed: 16 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
from selectolax.parser import HTMLParser
1616

1717
from src.aws import is_aws_configured
18-
from src.models.utils import from_jsonl, to_jsonl
18+
from src.local_store import read_meetings, write_meetings
1919

2020
from .models.meeting import Meeting
2121

2222
BASE_URL = "https://tulsa-ok.granicus.com/ViewPublisher.php?view_id=4"
2323
TGOV_BUCKET_NAME = "tgov-meetings"
2424
MEETINGS_REGISTRY_PATH = "data/meetings.jsonl"
2525

26-
2726
async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
2827
"""
2928
Fetch the HTML content of a page.
@@ -40,11 +39,9 @@ async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
4039
raise Exception(f"Failed to fetch {url}, status code: {response.status}")
4140
return await response.text()
4241

43-
4442
def clean_date(date: str) -> str:
4543
return re.sub(r"\s+", " ", date).strip()
4644

47-
4845
async def parse_meetings(html: str) -> List[Dict[str, str]]:
4946
"""
5047
Parse the meeting data from the HTML content.
@@ -67,21 +64,18 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
6764
# Process each table
6865
for table in tables:
6966
for row in table.css("tr.listingRow"):
70-
cells = row.css("td")
7167
name_cells = row.css('td.listItem[headers^="Name"]')
7268
meeting_name = name_cells[0].text().strip() if name_cells else "Unknown"
7369

7470
date_cells = row.css('td.listItem[headers^="Date"]')
7571
raw_date = clean_date(date_cells[0].text().strip()) if date_cells else "Unknown"
7672
meeting_date = raw_date.split("-")[0].strip() if "-" in raw_date else raw_date
7773

78-
7974
duration_cells = row.css('td.listItem[headers^="Duration"]')
8075
duration_str = duration_cells[0].text().strip() if duration_cells else "Unknown"
8176
minutes = duration_to_minutes(duration_str)
8277
meeting_duration = f"{minutes // 60}:{minutes % 60:02d}" if minutes is not None else "Unknown"
8378

84-
8579
meeting_data = {
8680
"meeting": meeting_name,
8781
"date": meeting_date,
@@ -131,7 +125,6 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
131125

132126
return meetings
133127

134-
135128
async def get_tgov_meetings() -> Sequence[Meeting]:
136129
"""
137130
Fetch and parse meeting data from the Government Access Television website.
@@ -147,7 +140,6 @@ async def get_tgov_meetings() -> Sequence[Meeting]:
147140
meetings = [Meeting(**meeting_dict) for meeting_dict in meeting_dicts]
148141
return meetings
149142

150-
151143
def duration_to_minutes(duration):
152144
if not duration or pd.isna(duration):
153145
return None
@@ -172,43 +164,25 @@ def duration_to_minutes(duration):
172164
except:
173165
return None
174166

175-
176167
def get_registry_meetings() -> Sequence[Meeting]:
177168
if is_aws_configured():
178-
print(f'Getting registry from AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
179-
import boto3
180-
from botocore.exceptions import ClientError
181-
s3 = boto3.client('s3')
182-
try:
183-
registry_response = s3.get_object(Bucket=TGOV_BUCKET_NAME, Key=MEETINGS_REGISTRY_PATH)
184-
registry_body = registry_response['Body'].read().decode('utf-8')
185-
return from_jsonl(registry_body, Meeting)
186-
except ClientError as e:
187-
if e.response['Error']['Code'] == 'NoSuchKey':
188-
print('No registry file found on S3. Returning empty list.')
189-
190-
return []
191-
169+
print(f'Getting registry from DynamoDB.')
170+
return list(Meeting.scan())
171+
else:
172+
print(f'Getting registry from local store')
173+
return read_meetings()
192174

193175
def write_registry_meetings(meetings: Sequence[Meeting]) -> Sequence[Meeting]:
194-
jsonl_str = to_jsonl(meetings)
195-
196176
if is_aws_configured():
197-
print(f'Writing registry to AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
198-
import boto3
199-
from botocore.exceptions import ClientError
200-
s3 = boto3.client('s3')
201-
202-
try:
203-
s3.put_object(
204-
Bucket=TGOV_BUCKET_NAME,
205-
Key=MEETINGS_REGISTRY_PATH,
206-
Body=jsonl_str,
207-
ContentType='application/x-ndjson'
208-
)
209-
print(f'Wrote {len(meetings)} meetings to S3.')
210-
except ClientError as e:
211-
print(f"Failed to write to S3: {e}")
212-
raise
177+
print(f'Writing registry to DynamoDB.')
178+
with Meeting.batch_writer():
179+
for meeting in meetings:
180+
if meeting.clip_id:
181+
meeting.save()
182+
else:
183+
print(f'Skipping meeting with missing clip_id: {meeting}')
184+
else:
185+
print(f'Writing registry to local store')
186+
write_meetings(meetings)
213187

214188
return meetings

src/models/meeting.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,24 @@
44

55
from typing import Optional
66

7+
from dyntastic import Dyntastic
78
from pydantic import BaseModel, Field, HttpUrl
89

910

10-
class Meeting(BaseModel):
11+
class Meeting(Dyntastic):
1112
"""
1213
Model representing a government meeting
1314
"""
1415

16+
__table_name__ = "tgov-meeting"
17+
__hash_key__ = "clip_id"
18+
19+
clip_id: Optional[str] = Field(None, description="Granicus clip ID")
1520
meeting: str = Field(description="Name of the meeting")
1621
date: str = Field(description="Date and time of the meeting")
1722
duration: str = Field(description="Duration of the meeting")
1823
agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda")
1924
video: Optional[HttpUrl] = Field(None, description="URL to the meeting video")
20-
clip_id: Optional[str] = Field(None, description="Granicus clip ID")
2125

2226
def __str__(self) -> str:
2327
"""String representation of the meeting"""

tasks/meetings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ async def get_new_meetings():
1010
# TODO: accept max_limit parameter
1111
tgov_meetings: Sequence[Meeting] = await get_tgov_meetings()
1212
print(f"Got {len(tgov_meetings)} tgov meetings.")
13-
tgov_clip_ids = [tm.clip_id for tm in tgov_meetings]
1413
# print(f"tgov_clip_ids: {tgov_clip_ids}")
1514

1615
registry_meetings: Sequence[Meeting] = get_registry_meetings()

0 commit comments

Comments
 (0)