feat expose metrics for monitoring (#90)
* feat: added StatsD for metrics collection
ramank775 authored Oct 8, 2022
1 parent 7d4ea22 commit ce8223c
Showing 21 changed files with 689 additions and 136 deletions.
3 changes: 3 additions & 0 deletions .gitpod.yml
@@ -21,6 +21,9 @@ tasks:
command: mkdir -p /workspace/data && mongod --dbpath /workspace/data
- name: redis
command: redis-server
- name: statsd
init: chmod +x deployment/scripts/*.bash && ./deployment/scripts/setup.bash
command: ./deployment/scripts/init-statsd.bash

vscode:
extensions:
1 change: 1 addition & 0 deletions deployment/config/statsd/.gitignore
@@ -0,0 +1 @@
config
113 changes: 113 additions & 0 deletions deployment/config/statsd/config.example
@@ -0,0 +1,113 @@
/*
Graphite Required Variable:
(Leave this unset to avoid sending stats to Graphite.
Set debug flag and leave this unset to run in 'dry' debug mode -
useful for testing statsd clients without a Graphite server.)
graphiteHost: hostname or IP of Graphite server
Optional Variables:
graphitePort: port for the graphite text collector [default: 2003]
graphitePicklePort: port for the graphite pickle collector [default: 2004]
graphiteProtocol: either 'text' or 'pickle' [default: 'text']
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
* example for console and graphite:
[ "./backends/console", "./backends/graphite" ]
servers: an array of server configurations.
If not specified, the server, address,
address_ipv6, and port top-level configuration
options are used to configure a single server for
backwards-compatibility
Each server configuration supports the following keys:
server: the server to load. The server must exist by name in the directory
servers/. If not specified, the default udp server will be loaded.
* example for tcp server:
"./servers/tcp"
address: address to listen on [default: 0.0.0.0]
address_ipv6: defines if the address is an IPv4 or IPv6 address [true or false, default: false]
port: port to listen for messages on [default: 8125]
socket: (only for tcp servers) path to unix domain socket which will be used to receive
metrics [default: undefined]
socket_mod: (only for tcp servers) file mode which should be applied to unix domain socket, relevant
only if socket option is used [default: undefined]
debug: debug flag [default: false]
mgmt_address: address to run the management TCP interface on
[default: 0.0.0.0]
mgmt_port: port to run the management TCP interface on [default: 8126]
title: Allows for overriding the process title. [default: statsd]
if set to false, will not override the process title and let the OS set it.
The length of the title has to be less than or equal to the binary name + cli arguments
NOTE: This does not work on Macs with node versions prior to v0.10
healthStatus: default health status to be returned when the statsd process starts ['up' or 'down', default: 'up']
dumpMessages: log all incoming messages
flushInterval: interval (in ms) to flush metrics to each backend
percentThreshold: for time information, calculate the Nth percentile(s)
(can be a single value or list of floating-point values)
negative values mean to use "top" Nth percentile(s) values
[%, default: 90]
flush_counts: send stats_counts metrics [default: true]
keyFlush: log the most frequently sent keys [object, default: undefined]
interval: how often to log frequent keys [ms, default: 0]
percent: percentage of frequent keys to log [%, default: 100]
log: location of log file for frequent keys [default: STDOUT]
deleteIdleStats: don't send values to graphite for inactive counters, sets, gauges, or timers
as opposed to sending 0. For gauges, this unsets the gauge (instead of sending
the previous value). Can be individually overridden. [default: false]
deleteGauges: don't send values to graphite for inactive gauges, as opposed to sending the previous value [default: false]
gaugesMaxTTL: number of flush cycles to wait before the gauge is marked as inactive, to use in combination with deleteGauges [default: 1]
deleteTimers: don't send values to graphite for inactive timers, as opposed to sending 0 [default: false]
deleteSets: don't send values to graphite for inactive sets, as opposed to sending 0 [default: false]
deleteCounters: don't send values to graphite for inactive counters, as opposed to sending 0 [default: false]
prefixStats: prefix to use for the statsd statistics data for this running instance of statsd [default: statsd]
applies to both legacy and new namespacing
keyNameSanitize: sanitize all stat names on ingress [default: true]
If disabled, it is up to the backends to sanitize keynames
as appropriate per their storage requirements.
calculatedTimerMetrics: List of timer metrics that will be sent. Default will send all metrics.
To filter on percents and top percents: append '_percent' to the metric name.
Example: calculatedTimerMetrics: ['count', 'median', 'upper_percent', 'histogram']
console:
prettyprint: whether to prettyprint the console backend
output [true or false, default: true]
log: log settings [object, default: undefined]
backend: where to log: stdout or syslog [string, default: stdout]
application: name of the application for syslog [string, default: statsd]
level: log level for [node-]syslog [string, default: LOG_INFO]
graphite:
legacyNamespace: use the legacy namespace [default: true]
globalPrefix: global prefix to use for sending stats to graphite [default: "stats"]
prefixCounter: graphite prefix for counter metrics [default: "counters"]
prefixTimer: graphite prefix for timer metrics [default: "timers"]
prefixGauge: graphite prefix for gauge metrics [default: "gauges"]
prefixSet: graphite prefix for set metrics [default: "sets"]
globalSuffix: global suffix to use for sending stats to graphite [default: ""]
This is particularly useful for sending per host stats by
setting this value to: require('os').hostname().split('.')[0]
repeater: an array of hashes of the form host: and port:
that details other statsd servers to which the received
packets should be "repeated" (duplicated to).
e.g. [ { host: '10.10.10.10', port: 8125 },
{ host: 'observer', port: 8125 } ]
repeaterProtocol: whether to use udp4, udp6, or tcp for repeaters.
["udp4," "udp6", or "tcp" default: "udp4"]
histogram: for timers, an array of mappings of strings (to match metrics) and
corresponding ordered non-inclusive upper limits of bins.
For all matching metrics, histograms are maintained over
time by writing the frequencies for all bins.
'inf' means infinity. A lower limit of 0 is assumed.
default: [], meaning no histograms for any timer.
First match wins. examples:
* histogram to only track render durations, with unequal
class intervals and catchall for outliers:
[ { metric: 'render', bins: [ 0.01, 0.1, 1, 10, 'inf'] } ]
* histogram for all timers except 'foo' related,
equal class interval and catchall for outliers:
[ { metric: 'foo', bins: [] },
{ metric: '', bins: [ 50, 100, 150, 200, 'inf'] } ]
automaticConfigReload: whether to watch the config file and reload it when it
changes. The default is true. Set this to false to disable.
*/
{
port: 8125
, backends: ["./backends/console"]
}
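The committed example keeps metrics local by logging them through the console backend. As a hedged sketch only: if a Graphite host were stood up later, the documented options above suggest a config along these lines (the hostname is a placeholder, not part of this commit):

{
  graphiteHost: "graphite.internal"   // placeholder carbon host, assumption only
, graphitePort: 2003                  // default text-protocol port per the notes above
, port: 8125
, backends: ["./backends/graphite"]
, flushInterval: 10000                // flush aggregated metrics every 10s
, deleteIdleStats: true               // skip inactive counters/gauges/timers/sets
}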
5 changes: 5 additions & 0 deletions deployment/scripts/init-statsd.bash
@@ -0,0 +1,5 @@
#!/bin/bash

npm i --location=global statsd

statsd ./deployment/config/statsd/config
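Once init-statsd.bash has the daemon listening, a quick way to confirm it accepts UDP traffic is to fire a throwaway counter at it. A minimal Node sketch, assuming the default port 8125 from the example config (the metric name is made up for the test):

// smoke-statsd.js — send one counter increment over UDP and exit.
const dgram = require('dgram');

const socket = dgram.createSocket('udp4');
// statsd counter wire format: <name>:<value>|c
const payload = Buffer.from('smoke.test.count:1|c');

socket.send(payload, 8125, '127.0.0.1', (err) => {
  if (err) console.error('send failed:', err);
  else console.log('counter sent; check the console backend at the next flush');
  socket.close();
});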
4 changes: 4 additions & 0 deletions deployment/scripts/setup.bash
@@ -12,7 +12,11 @@ create_discovery_service_file() {
cp deployment/config/discovery_service/services.json.example deployment/config/discovery_service/services.json
}

create_statsd_config_file() {
cp deployment/config/statsd/config.example deployment/config/statsd/config
}

create_env;
create_discovery_service_file;
create_firebase_service_file;
create_statsd_config_file;
4 changes: 4 additions & 0 deletions libs/event-args/message.js
@@ -271,6 +271,10 @@ class MessageEvent extends IEventArg {
get destination() {
return this._destination;
}

get isServerAck() {
return this.type === MESSAGE_TYPE.SERVER_ACK;
}
}

module.exports = {
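The new isServerAck getter gives callers a cheap way to separate protocol acknowledgements from user-visible traffic. A hypothetical usage sketch — the surrounding function and metric name are illustrative, not from this commit:

// Count only user-visible messages, skipping SERVER_ACK events.
function recordInbound(messageEvent, statsClient) {
  if (messageEvent.isServerAck) return; // acks would skew delivery counts
  statsClient.increment({
    stat: 'message.receive.count',      // assumed metric name
    tags: { type: messageEvent.type }
  });
}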
122 changes: 96 additions & 26 deletions libs/event-store/kafka.js
@@ -314,12 +314,16 @@ class KafkaEventStore extends IEventStore {
/** @type { string[] } */
#listenerEvents

/** @type {import('../stats-client/iStatsClient').IStatsClient} */
statsClient;

constructor(context) {
super();
this.#options = context.options;
this.#logger = context.log;
this.#listenerEvents = context.listenerEvents;
this.#asyncStorage = context.asyncStorage;
this.statsClient = context.statsClient;
}

/**
@@ -383,31 +387,73 @@
this.#logger.info(`Running consumer`);
await this.#consumer.run({
partitionsConsumedConcurrently: this.#listenerEvents.length,
-        eachMessage: ({ topic, partition, message }) => {
-          const start = Date.now();
-          const trackId = message.headers.track_id.toString() || shortuuid();
-          this.#asyncStorage.run(trackId, () => {
-            const data = {
-              key: message.key ? message.key.toString() : null,
-              value: message.value
-            };
-            const logInfo = {
-              topic,
-              partition,
-              offset: message.offset,
-              key: data.key,
-            };
-            /** @type {import('./iEventArg').IEventArg} */
-            const Message = decodeMessageCb(topic)
-
-            this.#logger.info(`new data received`, logInfo);
-            const sConsume = Date.now();
-            const eventArg = Message.fromBinary(data.value);
-            this.on(topic, eventArg, data.key);
-            logInfo.latency = Date.now() - start;
-            logInfo.consume_latency = Date.now() - sConsume;
-            this.#logger.info('message consumed', logInfo);
-          });
-        }
+        eachMessage: async ({ topic, partition, message }) => {
+          const start = new Date();
+          const key = message.key ? message.key.toString() : null;
+          this.statsClient.increment({
+            stat: 'event.receive.count',
+            tags: {
+              event: topic,
+              partition,
+              key,
+              broker: 'kafka'
+            }
+          });
+          const trackId = message.headers.track_id.toString() || shortuuid();
+          try {
+            await this.#asyncStorage.run(trackId, async () => {
+              const data = {
+                key,
+                value: message.value
+              };
+              const logInfo = {
+                topic,
+                partition,
+                offset: message.offset,
+                key: data.key,
+              };
+              /** @type {import('./iEventArg').IEventArg} */
+              const Message = decodeMessageCb(topic)
+
+              this.#logger.info(`new data received`, logInfo);
+              const sProcess = new Date();
+              const eventArg = Message.fromBinary(data.value);
+              await this.on(topic, eventArg, data.key);
+              this.statsClient.timing({
+                stat: 'event.process.latency',
+                value: sProcess,
+                tags: {
+                  event: topic,
+                  partition,
+                  key,
+                  broker: 'kafka',
+                }
+              });
+              this.#logger.info('message consumed', logInfo);
+            });
+          } catch (e) {
+            this.statsClient.increment({
+              stat: 'event.process.error_count',
+              tags: {
+                event: topic,
+                partition,
+                key,
+                broker: 'kafka'
+              }
+            });
+            throw e;
+          } finally {
+            this.statsClient.timing({
+              stat: 'event.consume.latency',
+              value: start,
+              tags: {
+                event: topic,
+                partition,
+                key,
+                broker: 'kafka'
+              }
+            });
+          }
+        }
});
} catch (error) {
@@ -440,7 +486,7 @@
async emit(event, args, key) {
const trackId = this.#asyncStorage.getStore() || shortuuid();
try {
const start = Date.now();
const start = new Date();
const [response] = await this.#producer.send({
topic: event,
messages: [
@@ -454,16 +500,40 @@
],
acks: 1,
});
const elapsed = Date.now() - start;
this.statsClient.timing({
stat: 'event.emit.latency',
value: start,
tags: {
event,
key,
broker: 'kafka',
}
});
this.statsClient.increment({
stat: 'event.emit.count',
tags: {
event,
key,
broker: 'kafka'
}
});

this.#logger.info(`Successfully produced message`, {
event,
partition: response.partition,
offset: response.baseOffset,
key,
produceIn: elapsed
});
} catch (error) {
this.#logger.error(`Error while producing message`, { error });
this.statsClient.increment({
stat: 'event.emit.error_count',
tags: {
event,
key,
broker: 'kafka'
}
});
throw error;
}
}
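The diff consumes context.statsClient, but the libs/stats-client/iStatsClient module itself is not among the files shown in this excerpt. Judging purely from the call sites above — increment and timing, each taking a stat name and a tags object, with timing accepting a start Date — the interface plausibly looks something like this sketch. This is an inference, not the repository's actual code:

// Hypothetical shape of IStatsClient, inferred from the call sites in kafka.js.
class IStatsClient {
  /** Increment a counter by `value` (default 1), dimensioned by `tags`. */
  increment({ stat, value = 1, tags = {} }) {
    throw new Error('not implemented');
  }

  /** Record a latency. `value` may be a start Date (elapsed = now - value) or a number of ms. */
  timing({ stat, value, tags = {} }) {
    throw new Error('not implemented');
  }
}

module.exports = { IStatsClient };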