diff --git a/lib/influxdb.js b/lib/influxdb.js index f7a8913..d61b2c2 100644 --- a/lib/influxdb.js +++ b/lib/influxdb.js @@ -90,7 +90,10 @@ function InfluxdbBackend(startupTime, config, events) { } } - if (self.version >= 0.9) { + if (self.version >= 1.4) { + self.assembleEvent = self.assembleEvent_v14; + self.httpPOST = self.httpPOST_v14; + } else if (self.version >= 0.9) { self.assembleEvent = self.assembleEvent_v09; self.httpPOST = self.httpPOST_v09; } else { @@ -139,7 +142,73 @@ function millisecondsSince(start) { InfluxdbBackend.prototype.log = function (msg) { util.log('[influxdb] ' + msg); -} +}; + +InfluxdbBackend.prototype.logResponse = function (msg, response) { + var output = '' + msg; + + if (output === '') { + output = 'response'; + } + + output = JSON.stringify(output); + + if (response !== null && typeof response === 'object') { + // build status code line + var temp = ''; + + if (typeof response.statusCode === 'number' && response.statusCode > 0) { + temp += response.statusCode; + } + + output += ' ' + JSON.stringify(temp); + + // build status text + temp = ''; + + if (typeof response.statusText === 'string' && response.statusText !== '') { + temp += response.statusText + } + + output += ' ' + JSON.stringify(temp); + + // build method + address + temp = ''; + + if (response.request !== null && typeof response.request === 'object') { + if (typeof response.request.method === 'string' && response.request.method !== '') { + temp += response.request.method; + } + + if (typeof response.request.url === 'string' && response.request.url !== '') { + if (temp !== '') { + temp += ' '; + } + + temp += response.request.url; + } + } + + output += ' ' + JSON.stringify(temp); + + // build headers + temp = []; + + if (response.headers !== null && typeof response.headers === 'object') { + for (var key in response.headers) { + if (!response.headers.hasOwnProperty(key)) { + continue; + } + + temp.push('' + key + ' = ' + response.headers[key]); + } + } + + output += ' ' +JSON.stringify(temp.join('; ')); + } + + this.log(output); +}; InfluxdbBackend.prototype.logDebug = function (msg) { if (this.debug) { @@ -153,7 +222,7 @@ InfluxdbBackend.prototype.logDebug = function (msg) { util.log('[influxdb] (DEBUG) ' + string); } -} +}; /** * Flush strategy handler @@ -187,18 +256,57 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; } var value = counters[key], - k = key + '.counter'; + k = key; if (value) { - points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:"counter"}])); } } for (set in sets) { - sets[set].map(function (v) { - points.push(self.assembleEvent(set, [{value: v, time: timestamp}])); - }) - points.push(self.assembleEvent(set + "_count", [{value: sets[set].length, time: timestamp}])); + // influxdb only allows one type per measurement per shard - check for any non-int values + var nonIntInSet = false; + for (var i = 0; i < sets[set].length; i++) { + var setValueStr = '' + sets[set][i]; // take the string value for checking... + var setValueInt = parseInt(setValueStr); // convert to int (well... float...) + // and check it's sane + if (!isFinite(setValueInt) || isNaN(setValueInt)) { + nonIntInSet = true; + break; + } + // convert setValueInt back to a string + setValueInt = '' + setValueInt; + // compare with the original, which is difficult because javascript numbers == float :( + if (setValueInt.length !== setValueStr.length) { + nonIntInSet = true; + break; + } + for (var j = 0; j < setValueStr.length; j++) { + var setValueStrChar = setValueStr.charAt(j); + // we only check the first 5 chars for exact match, after that we just check if numeric + if (j >= 5) { + if (setValueStrChar < '0' || setValueStrChar > '9') { + nonIntInSet = true; + break; + } + } else { + if (setValueInt.charAt(j) !== setValueStrChar) { + nonIntInSet = true; + break; + } + } + } + if (nonIntInSet) { + break; + } + } + if (!nonIntInSet) { // only add the original set values if they are all ints + sets[set].map(function (v) { + points.push(self.assembleEvent(set, [{value: v, time: timestamp,type:"set"}])); + }); + } + // add the actual count + points.push(self.assembleEvent(set , [{value: sets[set].length, time: timestamp,type:"set_count"}])); } for (key in gauges) { @@ -206,10 +314,10 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; } var value = gauges[key], - k = key + '.gauge'; + k = key ; if (!isNaN(parseFloat(value)) && isFinite(value)) { - points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:"gauge" }])); } } @@ -223,9 +331,9 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { for (histoKey in histoMetrics) { var value = histoMetrics[histoKey], - k = key + '.timer.histogram.' + histoKey; + k = key; - points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:'timer.histogram.'+timer.histogram,}])); } // Delete here so it isn't iterated over later: @@ -235,9 +343,9 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { // Iterate over normal metrics: for (timerKey in timerMetrics) { var value = timerMetrics[timerKey], - k = key + '.timer' + '.' + timerKey; + k = key; - points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:'timer' + '.' + timerKey}])); } } @@ -252,10 +360,10 @@ InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) { for (key in statsdMetrics) { var value = statsdMetrics[key], - k = self.prefixStats + '.' + key; + k = self.prefixStats; if (!isNaN(parseFloat(value)) && isFinite(value)) { - points.push(self.assembleEvent(k, [{value: value, time: timestamp}])); + points.push(self.assembleEvent(k, [{value: value, time: timestamp,type:key}])); } } } @@ -368,7 +476,7 @@ InfluxdbBackend.prototype.clearRegistry = function () { InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) { var self = this; - + name = name + events[0].type; var payload = { name: name, columns: Object.keys(events[0]), @@ -393,7 +501,7 @@ InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) { InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) { var self = this; - + name = name + events[0].type; var payload = { measurement: name, fields: { value: events[0]['value'] } @@ -402,6 +510,17 @@ InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) { return payload; } +InfluxdbBackend.prototype.assembleEvent_v14 = function (name, events) { + var self = this; + + var payload = { + measurement: name, + fields: { value: events[0]['value'],time: events[0]['time'],type: events[0]['type']} + } + + return payload; +} + InfluxdbBackend.prototype.httpPOST_v08 = function (points) { /* Do not send if there are no points. */ if (!points.length) { return; } @@ -516,6 +635,68 @@ InfluxdbBackend.prototype.httpPOST_v09 = function (points) { req.end(); } +InfluxdbBackend.prototype.httpPOST_v14 = function (points) { + /* Do not send if there are no points. */ + if (!points.length) { return; } + + var self = this, + query = {u: self.user, p: self.pass,db:self.database}, + protocolName = self.protocol == http ? 'HTTP' : 'HTTPS', + startTime; + + self.logDebug(function () { + return 'Sending ' + points.length + ' different points via ' + protocolName; + }); + + self.influxdbStats.numStats = points.length; + + var options = { + hostname: self.host, + port: self.port, + path: '/write?' + querystring.stringify(query), + method: 'POST', + agent: false // Is it okay to use "undefined" here? (keep-alive) + }; + + var req = self.protocol.request(options); + self.logDebug(JSON.stringify(options)); + req.on('socket', function (res) { + startTime = process.hrtime(); + }); + + req.on('response', function (res) { + var status = res.statusCode; + + self.influxdbStats.httpResponseTime = millisecondsSince(startTime); + + if (status >= 400) { + self.logResponse(protocolName + ' Error', res); + } + }); + + req.on('error', function (e, i) { + self.log(e); + }); + + var arr = []; + for(var k in points){ + var o = points[k]; + arr.push(o.measurement+",metric_type="+o.fields.type+" value="+o.fields.value+" "+o.fields.time*1000000); + } + + var payload = arr.join("\n"); + + self.influxdbStats.payloadSize = Buffer.byteLength(payload); + + self.logDebug(function () { + var size = (self.influxdbStats.payloadSize / 1024).toFixed(2); + return 'Payload size ' + size + ' KB'; + }); + self.logDebug(payload); + req.write(payload); + req.end(); +} + InfluxdbBackend.prototype.configCheck = function () { var self = this, success = true;