Skip to content

Commit b55af4f

Browse files
committed
New NetFlow exporters are added as entities to all known accounts
1 parent 57033c8 commit b55af4f

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed

dbutils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,7 @@ def migration_step_2():
150150
c.execute(f'CREATE INDEX {DB_PREFIX}flows_ts on {DB_PREFIX}flows (ts);')
151151

152152
c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs (job_id TEXT NOT NULL PRIMARY KEY, last_used_ts NUMERIC(16,6) NOT NULL);')
153+
154+
def migration_step_3():
    """DB migration: add the table that records NetFlow exporter IPs.

    The writer inserts every exporter address it sees into this table so the
    maintenance job can later suggest missing device entities.
    """
    create_sql = f'CREATE TABLE {DB_PREFIX}exporters (ip INET NOT NULL PRIMARY KEY);'
    with get_db_cursor() as c:
        c.execute(create_sql)

netflowbot.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,55 @@ def job_maint_remove_old_partitions(*args, **kwargs):
9595
c.execute(f"DROP TABLE {tablename};")
9696
else:
9797
log.info(f"MAINT: Leaving {tablename} (today is {today_seq})")
98-
log.info("MAINT: Maintenance finished.")
98+
log.info("MAINT: Maintenance finished (removing old partitions).")
99+
100+
101+
def job_maint_suggest_entities(*args, **job_params):
    """Maintenance job: register any NetFlow exporters that are not yet known as entities.

    For every account the bot token can access, compare the exporter IPs
    recorded in the ``exporters`` table against the account's existing
    "device" entities, and POST a new device entity for each missing IP.

    Expected job_params keys:
        backend_url: base URL of the backend API
        bot_token: bot authentication token

    Raises:
        Exception: on any non-success HTTP status from the backend.
    """
    log.info("MAINT: Maintenance started - making suggestions for device entities")

    backend_url = job_params['backend_url']
    bot_token = job_params['bot_token']
    requests_session = requests.Session()

    # The set of known exporter IPs is account-independent, so fetch it ONCE
    # (the original re-ran this query for every account) and release the DB
    # cursor before doing any slow HTTP requests below.
    with get_db_cursor() as c:
        # Ideally, we would just run "select distinct(client_ip) from netflow_flows;", but unfortunately
        # I was unable to find a performant way to run this query. So we are using netflow_exporters:
        c.execute(f"SELECT ip FROM {DB_PREFIX}exporters;")
        exporter_ips = [client_ip for client_ip, in c.fetchall()]

    # find all the accounts we have access to:
    r = requests_session.get(f'{backend_url}/accounts/?b={bot_token}')
    if r.status_code != 200:
        raise Exception(f"Invalid bot token or network error, got status {r.status_code} while retrieving {backend_url}/accounts")
    accounts_ids = [a["id"] for a in r.json()["list"]]

    # for each account, add any new netflow exporters (entities) that might not exist yet:
    for account_id in accounts_ids:
        r = requests_session.get(f'{backend_url}/accounts/{account_id}/entities/?b={bot_token}')
        if r.status_code != 200:
            raise Exception(f"Network error, got status {r.status_code} while retrieving {backend_url}/accounts/{account_id}/entities")
        # set (not list) so the membership test in the loop below is O(1):
        entities_ips = {e["details"]["ipv4"] for e in r.json()["list"] if e["entity_type"] == "device"}

        for client_ip in exporter_ips:
            if client_ip in entities_ips:
                log.info(f"MAINT: We already know exporter [{client_ip}]")
                continue

            log.info(f"MAINT: Unknown exporter found, inserting [{client_ip}] to account [{account_id}]")
            url = f'{backend_url}/accounts/{account_id}/entities/?b={bot_token}'
            params = {
                "name": f'{client_ip} (NetFlow exporter)',
                "entity_type": "device",
                "details": {
                    "ipv4": client_ip,
                },
            }
            r = requests_session.post(url, json=params)
            if r.status_code > 299:
                raise Exception(f"Network error, got status {r.status_code} while posting to {backend_url}/accounts/{account_id}/entities: {r.content}")

    log.info("MAINT: Maintenance finished (device entities suggestions).")
99147

100148

101149
class NetFlowBot(Collector):
@@ -105,6 +153,14 @@ def jobs(self):
105153
job_id = 'maint/remove_old_data'
106154
yield job_id, [3600], job_maint_remove_old_partitions, {}, 50
107155

156+
# suggest new netflow exporters / entities:
157+
job_id = f'maint/suggest_entities'
158+
job_params = {
159+
"backend_url": self.backend_url,
160+
"bot_token": self.bot_token,
161+
}
162+
yield job_id, [2*60], job_maint_suggest_entities, job_params, 10
163+
108164
# first merge together entity infos so that those entities from the same account are together:
109165
accounts_infos = defaultdict(list)
110166
for entity_info in self.fetch_job_configs('netflow'):
@@ -157,6 +213,8 @@ def perform_account_aggr_job(*args, **job_params):
157213
if last_used_ts is None:
158214
log.info(f"Counter was not yet initialized for job {job_id}, skipping.")
159215
return
216+
217+
# WATCH OUT! This hack changes all of the units from Bps to B! (should be cleaned up)
160218
#time_between = float(max_ts - last_used_ts)
161219
time_between = 1 # we want to use bytes as unit, not bytes per second
162220

netflowwriter.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def process_named_pipe(named_pipe_filename):
8888
last_record_seqs = {}
8989
last_partition_no = None
9090
buffer = [] # we merge together writes to DB
91+
known_exporters = set()
9192
MAX_BUFFER_SIZE = 5
9293
while True:
9394
with open(named_pipe_filename, "rb") as fp:
@@ -103,6 +104,12 @@ def process_named_pipe(named_pipe_filename):
103104
client_ip, _ = client
104105
data = base64.b64decode(data_b64)
105106

107+
# if client_ip doesn't exist yet, mark it as unknown so that we can advise user to add it:
108+
if client_ip not in known_exporters:
109+
ensure_exporter(client_ip)
110+
known_exporters.add(client_ip)
111+
log.warning(f"[{client_ip}] New exporter!")
112+
106113
# sequence number of the (24h) day from UNIX epoch helps us determine the
107114
# DB partition we are working with:
108115
partition_no = int(ts // S_PER_PARTITION)
@@ -147,11 +154,18 @@ def ensure_flow_table_partition_exists(partition_no):
147154
with get_db_cursor() as c:
148155
# "When creating a range partition, the lower bound specified with FROM is an inclusive bound, whereas
149156
# the upper bound specified with TO is an exclusive bound."
157+
# PARTITION OF: "Any indexes, constraints and user-defined row-level triggers that exist in the parent
158+
# table are cloned on the new partition."
150159
# https://www.postgresql.org/docs/12/sql-createtable.html
151160
c.execute(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{partition_no} PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({ts_start}) TO ({ts_end})")
152161
return partition_no
153162

154163

164+
def ensure_exporter(client_ip):
    """Record ``client_ip`` in the exporters table; re-inserting a known IP is a no-op."""
    insert_sql = f"INSERT INTO {DB_PREFIX}exporters (ip) VALUES (%s) ON CONFLICT DO NOTHING;"
    with get_db_cursor() as c:
        c.execute(insert_sql, (client_ip,))
167+
168+
155169
def write_buffer(buffer, partition_no):
156170
# {
157171
# "DST_AS": 0,

0 commit comments

Comments
 (0)