Skip to content

support influxdb version 1.4 #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
219 changes: 200 additions & 19 deletions lib/influxdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ function InfluxdbBackend(startupTime, config, events) {
}
}

if (self.version >= 0.9) {
if (self.version >= 1.4) {
self.assembleEvent = self.assembleEvent_v14;
self.httpPOST = self.httpPOST_v14;
} else if (self.version >= 0.9) {
self.assembleEvent = self.assembleEvent_v09;
self.httpPOST = self.httpPOST_v09;
} else {
Expand Down Expand Up @@ -139,7 +142,73 @@ function millisecondsSince(start) {

InfluxdbBackend.prototype.log = function (msg) {
util.log('[influxdb] ' + msg);
}
};

InfluxdbBackend.prototype.logResponse = function (msg, response) {
var output = '' + msg;

if (output === '') {
output = 'response';
}

output = JSON.stringify(output);

if (response !== null && typeof response === 'object') {
// build status code line
var temp = '';

if (typeof response.statusCode === 'number' && response.statusCode > 0) {
temp += response.statusCode;
}

output += ' ' + JSON.stringify(temp);

// build status text
temp = '';

if (typeof response.statusText === 'string' && response.statusText !== '') {
temp += response.statusText
}

output += ' ' + JSON.stringify(temp);

// build method + address
temp = '';

if (response.request !== null && typeof response.request === 'object') {
if (typeof response.request.method === 'string' && response.request.method !== '') {
temp += response.request.method;
}

if (typeof response.request.url === 'string' && response.request.url !== '') {
if (temp !== '') {
temp += ' ';
}

temp += response.request.url;
}
}

output += ' ' + JSON.stringify(temp);

// build headers
temp = [];

if (response.headers !== null && typeof response.headers === 'object') {
for (var key in response.headers) {
if (!response.headers.hasOwnProperty(key)) {
continue;
}

temp.push('' + key + ' = ' + response.headers[key]);
}
}

output += ' ' +JSON.stringify(temp.join('; '));
}

this.log(output);
};

InfluxdbBackend.prototype.logDebug = function (msg) {
if (this.debug) {
Expand All @@ -153,7 +222,7 @@ InfluxdbBackend.prototype.logDebug = function (msg) {

util.log('[influxdb] (DEBUG) ' + string);
}
}
};

/**
* Flush strategy handler
Expand Down Expand Up @@ -187,29 +256,68 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) {
if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; }

var value = counters[key],
k = key + '.counter';
k = key;

if (value) {
points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:"counter"}]));
}
}

for (set in sets) {
sets[set].map(function (v) {
points.push(self.assembleEvent(set, [{value: v, time: timestamp}]));
})
points.push(self.assembleEvent(set + "_count", [{value: sets[set].length, time: timestamp}]));
// influxdb only allows one type per measurement per shard - check for any non-int values
var nonIntInSet = false;
for (var i = 0; i < sets[set].length; i++) {
var setValueStr = '' + sets[set][i]; // take the string value for checking...
var setValueInt = parseInt(setValueStr); // convert to int (well... float...)
// and check it's sane
if (!isFinite(setValueInt) || isNaN(setValueInt)) {
nonIntInSet = true;
break;
}
// convert setValueInt back to a string
setValueInt = '' + setValueInt;
// compare with the original, which is difficult because javascript numbers == float :(
if (setValueInt.length !== setValueStr.length) {
nonIntInSet = true;
break;
}
for (var j = 0; j < setValueStr.length; j++) {
var setValueStrChar = setValueStr.charAt(j);
// we only check the first 5 chars for exact match, after that we just check if numeric
if (j >= 5) {
if (setValueStrChar < '0' || setValueStrChar > '9') {
nonIntInSet = true;
break;
}
} else {
if (setValueInt.charAt(j) !== setValueStrChar) {
nonIntInSet = true;
break;
}
}
}
if (nonIntInSet) {
break;
}
}
if (!nonIntInSet) { // only add the original set values if they are all ints
sets[set].map(function (v) {
points.push(self.assembleEvent(set, [{value: v, time: timestamp,type:"set"}]));
});
}
// add the actual count
points.push(self.assembleEvent(set , [{value: sets[set].length, time: timestamp,type:"set_count"}]));
}

for (key in gauges) {
/* Do not include statsd gauges. */
if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; }

var value = gauges[key],
k = key + '.gauge';
k = key ;

if (!isNaN(parseFloat(value)) && isFinite(value)) {
points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:"gauge" }]));
}
}

