Skip to content

Commit a219e5a

Browse files
committed
Use PG COPY FROM (binary) in netflow writer
1 parent a9fb8a5 commit a219e5a

File tree

2 files changed

+102
-69
lines changed

2 files changed

+102
-69
lines changed

dbutils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def migration_step_2():
134134
# It still allows us to perform queries, but if the database crashes we lose the raw records.
135135
c.execute(f"""
136136
CREATE UNLOGGED TABLE {DB_PREFIX}flows (
137-
ts NUMERIC(16,6) NOT NULL,
137+
ts INTEGER NOT NULL,
138138
client_ip INET NOT NULL,
139139
in_bytes INTEGER NOT NULL,
140140
protocol SMALLINT NOT NULL,

netflowwriter.py

Lines changed: 101 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import base64
33
from datetime import datetime, timedelta
44
import gzip
5+
from io import BytesIO
56
import json
67
import logging
78
import os
@@ -37,8 +38,44 @@
3738
log = logging.getLogger("{}.{}".format(__name__, "writer"))
3839

3940

40-
# Amount of time to wait before dropping an undecodable ExportPacket
41-
PACKET_TIMEOUT = 60 * 60
41+
# 11-byte signature (constructed in this way to detect possible mangled bytes), flags, header extension
42+
# https://www.postgresql.org/docs/9.0/sql-copy.html#AEN59377
43+
PG_COPYFROM_INIT = struct.pack('!11sII', b'PGCOPY\n\377\r\n\0', 0, 0)
44+
# 4-byte INETv4 prefix: family, netmask, is_cidr, n bytes
45+
# https://doxygen.postgresql.org/network_8c_source.html#l00193
46+
IPV4_PREFIX = struct.pack('!BBBB', socket.AF_INET, 32, 0, 4)
47+
48+
49+
def _pgwriter_init():
50+
pg_writer = BytesIO()
51+
pg_writer.write(PG_COPYFROM_INIT)
52+
return pg_writer
53+
54+
55+
def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR):
56+
buf = struct.pack('!HiIi4s4siIiHiHiIiIiHiHi4s4si4s4s',
57+
11, # number of columns
58+
4, int(ts), # integer - beware of Y2038 problem! :)
59+
8, IPV4_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
60+
4, IN_BYTES, # integer
61+
2, PROTOCOL,
62+
2, DIRECTION,
63+
4, L4_DST_PORT,
64+
4, L4_SRC_PORT,
65+
2, INPUT_SNMP,
66+
2, OUTPUT_SNMP,
67+
8, IPV4_PREFIX, IPV4_DST_ADDR,
68+
8, IPV4_PREFIX, IPV4_SRC_ADDR,
69+
)
70+
pgwriter.write(buf)
71+
72+
73+
def _pgwriter_finish(pgwriter):
74+
with get_db_cursor() as c:
75+
pgwriter.write(struct.pack('!h', -1))
76+
pgwriter.seek(0)
77+
c.copy_expert(f"COPY {DB_PREFIX}flows FROM STDIN WITH BINARY", pgwriter)
78+
4279

4380
def process_named_pipe(named_pipe_filename):
4481
try:
@@ -143,72 +180,68 @@ def write_buffer(buffer, day_seq):
143180

144181

145182
log.debug(f"Writing {len(buffer)} records to DB for day {day_seq}")
146-
with get_db_cursor() as c:
147-
# save each of the flows within the record, but use execute_values() to perform bulk insert:
148-
def _get_data(buffer):
149-
for ts, client_ip, export in buffer:
150-
netflow_version, flows = export.header.version, export.flows
151-
if netflow_version == 9:
152-
for f in flows:
153-
yield (
154-
ts,
155-
client_ip,
156-
# "IN_BYTES":
157-
f.data["IN_BYTES"],
158-
# "PROTOCOL":
159-
f.data["PROTOCOL"],
160-
# "DIRECTION":
161-
f.data["DIRECTION"],
162-
# "L4_DST_PORT":
163-
f.data["L4_DST_PORT"],
164-
# "L4_SRC_PORT":
165-
f.data["L4_SRC_PORT"],
166-
# "INPUT_SNMP":
167-
f.data["INPUT_SNMP"],
168-
# "OUTPUT_SNMP":
169-
f.data["OUTPUT_SNMP"],
170-
# "IPV4_DST_ADDR":
171-
f.data["IPV4_DST_ADDR"],
172-
# "IPV4_SRC_ADDR":
173-
f.data["IPV4_SRC_ADDR"],
174-
)
175-
elif netflow_version == 5:
176-
for f in flows:
177-
yield (
178-
ts,
179-
client_ip,
180-
# "IN_BYTES":
181-
f.data["IN_OCTETS"],
182-
# "PROTOCOL":
183-
f.data["PROTO"],
184-
# "DIRECTION":
185-
DIRECTION_INGRESS,
186-
# "L4_DST_PORT":
187-
f.data["DST_PORT"],
188-
# "L4_SRC_PORT":
189-
f.data["SRC_PORT"],
190-
# "INPUT_SNMP":
191-
f.data["INPUT"],
192-
# "OUTPUT_SNMP":
193-
f.data["OUTPUT"],
194-
# netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
195-
# them back to bytes and transform them to strings:
196-
# "IPV4_DST_ADDR":
197-
socket.inet_ntoa(struct.pack('!I', f.data["IPV4_DST_ADDR"])),
198-
# "IPV4_SRC_ADDR":
199-
socket.inet_ntoa(struct.pack('!I', f.data["IPV4_SRC_ADDR"])),
200-
)
201-
else:
202-
log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
203-
204-
data_iterator = _get_data(buffer)
205-
psycopg2.extras.execute_values(
206-
c,
207-
f"INSERT INTO {DB_PREFIX}flows_{day_seq} (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
208-
data_iterator,
209-
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
210-
page_size=500
211-
)
183+
# save each of the flows within the record, but use execute_values() to perform bulk insert:
184+
def _get_data(buffer):
185+
for ts, client_ip, export in buffer:
186+
netflow_version, flows = export.header.version, export.flows
187+
if netflow_version == 9:
188+
for f in flows:
189+
yield (
190+
ts,
191+
client_ip,
192+
# "IN_BYTES":
193+
f.data["IN_BYTES"],
194+
# "PROTOCOL":
195+
f.data["PROTOCOL"],
196+
# "DIRECTION":
197+
f.data["DIRECTION"],
198+
# "L4_DST_PORT":
199+
f.data["L4_DST_PORT"],
200+
# "L4_SRC_PORT":
201+
f.data["L4_SRC_PORT"],
202+
# "INPUT_SNMP":
203+
f.data["INPUT_SNMP"],
204+
# "OUTPUT_SNMP":
205+
f.data["OUTPUT_SNMP"],
206+
# "IPV4_DST_ADDR":
207+
socket.inet_aton(f.data["IPV4_DST_ADDR"]),
208+
# "IPV4_SRC_ADDR":
209+
socket.inet_aton(f.data["IPV4_SRC_ADDR"]),
210+
)
211+
elif netflow_version == 5:
212+
for f in flows:
213+
yield (
214+
ts,
215+
client_ip,
216+
# "IN_BYTES":
217+
f.data["IN_OCTETS"],
218+
# "PROTOCOL":
219+
f.data["PROTO"],
220+
# "DIRECTION":
221+
DIRECTION_INGRESS,
222+
# "L4_DST_PORT":
223+
f.data["DST_PORT"],
224+
# "L4_SRC_PORT":
225+
f.data["SRC_PORT"],
226+
# "INPUT_SNMP":
227+
f.data["INPUT"],
228+
# "OUTPUT_SNMP":
229+
f.data["OUTPUT"],
230+
# netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
231+
# them back to bytes and transform them to strings:
232+
# "IPV4_DST_ADDR":
233+
struct.pack('!I', f.data["IPV4_DST_ADDR"]),
234+
# "IPV4_SRC_ADDR":
235+
struct.pack('!I', f.data["IPV4_SRC_ADDR"]),
236+
)
237+
else:
238+
log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
239+
240+
pgwriter = _pgwriter_init()
241+
for data in _get_data(buffer):
242+
_pgwriter_write(pgwriter, *data)
243+
_pgwriter_finish(pgwriter)
244+
212245

213246
if __name__ == "__main__":
214247
NAMED_PIPE_FILENAME = os.environ.get('NAMED_PIPE_FILENAME', None)

0 commit comments

Comments
 (0)