|
4 | 4 | import json
|
5 | 5 | import logging
|
6 | 6 | import os
|
| 7 | +import re |
7 | 8 | import sys
|
8 | 9 | import time
|
9 | 10 | from collections import defaultdict
|
@@ -75,9 +76,36 @@ def _save_current_max_ts(job_id, max_ts):
|
75 | 76 | c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_ts) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_ts = %s;", (job_id, max_ts, max_ts))
|
76 | 77 |
|
77 | 78 |
|
| 79 | +def job_maint_remove_old_partitions(*args, **kwargs): |
| 80 | + LEAVE_N_PAST_DAYS = 5 |
| 81 | + with get_db_cursor() as c: |
| 82 | + log.info("MAINT: Maintenance started - removing old partitions") |
| 83 | + today_seq = int(time.time() // (24 * 3600)) |
| 84 | + c.execute(f"SELECT tablename FROM pg_tables WHERE schemaname = 'public' AND tablename LIKE '{DB_PREFIX}flows_%';") |
| 85 | + for tablename, in c.fetchall(): |
| 86 | + m = re.match(f'^{DB_PREFIX}flows_([0-9]+)$', tablename) |
| 87 | + if not m: |
| 88 | + log.warning(f"MAINT: Table {tablename} does not match regex, skipping") |
| 89 | + continue |
| 90 | + day_seq = int(m.group(1)) |
| 91 | + if day_seq > today_seq: |
| 92 | + log.warning(f"MAINT: CAREFUL! Table {tablename} marks a future day (today is {today_seq}); this should never happen! Skipping.") |
| 93 | + continue |
| 94 | + if day_seq < today_seq - LEAVE_N_PAST_DAYS: |
| 95 | + log.info(f"MAINT: Removing old data: {tablename} (today is {today_seq})") |
| 96 | + c.execute(f"DROP TABLE {tablename};") |
| 97 | + else: |
| 98 | + log.info(f"MAINT: Leaving {tablename} (today is {today_seq})") |
| 99 | + log.info("MAINT: Maintenance finished.") |
| 100 | + |
| 101 | + |
78 | 102 | class NetFlowBot(Collector):
|
79 | 103 |
|
80 | 104 | def jobs(self):
|
| 105 | + # remove old partitions: |
| 106 | + job_id = 'maint/remove_old_data' |
| 107 | + yield job_id, [3600], job_maint_remove_old_partitions, {}, 50 |
| 108 | + |
81 | 109 | # first merge together entity infos so that those entities from the same account are together:
|
82 | 110 | accounts_infos = defaultdict(list)
|
83 | 111 | for entity_info in self.fetch_job_configs('netflow'):
|
|
0 commit comments