Skip to content

Commit 3c018bf

Browse files
committed
Create elasticapture.js
1 parent 27ab04f commit 3c018bf

File tree

1 file changed

+355
-0
lines changed

1 file changed

+355
-0
lines changed

elasticapture.js

+355
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
/*
2+
* NODEJS Captagent w/ HEP3 Support via HEP-js module
3+
* (C) 2015 L. Mangani, QXIP BV
4+
*
5+
* This program is free software; you can redistribute it and/or modify
6+
* it under the terms of the GNU General Public License as published by
7+
* the Free Software Foundation; either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU General Public License
16+
* along with this program; if not, write to the Free Software Foundation,
17+
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18+
*
19+
20+
nodejs elasticapture.js -debug true -ES 'https://test.facetflow.io:443' -t 15
21+
22+
Daemonize using forever:
23+
24+
npm install forever -g
25+
forever start elasticapture.js
26+
27+
*/
28+
29+
var version = 'v0.3';
30+
var debug = false;
31+
var sipdebug = false;
32+
var stats = { rcvd: 0, parsed: 0, hepsent: 0, err: 0, heperr: 0 };
33+
var counts = { task: 0, batch: 0, drain: 0, pkts: 0 };
34+
35+
/********* HELP MENU *********/
36+
37+
if(process.argv.indexOf("-h") != -1){
38+
console.log('Elasticapture is an HEP3 Capture Agent implementation for HOMER / SIPCAPTURE');
39+
console.log('For more information please visit: http://sipcapture.org ');
40+
console.log('Usage:');
41+
console.log();
42+
console.log(' -r: BPF Capture filter (ie: port 5060)');
43+
console.log();
44+
console.log(' -s: HEP3 Collector IP');
45+
console.log(' -p: HEP3 Collector Port');
46+
console.log(' -i: HEP3 Agent ID');
47+
console.log(' -P: HEP3 Password');
48+
console.log();
49+
console.log(' -ES: ES _Bulk API IP (ie: 127.0.0.1) ');
50+
console.log(' -EP: ES _Bulk API Port (ie: 443) ');
51+
console.log(' -EI: ES _Bulk API Index (ie: captagent)');
52+
console.log(' -ET: ES _Bulk API Type (ie: captagent)');
53+
console.log(' -EU: ES _Bulk API Auth (ie: user:pass)');
54+
console.log(' -t: ES _Bulk Frequency (in seconds)');
55+
console.log();
56+
console.log(' -debug: Debug Internals (ie: -debug true)');
57+
console.log(' CRTL-C: Exit');
58+
console.log();
59+
process.exit();
60+
}
61+
62+
63+
/******** Settings Section ********/
64+
65+
// CAPTURE ARGS & DEFAULTS
66+
var bpf_filter = 'port 5060';
67+
if(process.argv.indexOf("-r") != -1){
68+
bpf_filter = process.argv[process.argv.indexOf("-r") + 1];
69+
}
70+
if(process.argv.indexOf("-debug") != -1){
71+
debug = process.argv[process.argv.indexOf("-debug") + 1];
72+
}
73+
// HEP ARGS & DEFAULTS
74+
var hep_server = 'localhost';
75+
if(process.argv.indexOf("-s") != -1){
76+
hep_server = process.argv[process.argv.indexOf("-s") + 1];
77+
}
78+
var hep_port = 9063;
79+
if(process.argv.indexOf("-p") != -1){
80+
hep_port = process.argv[process.argv.indexOf("-p") + 1];
81+
}
82+
var hep_id = '2001';
83+
if(process.argv.indexOf("-i") != -1){
84+
hep_id = process.argv[process.argv.indexOf("-i") + 1];
85+
}
86+
var hep_pass = 'myHep6';
87+
if(process.argv.indexOf("-P") != -1){
88+
hep_pass = process.argv[process.argv.indexOf("-P") + 1];
89+
}
90+
// ES ARGS & DEFAULTS (experimental, HTTPS default)
91+
var es_on = false;
92+
var es_url = 'http://127.0.0.1:9200';
93+
var es_user = '';
94+
95+
if(process.argv.indexOf("-ES") != -1){
96+
es_url = process.argv[process.argv.indexOf("-ES") + 1];
97+
es_on = true;
98+
}
99+
var es_index = 'captagent';
100+
if(process.argv.indexOf("-EI") != -1){
101+
es_index = process.argv[process.argv.indexOf("-EI") + 1];
102+
}
103+
var es_type = 'captagent';
104+
if(process.argv.indexOf("-ET") != -1){
105+
es_type = process.argv[process.argv.indexOf("-ET") + 1];
106+
}
107+
if(process.argv.indexOf("-EU") != -1){
108+
es_user = process.argv[process.argv.indexOf("-EU") + 1];
109+
}
110+
var es_timeout = 30;
111+
if(process.argv.indexOf("-t") != -1){
112+
es_timeout = parseInt(process.argv[process.argv.indexOf("-t") + 1]);
113+
}
114+
var es_interval = es_timeout * 1000;
115+
116+
117+
console.log('Starting JSAgent '+version);
118+
119+
/********* NODE.JS Requirements ******/
120+
121+
var SIP = require('sipcore');
122+
var Cap = require('cap').Cap,
123+
decoders = require('cap').decoders,
124+
PROTOCOL = decoders.PROTOCOL;
125+
126+
var HEPjs = require('hep-js');
127+
128+
/*********** Elastic Queue ***********/
129+
if (es_on) {
130+
131+
if (es_user.length > 1) { es_url = es_url.replace('://', '://'+es_user+'@'); }
132+
133+
var ElasticQueue, Queue;
134+
var ElasticQueue = require('elastic-queue');
135+
136+
Queue = new ElasticQueue({
137+
elasticsearch: { client: { host: es_url } },
138+
batchSize: 50,
139+
commitTimeout: 1000,
140+
rateLimit: 1000
141+
});
142+
Queue.on('task', function(batch) {
143+
counts.task++;
144+
return;
145+
});
146+
Queue.on('batchComplete', function(resp) {
147+
counts.batch++;
148+
return;
149+
// return console.log("batch complete");
150+
});
151+
Queue.on('drain', function() {
152+
counts.drain++;
153+
return;
154+
// console.log("\n\nQueue is Empty\n\n");
155+
// Queue.close();
156+
// return process.exit();
157+
});
158+
159+
}
160+
161+
162+
/*********** HEP OUT SOCKET ************/
163+
164+
var dgram = require('dgram');
165+
var socket = dgram.createSocket("udp4");
166+
167+
/*********** CAPTURE SOCKET ************/
168+
169+
var c = new Cap(),
170+
device = Cap.findDevice(),
171+
filter = bpf_filter,
172+
bufSize = 10 * 1024 * 1024,
173+
buffer = new Buffer(65535);
174+
175+
/************** APP START **************/
176+
177+
console.log('Capturing from device '+device+ ' with BPF ('+bpf_filter+')');
178+
console.log('Sending HEP3 Packets to '+hep_server+':'+hep_port+' with id '+hep_id);
179+
if (es_on) console.log('Sending JSON Packets to '+es_url+' _Bulk API with type '+es_type);
180+
181+
var linkType = c.open(device, filter, bufSize, buffer);
182+
183+
c.setMinBytes && c.setMinBytes(0);
184+
185+
c.on('packet', function(nbytes, trunc) {
186+
if (debug) console.log('packet: length ' + nbytes + ' bytes, truncated? '
187+
+ (trunc ? 'yes' : 'no'));
188+
189+
stats.rcvd++;
190+
var hep_proto = { "type": "HEP", "version": 3, "payload_type": "SIP", "captureId": hep_id, "capturePass": hep_pass, "ip_family": 2};
191+
192+
if (linkType === 'ETHERNET') {
193+
var ret = decoders.Ethernet(buffer);
194+
195+
var datenow = new Date().getTime();
196+
hep_proto.time_sec = Math.floor(datenow / 1000);
197+
hep_proto.time_usec = datenow - (hep_proto.time_sec*1000);
198+
199+
if (ret.info.type === PROTOCOL.ETHERNET.IPV4) {
200+
if (debug) console.log('Decoding IPv4 ...');
201+
202+
ret = decoders.IPV4(buffer, ret.offset);
203+
if (debug) console.log('from: ' + ret.info.srcaddr + ' to ' + ret.info.dstaddr);
204+
205+
if (ret.info.protocol === PROTOCOL.IP.TCP) {
206+
/* TCP DECODE */
207+
var datalen = ret.info.totallen - ret.hdrlen;
208+
if (debug) console.log('Decoding TCP ...');
209+
210+
var tcpret = decoders.TCP(buffer, ret.offset);
211+
if (debug) console.log(' TCP from: ' + ret.info.srcip + ':' + tcpret.info.srcport + ' to: ' + ret.info.dstaddr + ':' + tcpret.info.dstport);
212+
datalen -= tcpret.hdrlen;
213+
// if (debug) console.log(buffer.toString('binary', tcpret.offset, tcpret.offset + datalen));
214+
var msg = buffer.toString('binary', tcpret.offset, tcpret.offset + datalen);
215+
216+
// Build HEP3
217+
hep_proto.ip_family = 2;
218+
hep_proto.protocol = 6;
219+
hep_proto.proto_type = 1;
220+
hep_proto.srcIp = ret.info.srcaddr;
221+
hep_proto.dstIp = ret.info.dstaddr;
222+
hep_proto.srcPort = tcpret.info.srcport;
223+
hep_proto.dstPort = tcpret.info.dstport;
224+
225+
// Ship to parser
226+
parseSIP(msg, hep_proto);
227+
228+
} else if (ret.info.protocol === PROTOCOL.IP.UDP) {
229+
/* UDP DECODE */
230+
if (debug) console.log('Decoding UDP ...');
231+
var udpret = decoders.UDP(buffer, ret.offset);
232+
if (debug) console.log(' UDP from: ' + ret.info.srcaddr + ':' + udpret.info.srcport + ' to: ' + ret.info.dstaddr+ ':' + udpret.info.dstport);
233+
// if (debug) console.log(buffer.toString('binary', udpret.offset, udpret.offset + udpret.info.length));
234+
var msg = buffer.toString('binary', udpret.offset, udpret.offset + udpret.info.length);
235+
236+
// Build HEP3
237+
hep_proto.ip_family = 2;
238+
hep_proto.protocol = 17;
239+
hep_proto.proto_type = 1;
240+
hep_proto.srcIp = ret.info.srcaddr;
241+
hep_proto.dstIp = ret.info.dstaddr;
242+
hep_proto.srcPort = udpret.info.srcport;
243+
hep_proto.dstPort = udpret.info.dstport;
244+
245+
// Ship to parser
246+
parseSIP(msg, hep_proto);
247+
248+
} else
249+
if (debug) console.log('Unsupported IPv4 protocol: ' + PROTOCOL.IP[ret.info.protocol]);
250+
stats.err++;
251+
} else
252+
if (debug) console.log('Unsupported Ethertype: ' + PROTOCOL.ETHERNET[ret.info.type]);
253+
stats.err++;
254+
}
255+
});
256+
257+
258+
/* SIP Parsing */
259+
260+
var parseSIP = function(msg, rcinfo){
261+
try {
262+
var sipmsg = SIP.parse(msg);
263+
if (sipdebug) console.log(sipmsg);
264+
if (debug) console.log('CSeq: '+sipmsg.headers.cseq);
265+
stats.parsed++;
266+
// SEND HEP3 Packet
267+
sendHEP3(sipmsg,msg, rcinfo);
268+
269+
if (es_on) {
270+
// PARSE USERS/URI for Elasticsearch Indexing
271+
sipmsg.headers["from_uri"] = sipmsg.headers.from.match(/^(<sip)(.*)>/)[0];
272+
sipmsg.headers["to_uri"] = sipmsg.headers.to.match(/^(<sip)(.*)>/)[0];
273+
sipmsg.headers["from_user"] = sipmsg.headers.from.match(/<sip:(.*?)@/)[1] ;
274+
sipmsg.headers["to_user"] = sipmsg.headers.to.match(/<sip:(.*?)@/)[1] ;
275+
// SESSION METHOD
276+
sipmsg.headers["sess_method"] = sipmsg.headers.cseq.replace(/[^A-Za-z\s!?]/g,'');
277+
// INJECT NETWORK/HEP Headers
278+
sipmsg['hep'] = rcinfo;
279+
280+
bufferSIP(sipmsg);
281+
}
282+
}
283+
catch (e) {
284+
if (debug) console.log(e);
285+
var sipmsg = false;
286+
stats.err++;
287+
}
288+
}
289+
290+
291+
/* HEP3 Socket OUT */
292+
293+
var sendHEP3 = function(sipmsg,msg, rcinfo){
294+
if (sipmsg) {
295+
try {
296+
if (debug) console.log('Sending HEP3 Packet...');
297+
var hep_message = HEPjs.encapsulate(msg,rcinfo);
298+
if (hep_message) {
299+
socket = getSocket('udp4');
300+
socket.send(hep_message, 0, hep_message.length, hep_port, hep_server, function(err) {
301+
stats.hepsent++;
302+
});
303+
}
304+
}
305+
catch (e) {
306+
console.log('HEP3 Error sending!');
307+
console.log(e);
308+
stats.heperr++;
309+
}
310+
}
311+
}
312+
313+
314+
315+
/* JSON _Bulk Buffer */
316+
317+
var bufferSIP = function(data){
318+
if (debug) console.log('Queuing SIP packet....');
319+
var now = new Date().toISOString().substring(0, 10).replace(/-/g,'.');
320+
data["@timestamp"] = new Date().toISOString().slice(0, 19) + 'Z';
321+
var doc = {
322+
index: es_index,
323+
type: es_type,
324+
body: JSON.stringify(data)
325+
};
326+
327+
Queue.push(doc, function(err, resp) {
328+
if (err) {
329+
if (debug) console.log(err);
330+
}
331+
if (debug) console.log(resp);
332+
});
333+
}
334+
335+
336+
/* Stats & Kill Thread */
337+
338+
var exit = false;
339+
340+
process.on('SIGINT', function() {
341+
console.log();
342+
console.log('Stats:',counts);
343+
if (exit) {
344+
console.log("Exiting...");
345+
process.exit();
346+
} else {
347+
console.log('Statistics:', stats);
348+
console.log("Press CTRL-C within 2 seconds to Exit...");
349+
exit = true;
350+
setTimeout(function () {
351+
// console.log("Continuing...");
352+
exit = false;
353+
}, 2000)
354+
}
355+
});

0 commit comments

Comments
 (0)