Skip to content
This repository was archived by the owner on Feb 8, 2018. It is now read-only.

Commit

Permalink
initial commit; needs some cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
scoates committed Sep 20, 2012
0 parents commit 35cff45
Show file tree
Hide file tree
Showing 12 changed files with 424 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules/*
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Funnel metrics into StatsD
67 changes: 67 additions & 0 deletions funnel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
var shared = require('./plugin/shared');

var collect = function (sourcesdotdotdot) {

var fetchers = [];

// arguments is not a real array; no forEach
var len = arguments.length;
for (var i=0; i<len; i++) {
fetchers.push(arguments[i]);
}

var fixMetricName = function (name, preserveDot) {
var re = preserveDot ? /[^a-z0-9._-]/ig : /[^a-z0-9_-]/ig;
return name.replace(re, '-').replace(/-+/g, '-').toLowerCase();
};

var asMetricName = function (data, preserveDot) {
var name = ['funnel'];
name.push(data.funnel);
name.push(fixMetricName(data.nodeName));
if (data.serviceName) {
name.push(fixMetricName(data.serviceName));
}
name.push(fixMetricName(data.metricName, preserveDot));
return name.join('.');
};

var display = function () {
fetchers.forEach(function (fetcher) {
fetcher(function (data) {
console.log(asMetricName(data, data.preserveMetricNameDot), data.reading);
});
});
};

var toStatsD = function (host, port) {
port = port || 8125;
var SDC = require('statsd-client'),
sdc = new SDC({host: host, port: port, debug: true});
fetchers.forEach(function (fetcher) {
fetcher(function (data) {
sdc.gauge(asMetricName(data, data.preserveMetricNameDot), data.reading)
});
});
};

return {
toStatsD: toStatsD,
display: display
};
};

module.exports = {
collect: collect,
nagios: require('./plugin/nagios'),
mongo: require('./plugin/mongo'),
munin: require('./plugin/munin'),
json: require('./plugin/json'),
cloudwatch: require('./plugin/cloudwatch'),
dbi: require('./plugin/dbi'),

COUNT: shared.COUNT,
ALL: shared.ALL,
dbiSolo: shared.dbiSolo
}

26 changes: 26 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "metrics-funnel",
"description": "Funnel metrics from various sources into StatsD",
"version": "0.1.0",
"author": "Sean Coates <[email protected]>",
"contributors": [
{ "name": "Sean Coates", "email": "[email protected]" }
],
"keywords": ["funnel","statsd","graphite","nagios","munin","json","cloudwatch","mongodb", "dbi"],
"repository": {
"type": "git",
"url": "https://github.com/fictivekin/metrics-funnel.git"
},
"bin": {
"metrics_funnel": "funnel.js"
},
"files": ["funnel.js"],
"dependencies": {
"munin-client": ">=0.1.0",
"mongodb": ">=1.1.0",
"aws-lib": ">=0.1.2",
"node-dbi": ">=0.6.1",
"pg": ">=0.7.2"
},
"engines": { "node": ">=0.8.0" }
}
37 changes: 37 additions & 0 deletions plugin/cloudwatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module.exports = function (service) {
var aws = require ('aws-lib');
cw = aws.createCWClient(service.from.id, service.from.secret);
var now = new Date,
start = new Date(now.getTime() - 1 * 60 * 1000); // 1 min ago

return function(funneler) {
for (var serviceName in service.services) {
var thisService = service.services[serviceName];
(function (serviceName, thisService) {
cw.call(
"GetMetricStatistics",
{
EndTime: now.toISOString(),
StartTime: start.toISOString(),
Namespace: thisService.namespace,
MetricName: thisService.metric,
'Statistics.member.1': thisService.type,
'Dimensions.member.1.Name': thisService.name,
'Dimensions.member.1.Value': thisService.value,
Unit: thisService.unit,
Period: 60,
},
function(err, result) {
funneler({
'funnel': 'cloudwatch',
'nodeName': thisService.value,
'metricName': serviceName,
'reading': result.GetMetricStatisticsResult.Datapoints.member[thisService.type]
});
}
);
})(serviceName, thisService);
}
}
};

68 changes: 68 additions & 0 deletions plugin/dbi.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
var shared = require('./shared');

module.exports = function (service) {
var pending = 0;

var DBWrapper = require('node-dbi').DBWrapper;
var DBExpr = require('node-dbi').DBExpr;

var dbWrapper = new DBWrapper(service.from.adapter, service.from);

return function(funneler) {

dbWrapper.connect();

for (var serviceName in service.services) {

(function (serviceName, thisService) {
pending++;
dbWrapper.fetchAll(thisService.query, null, function(err, result) {
pending--;
if (err) {
console.log("Error in dbi:", err);
if (0 == pending) {
dbWrapper.close(shared.nocb);
}
return;
}

var reading = thisService.callback(result);

if ("object" == typeof reading) {
if (serviceName.indexOf('%') === -1) {
throw "Can't emit multiple values if metric name does not contain %";
}
for (var k in reading) {
var val = reading[k];
var metricName = serviceName.replace('%', k);
funneler({
'funnel': 'dbi',
'nodeName': thisService.name || service.from.adapter + '-' + service.from.host,
'metricName': metricName,
'reading': val,
'preserveMetricNameDot': true
});

}

} else { // scalar
funneler({
'funnel': 'dbi',
'nodeName': thisService.name || service.from.adapter + '-' + service.from.host,
'metricName': serviceName,
'reading': reading,
});

}

if (0 == pending) {
dbWrapper.close(shared.nocb);
}
});
})(serviceName, service.services[serviceName]);

};

};
};

21 changes: 21 additions & 0 deletions plugin/json.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var shared = require('./shared');

module.exports = function (service) {
var vm = require('vm');
return function(funneler) {
shared.fromJsonUrl(service.from, function (body, urlParts) {
for (var serviceName in service.services) {
var thisService = service.services[serviceName];
var reading = vm.runInNewContext(thisService, body);
funneler({
'funnel': 'json',
'nodeName': service.name || urlParts.host,
'metricName': serviceName,
'reading': reading
});
}
});
}

};

67 changes: 67 additions & 0 deletions plugin/mongo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
var shared = require('./shared');

module.exports = function (service) {
var pending = 0;

var doCount = function (conn, from, funneler, collection) {
conn.collection(collection, function (err, coll) {
pending++;
coll.count(function (err, count) {
pending--;
funneler({
'funnel': 'mongo',
'nodeName': from.replace(/^mongodb:\/\//, ''),
'serviceName': collection,
'metricName': 'count',
'reading': count
});
if (pending == 0) {
conn.close();
}
});
});
};

var doQuery = function (conn, from, funneler, serviceName, collection, query) {
conn.collection(collection, function (err, coll) {
pending++;
coll.count(query, function (err, count) {
pending--;
funneler({
'funnel': 'mongo',
'nodeName': from.replace(/^mongodb:\/\//, ''),
'serviceName': serviceName,
'metricName': 'query',
'reading': count
});
if (pending == 0) {
conn.close();
}
});
});
};

return function(funneler) {
if (typeof service.from == 'string') {
service.from = [service.from];
}
var mongodb = require('mongodb');
service.from.forEach(function (from) {
mongodb.connect(from, function(err, conn) {

for (var serviceName in service.services) {
var collectionName;
if (service.services[serviceName] === shared.COUNT) {
doCount(conn, from, funneler, serviceName);
} else {
doQuery(conn, from, funneler, serviceName, service.services[serviceName].collection, service.services[serviceName].query);
}
}

});

});

};
};

42 changes: 42 additions & 0 deletions plugin/munin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var shared = require('./shared');

module.exports = function (service) {
var Munin = require('munin-client');

return function (funneler) {
var from = service.from;
// cast to array
if (typeof from == 'string') {
from = [from];
}
from.forEach(function (host) {
var munin = new Munin(host);
for (var sName in service.services) {
(function (serviceName) { // yum! delicious scope!
munin.fetch(serviceName, function(metrics) {
for (var metricName in metrics) {

var capture = false;
if (shared.ALL == service.services[serviceName]) {
capture = true;
} else if (service.services[serviceName].indexOf(metricName) !== -1) {
capture = true;
}
if (capture) {
funneler({
'funnel': 'munin',
'nodeName': host,
'serviceName': serviceName,
'metricName': metricName,
'reading': metrics[metricName]
});
}
}
});
})(sName);
}
munin.quit();
});
}
};

49 changes: 49 additions & 0 deletions plugin/nagios.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
var shared = require('./shared');

module.exports = function (service) {
// TODO: validation
return function(funneler) {
shared.fromJsonUrl(service.source, function (body) {
service.from.forEach(function (from) {
var node;
if (body.content[from]) {
// long name
node = body.content[from];
} else if (body.content[from.split('.')[0]]) {
// short name
node = body.content[from.split('.')[0]];
} else {
console.log("Could not find server", from);
return;
}

for (var serviceName in service.services) {
if (serviceName in node.services) {
var perfDataNames;
if (shared.ALL == service.services[serviceName]) {
perfDataNames = Object.keys(node.services[serviceName].performance_data);
} else {
perfDataNames = service.services[serviceName];
if (typeof perfDataNames == 'string') {
perfDataNames = [perfDataNames];
}
}

perfDataNames.forEach(function (perfName) {
if (undefined !== node.services[serviceName].performance_data[perfName]) {
funneler({
'funnel': 'nagios',
'nodeName': from,
'serviceName': serviceName,
'metricName': perfName,
'reading': node.services[serviceName].performance_data[perfName]
});
}
});
}
}

});
})
}
};
Loading

0 comments on commit 35cff45

Please sign in to comment.