Expand All @@ -223,9 +331,9 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) {

for (histoKey in histoMetrics) {
var value = histoMetrics[histoKey],
k = key + '.timer.histogram.' + histoKey;
k = key;

points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:'timer.histogram.'+timer.histogram,}]));
}

// Delete here so it isn't iterated over later:
Expand All @@ -235,9 +343,9 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) {
// Iterate over normal metrics:
for (timerKey in timerMetrics) {
var value = timerMetrics[timerKey],
k = key + '.timer' + '.' + timerKey;
k = key;

points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:'timer' + '.' + timerKey}]));
}
}

Expand All @@ -252,10 +360,10 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) {

for (key in statsdMetrics) {
var value = statsdMetrics[key],
k = self.prefixStats + '.' + key;
k = self.prefixStats;

if (!isNaN(parseFloat(value)) && isFinite(value)) {
points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:key}]));
}
}
}
Expand Down Expand Up @@ -368,7 +476,7 @@ InfluxdbBackend.prototype.clearRegistry = function () {

InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) {
var self = this;

name = name + events[0].type;
var payload = {
name: name,
columns: Object.keys(events[0]),
Expand All @@ -393,7 +501,7 @@ InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) {

InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) {
var self = this;

name = name + events[0].type;
var payload = {
measurement: name,
fields: { value: events[0]['value'] }
Expand All @@ -402,6 +510,17 @@ InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) {
return payload;
}

InfluxdbBackend.prototype.assembleEvent_v14 = function (name, events) {
var self = this;

var payload = {
measurement: name,
fields: { value: events[0]['value'],time: events[0]['time'],type: events[0]['type']}
}

return payload;
}

InfluxdbBackend.prototype.httpPOST_v08 = function (points) {
/* Do not send if there are no points. */
if (!points.length) { return; }
Expand Down Expand Up @@ -516,6 +635,68 @@ InfluxdbBackend.prototype.httpPOST_v09 = function (points) {
req.end();
}

InfluxdbBackend.prototype.httpPOST_v14 = function (points) {
/* Do not send if there are no points. */
if (!points.length) { return; }

var self = this,
query = {u: self.user, p: self.pass,db:self.database},
protocolName = self.protocol == http ? 'HTTP' : 'HTTPS',
startTime;

self.logDebug(function () {
return 'Sending ' + points.length + ' different points via ' + protocolName;
});

self.influxdbStats.numStats = points.length;

var options = {
hostname: self.host,
port: self.port,
path: '/write?' + querystring.stringify(query),
method: 'POST',
agent: false // Is it okay to use "undefined" here? (keep-alive)
};

var req = self.protocol.request(options);
self.logDebug(JSON.stringify(options));
req.on('socket', function (res) {
startTime = process.hrtime();
});

req.on('response', function (res) {
var status = res.statusCode;

self.influxdbStats.httpResponseTime = millisecondsSince(startTime);

if (status >= 400) {
self.logResponse(protocolName + ' Error', res);
}
});

req.on('error', function (e, i) {
self.log(e);
});

var arr = [];
for(var k in points){
var o = points[k];
arr.push(o.measurement+",metric_type="+o.fields.type+" value="+o.fields.value+" "+o.fields.time*1000000);
}

var payload = arr.join("\n");

self.influxdbStats.payloadSize = Buffer.byteLength(payload);

self.logDebug(function () {
var size = (self.influxdbStats.payloadSize / 1024).toFixed(2);
return 'Payload size ' + size + ' KB';
});
self.logDebug(payload);
req.write(payload);
req.end();
}

InfluxdbBackend.prototype.configCheck = function () {
var self = this,
success = true;
Expand Down