diff --git a/.gitpod.yml b/.gitpod.yml index e6af663..b896d6b 100644 --- a/.gitpod.yml +++ b/.gitpod.yml @@ -21,6 +21,9 @@ tasks: command: mkdir -p /workspace/data && mongod --dbpath /workspace/data - name: redis command: redis-server + - name: statsd + init: chmod +x deployment/scripts/*.bash && ./deployment/scripts/setup.bash + command: ./deployment/scripts/init-statsd.bash vscode: extensions: diff --git a/deployment/config/statsd/.gitignore b/deployment/config/statsd/.gitignore new file mode 100644 index 0000000..04204c7 --- /dev/null +++ b/deployment/config/statsd/.gitignore @@ -0,0 +1 @@ +config diff --git a/deployment/config/statsd/config.example b/deployment/config/statsd/config.example new file mode 100644 index 0000000..365c80b --- /dev/null +++ b/deployment/config/statsd/config.example @@ -0,0 +1,113 @@ +/* +Graphite Required Variable: +(Leave this unset to avoid sending stats to Graphite. + Set debug flag and leave this unset to run in 'dry' debug mode - + useful for testing statsd clients without a Graphite server.) + graphiteHost: hostname or IP of Graphite server +Optional Variables: + graphitePort: port for the graphite text collector [default: 2003] + graphitePicklePort: port for the graphite pickle collector [default: 2004] + graphiteProtocol: either 'text' or 'pickle' [default: 'text'] + backends: an array of backends to load. Each backend must exist + by name in the directory backends/. If not specified, + the default graphite backend will be loaded. + * example for console and graphite: + [ "./backends/console", "./backends/graphite" ] + servers: an array of server configurations. + If not specified, the server, address, + address_ipv6, and port top-level configuration + options are used to configure a single server for + backwards-compatibility + Each server configuration supports the following keys: + server: the server to load. The server must exist by name in the directory + servers/. If not specified, the default udp server will be loaded. + * example for tcp server: + "./servers/tcp" + address: address to listen on [default: 0.0.0.0] + address_ipv6: defines if the address is an IPv4 or IPv6 address [true or false, default: false] + port: port to listen for messages on [default: 8125] + socket: (only for tcp servers) path to unix domain socket which will be used to receive + metrics [default: undefined] + socket_mod: (only for tcp servers) file mode which should be applied to unix domain socket, relevant + only if socket option is used [default: undefined] + debug: debug flag [default: false] + mgmt_address: address to run the management TCP interface on + [default: 0.0.0.0] + mgmt_port: port to run the management TCP interface on [default: 8126] + title: Allows for overriding the process title. [default: statsd] + if set to false, will not override the process title and let the OS set it. 
+ The length of the title has to be less than or equal to the binary name + cli arguments + NOTE: This does not work on Mac's with node versions prior to v0.10 + healthStatus: default health status to be returned and statsd process starts ['up' or 'down', default: 'up'] + dumpMessages: log all incoming messages + flushInterval: interval (in ms) to flush metrics to each backend + percentThreshold: for time information, calculate the Nth percentile(s) + (can be a single value or list of floating-point values) + negative values mean to use "top" Nth percentile(s) values + [%, default: 90] + flush_counts: send stats_counts metrics [default: true] + keyFlush: log the most frequently sent keys [object, default: undefined] + interval: how often to log frequent keys [ms, default: 0] + percent: percentage of frequent keys to log [%, default: 100] + log: location of log file for frequent keys [default: STDOUT] + deleteIdleStats: don't send values to graphite for inactive counters, sets, gauges, or timers + as opposed to sending 0. For gauges, this unsets the gauge (instead of sending + the previous value). Can be individually overridden. [default: false] + deleteGauges: don't send values to graphite for inactive gauges, as opposed to sending the previous value [default: false] + gaugesMaxTTL: number of flush cycles to wait before the gauge is marked as inactive, to use in combination with deleteGauges [default: 1] + deleteTimers: don't send values to graphite for inactive timers, as opposed to sending 0 [default: false] + deleteSets: don't send values to graphite for inactive sets, as opposed to sending 0 [default: false] + deleteCounters: don't send values to graphite for inactive counters, as opposed to sending 0 [default: false] + prefixStats: prefix to use for the statsd statistics data for this running instance of statsd [default: statsd] + applies to both legacy and new namespacing + keyNameSanitize: sanitize all stat names on ingress [default: true] + If disabled, it is up to the backends to sanitize keynames + as appropriate per their storage requirements. + calculatedTimerMetrics: List of timer metrics that will be sent. Default will send all metrics. + To filter on percents and top percents: append '_percent' to the metric name. + Example: calculatedTimerMetrics: ['count', 'median', 'upper_percent', 'histogram'] + console: + prettyprint: whether to prettyprint the console backend + output [true or false, default: true] + log: log settings [object, default: undefined] + backend: where to log: stdout or syslog [string, default: stdout] + application: name of the application for syslog [string, default: statsd] + level: log level for [node-]syslog [string, default: LOG_INFO] + graphite: + legacyNamespace: use the legacy namespace [default: true] + globalPrefix: global prefix to use for sending stats to graphite [default: "stats"] + prefixCounter: graphite prefix for counter metrics [default: "counters"] + prefixTimer: graphite prefix for timer metrics [default: "timers"] + prefixGauge: graphite prefix for gauge metrics [default: "gauges"] + prefixSet: graphite prefix for set metrics [default: "sets"] + globalSuffix: global suffix to use for sending stats to graphite [default: ""] + This is particularly useful for sending per host stats by + settings this value to: require('os').hostname().split('.')[0] + repeater: an array of hashes of the for host: and port: + that details other statsd servers to which the received + packets should be "repeated" (duplicated to). + e.g. 
[ { host: '10.10.10.10', port: 8125 }, + { host: 'observer', port: 88125 } ] + repeaterProtocol: whether to use udp4, udp6, or tcp for repeaters. + ["udp4," "udp6", or "tcp" default: "udp4"] + histogram: for timers, an array of mappings of strings (to match metrics) and + corresponding ordered non-inclusive upper limits of bins. + For all matching metrics, histograms are maintained over + time by writing the frequencies for all bins. + 'inf' means infinity. A lower limit of 0 is assumed. + default: [], meaning no histograms for any timer. + First match wins. examples: + * histogram to only track render durations, with unequal + class intervals and catchall for outliers: + [ { metric: 'render', bins: [ 0.01, 0.1, 1, 10, 'inf'] } ] + * histogram for all timers except 'foo' related, + equal class interval and catchall for outliers: + [ { metric: 'foo', bins: [] }, + { metric: '', bins: [ 50, 100, 150, 200, 'inf'] } ] + automaticConfigReload: whether to watch the config file and reload it when it + changes. The default is true. Set this to false to disable. +*/ +{ + port: 8125 +, backends: ["./backends/console"] +} diff --git a/deployment/scripts/init-statsd.bash b/deployment/scripts/init-statsd.bash new file mode 100755 index 0000000..03fdd27 --- /dev/null +++ b/deployment/scripts/init-statsd.bash @@ -0,0 +1,5 @@ +#!/bin/bash + +npm i --location=global statsd + +statsd ./deployment/config/statsd/config diff --git a/deployment/scripts/setup.bash b/deployment/scripts/setup.bash index d2ba2f6..04be9c4 100755 --- a/deployment/scripts/setup.bash +++ b/deployment/scripts/setup.bash @@ -12,7 +12,11 @@ create_discovery_service_file() { cp deployment/config/discovery_service/services.json.example deployment/config/discovery_service/services.json } +create_statsd_config_file() { + cp deployment/config/statsd/config.example deployment/config/statsd/config +} create_env; create_discovery_service_file; create_firebase_service_file; +create_statsd_config_file; diff --git a/libs/event-args/message.js b/libs/event-args/message.js index 228b472..3b4a57a 100644 --- a/libs/event-args/message.js +++ b/libs/event-args/message.js @@ -271,6 +271,10 @@ class MessageEvent extends IEventArg { get destination() { return this._destination; } + + get isServerAck() { + return this.type === MESSAGE_TYPE.SERVER_ACK; + } } module.exports = { diff --git a/libs/event-store/kafka.js b/libs/event-store/kafka.js index b0cf41a..7b27457 100644 --- a/libs/event-store/kafka.js +++ b/libs/event-store/kafka.js @@ -314,12 +314,16 @@ class KafkaEventStore extends IEventStore { /** @type { string[] } */ #listenerEvents + /** @type {import('../stats-client/iStatsClient').IStatsClient} */ + statsClient; + constructor(context) { super(); this.#options = context.options; this.#logger = context.log; this.#listenerEvents = context.listenerEvents; this.#asyncStorage = context.asyncStorage; + this.statsClient = context.statsClient; } /** @@ -383,31 +387,73 @@ class KafkaEventStore extends IEventStore { this.#logger.info(`Running consumer`); await this.#consumer.run({ partitionsConsumedConcurrently: this.#listenerEvents.length, - eachMessage: ({ topic, partition, message }) => { - const start = Date.now(); - const trackId = message.headers.track_id.toString() || shortuuid(); - this.#asyncStorage.run(trackId, () => { - const data = { - key: message.key ? 
message.key.toString() : null, - value: message.value - }; - const logInfo = { - topic, + eachMessage: async ({ topic, partition, message }) => { + const start = new Date(); + const key = message.key ? message.key.toString() : null; + this.statsClient.increment({ + stat: 'event.receive.count', + tags: { + event: topic, partition, - offset: message.offset, - key: data.key, - }; - /** @type {import('./iEventArg').IEventArg} */ - const Message = decodeMessageCb(topic) - - this.#logger.info(`new data received`, logInfo); - const sConsume = Date.now(); - const eventArg = Message.fromBinary(data.value); - this.on(topic, eventArg, data.key); - logInfo.latency = Date.now() - start; - logInfo.consume_latency = Date.now() - sConsume; - this.#logger.info('message consumed', logInfo); + key, + broker: 'kafka' + } }); + const trackId = message.headers.track_id.toString() || shortuuid(); + try { + await this.#asyncStorage.run(trackId, async () => { + const data = { + key, + value: message.value + }; + const logInfo = { + topic, + partition, + offset: message.offset, + key: data.key, + }; + /** @type {import('./iEventArg').IEventArg} */ + const Message = decodeMessageCb(topic) + + this.#logger.info(`new data received`, logInfo); + const sProcess = new Date(); + const eventArg = Message.fromBinary(data.value); + await this.on(topic, eventArg, data.key); + this.statsClient.timing({ + stat: 'event.process.latency', + value: sProcess, + tags: { + event: topic, + partition, + key, + broker: 'kafka', + } + }); + this.#logger.info('message consumed', logInfo); + }); + } catch (e) { + this.statsClient.increment({ + stat: 'event.process.error_count', + tags: { + event: topic, + partition, + key, + broker: 'kafka' + } + }); + throw e; + } finally { + this.statsClient.timing({ + stat: 'event.consume.latency', + value: start, + tags: { + event: topic, + partition, + key, + broker: 'kafka' + } + }); + } } }); } catch (error) { @@ -440,7 +486,7 @@ class KafkaEventStore extends IEventStore { async emit(event, args, key) { const trackId = this.#asyncStorage.getStore() || shortuuid(); try { - const start = Date.now(); + const start = new Date(); const [response] = await this.#producer.send({ topic: event, messages: [ @@ -454,16 +500,40 @@ class KafkaEventStore extends IEventStore { ], acks: 1, }); - const elasped = Date.now() - start; + this.statsClient.timing({ + stat: 'event.emit.latency', + value: start, + tags: { + event, + key, + broker: 'kafka', + } + }); + this.statsClient.increment({ + stat: 'event.emit.count', + tags: { + event, + key, + broker: 'kafka' + } + }); + this.#logger.info(`Sucessfully produced message`, { event, partition: response.partition, offset: response.baseOffset, key, - produceIn: elasped }); } catch (error) { this.#logger.error(`Error while producing message`, { error }); + this.statsClient.increment({ + stat: 'event.emit.error_count', + tags: { + event, + key, + broker: 'kafka' + } + }); throw error; } } diff --git a/libs/event-store/nats.js b/libs/event-store/nats.js index ff37424..34e405d 100644 --- a/libs/event-store/nats.js +++ b/libs/event-store/nats.js @@ -42,6 +42,7 @@ ${options.natsAuthNkey} } return authOptions; } + function parseNatsOptions(options) { const servers = options.natsServerList.split(',') const authOptions = parseAuthOptions(options) @@ -84,12 +85,16 @@ class NatsEventStore extends IEventStore { /** @type {(topic: string) => IEventArg} */ #decodeMessageCb; + /** @type {import('../stats-client/iStatsClient').IStatsClient} */ + statsClient; + constructor(context) { 
super(); this.#options = context.options; this.#logger = context.log; this.#subjects = context.listenerEvents; this.#asyncStorage = context.asyncStorage; + this.statsClient = context.statsClient; } /** @@ -155,36 +160,72 @@ class NatsEventStore extends IEventStore { } async #eachMessage(msg) { - const start = Date.now(); - const trackId = msg.headers.get('track_id') || shortuuid(); + const start = new Date(); const [topic, key, partition] = msg.subject.split('.', 3); - - await this.#asyncStorage.run(trackId, async () => { - const Message = this.#decodeMessageCb(topic) - const logInfo = { - topic, + this.statsClient.increment({ + stat: 'event.receive.count', + tags: { + event: topic, partition, - offset: msg.seq, - key - }; - this.#logger.info(`new data received`, logInfo); - const sConsume = Date.now(); - const message = Message.fromBinary(msg.data) - try { - await this.on(topic, message, key); - msg.ack(); - } catch (e) { - this.#logger.error(`Error while processing message`, { err: e }); - // TODO: wait to msg to have retryCount - if (msg.redelivered) - msg.term(); - else - msg.nak(); + key, + broker: 'nats' } - logInfo.latency = Date.now() - start; - logInfo.consume_latency = Date.now() - sConsume; - this.#logger.info('message consumed', logInfo); - }); + }) + const trackId = msg.headers.get('track_id') || shortuuid(); + try { + await this.#asyncStorage.run(trackId, async () => { + const Message = this.#decodeMessageCb(topic) + const logInfo = { + topic, + partition, + offset: msg.seq, + key + }; + this.#logger.info(`new data received`, logInfo); + const sProcess = new Date(); + const message = Message.fromBinary(msg.data) + await this.on(topic, message, key); + this.statsClient.timing({ + stat: 'event.process.latency', + value: sProcess, + tags: { + event: topic, + partition, + key, + broker: 'nats', + } + }); + this.#logger.info('message consumed', logInfo); + }); + msg.ack(); + } catch (e) { + this.#logger.error(`Error while processing message`, { err: e }); + // TODO: wait to msg to have retryCount + if (msg.redelivered) + msg.term(); + else + msg.nak(); + this.statsClient.increment({ + stat: 'event.process.error_count', + tags: { + event: topic, + partition, + key, + broker: 'nats' + } + }) + } finally { + this.statsClient.timing({ + stat: 'event.consume.latency', + value: start, + tags: { + event: topic, + partition, + key, + broker: 'nats' + } + }); + } } /** @@ -207,7 +248,7 @@ class NatsEventStore extends IEventStore { async emit(event, args, key) { const trackId = this.#asyncStorage.getStore() || shortuuid(); try { - const start = Date.now(); + const start = new Date(); const jc = await this.#getJetStreamClient() const headers = nats.headers() headers.append('track_id', trackId) @@ -215,17 +256,41 @@ class NatsEventStore extends IEventStore { const response = await jc.publish(`${event}.${key}`, data, { headers, }); - const elasped = Date.now() - start; + this.statsClient.timing({ + stat: 'event.emit.latency', + value: start, + tags: { + event, + key, + broker: 'nats', + } + }); + this.statsClient.increment({ + stat: 'event.emit.count', + tags: { + event, + key, + broker: 'nats' + } + }); + this.#logger.info(`Sucessfully produced message`, { event, stream: response.stream, offset: response.seq, duplicate: response.duplicate, key, - produceIn: elasped }); } catch (error) { this.#logger.error(`Error while producing message`, { error }); + this.statsClient.increment({ + stat: 'event.emit.error_count', + tags: { + event, + key, + broker: 'nats' + } + }); throw error; } } diff --git 
a/libs/http-service-base.js b/libs/http-service-base.js index ba7cd5c..360cd9b 100644 --- a/libs/http-service-base.js +++ b/libs/http-service-base.js @@ -63,20 +63,28 @@ class HttpServiceBase extends ServiceBase { this.httpServer = this.hapiServer.listener; this.hapiServer.ext('onRequest', async (req, h) => { + req.startTime = new Date(); + this.statsClient.increment({ + stat: 'http.request.count', + tags: { + url: req.url.pathname, + } + }) const trackId = extractInfoFromRequest(req, 'x-request-id') || shortuuid(); + req.trackId = trackId; asyncStorage.enterWith(trackId); - const meter = this.meterDict[req.url.pathname]; - if (meter) meter.mark(); - req.startTime = Date.now(); this.log.info(`new request : ${req.url}`); return h.continue; }); this.hapiServer.ext('onPreResponse', (req, h) => { - const timeElapsed = Date.now() - req.startTime; - const hist = this.histDict[req.url.pathname]; - if (hist) hist.set(timeElapsed); - this.log.info(`${req.url.pathname} time elapsed ${timeElapsed}ms`); + this.statsClient.timing({ + stat: 'http.request.latency', + value: req.startTime, + tags: { + url: req.url.pathname, + } + }) return h.continue; }); @@ -93,15 +101,6 @@ class HttpServiceBase extends ServiceBase { addRoute(uri, method, handler, options = {}) { const path = `${this.baseRoute}${uri}`; - this.meterDict[path] = this.statsClient.meter({ - name: `${path}/sec`, - type: 'meter' - }); - this.histDict[path] = this.statsClient.metric({ - name: path, - type: 'histogram', - measurement: 'median' - }); if (options && options.validate) { options.validate.options = { abortEarly: false diff --git a/libs/service-base.js b/libs/service-base.js index 07cb35f..8164e30 100644 --- a/libs/service-base.js +++ b/libs/service-base.js @@ -15,14 +15,14 @@ async function initDefaultResources(options) { asyncStorage }; ctx = await initLogger(ctx); - ctx = await statsClient.initStatsClient(ctx); + ctx = await statsClient.initializeStatsClient(ctx); return ctx; } function initDefaultOptions() { - const cmd = new commander.Command('my name').allowUnknownOption(); + let cmd = new commander.Command('my name').allowUnknownOption(); cmd .option('--app-name ', 'application name') .option('--log-level ', 'logging level debug/info/error', 'info') @@ -31,6 +31,7 @@ function initDefaultOptions() { cmd.help(); process.exit(0); }); + cmd = statsClient.addStatsClientOptions(cmd); return cmd; } diff --git a/libs/stats-client.js b/libs/stats-client.js deleted file mode 100644 index 82c47a3..0000000 --- a/libs/stats-client.js +++ /dev/null @@ -1,11 +0,0 @@ -const io = require('@pm2/io'); - -async function initStatsClient(context) { - const statsClient = io; - context.statsClient = statsClient; - return context; -} - -module.exports = { - initStatsClient -}; diff --git a/libs/stats-client/iStatsClient.js b/libs/stats-client/iStatsClient.js new file mode 100644 index 0000000..c7e8d81 --- /dev/null +++ b/libs/stats-client/iStatsClient.js @@ -0,0 +1,65 @@ +/* eslint-disable class-methods-use-this */ +/** + * @typedef {Object} Stats + * @property {string} stat name of the stat + * @property {number|Date} value value of the stat + * @property {number} sample_rate Determines the sampling rate. Range (0.0-1.0) + * @property {Object} tags Set additional meta data for the stat. + */ + +/** + * @abstract + * Base class for the stats client. 
+ */ +class IStatsClient { + + constructor() { + if (new.target === IStatsClient) { + throw new TypeError("Cannot construct IStatsClient instances directly"); + } + } + + /** + * Increments a stat by a specified amount + * @param {Stats} stats + * @returns + */ + // eslint-disable-next-line no-unused-vars + increment(stats) { + throw new Error("Not Implemented Exception") + } + + /** + * Decrements a stat by a specified amount + * @param {Stats} stats + * @returns + */ + // eslint-disable-next-line no-unused-vars + decrement(stats) { + throw new Error("Not Implemented Exception") + } + + /** + * Measure the value of a resource. It maintains its value until it is next set. + * @param {Stats} stats + * @returns + */ + // eslint-disable-next-line no-unused-vars + gauge(stats) { + throw new Error("Not Implemented Exception") + } + + /** + * Measure the time taken by an operation. + * @param {Stats} stats + * @returns + */ + // eslint-disable-next-line no-unused-vars + timing(stats) { + throw new Error("Not Implemented Exception") + } +} + +module.exports = { + IStatsClient +} diff --git a/libs/stats-client/index.js b/libs/stats-client/index.js new file mode 100644 index 0000000..7543add --- /dev/null +++ b/libs/stats-client/index.js @@ -0,0 +1,55 @@ +const { IStatsClient } = require('./iStatsClient'); +const StatsD = require('./statsd'); + +const STATS_CLIENT = [ + StatsD +]; + +/** + * Add command line options for Stats Client + * @param {import('commander').Command} cmd + * @returns {import('commander').Command} + */ +function addOptions(cmd) { + cmd = cmd.option('--stats-client <client>', 'Which stats client to use (statsd)', 'statsd'); + STATS_CLIENT.forEach((client) => { + if (client.initOptions) { + cmd = client.initOptions(cmd); + } + }); + return cmd; +} + +/** + * @private + * Get Stats client as per the context options + * @param {{options: {statsClient: string}}} context + * @returns + */ +function getStatsClientImpl(context) { + const { + options: { statsClient } + } = context; + const client = STATS_CLIENT.find((c) => c.code === statsClient); + if (!client) { + throw new Error(`${statsClient} is not a registered stats client`); + } + return client; +} + +/** + * Initialize Stats client + * @param {Object} context + */ +async function initialize(context) { + const impl = getStatsClientImpl(context); + const client = await impl.initialize(context, context.options); + context.statsClient = client; + return context; +} + +module.exports = { + IStatsClient, + addStatsClientOptions: addOptions, + initializeStatsClient: initialize, +} diff --git a/libs/stats-client/statsd.js b/libs/stats-client/statsd.js new file mode 100644 index 0000000..eea5578 --- /dev/null +++ b/libs/stats-client/statsd.js @@ -0,0 +1,81 @@ +const { StatsD } = require('hot-shots'); +const { IStatsClient } = require('./iStatsClient'); + +/** @typedef {import('./iStatsClient').Stats} Stats */ + +class StatsDClient extends IStatsClient { + /** @type {import('hot-shots').StatsD} */ + client; + + constructor(options) { + super(); + + this.client = new StatsD({ + host: options.statsdHost || '127.0.0.1', + port: options.statsdPort || 8125, + protocol: options.statsdProtocol || 'udp', + globalTags: { + app: options.appName, + } + }) + } + + /** + * Increments a stat by a specified amount + * @param {Stats} stats + * @returns + */ + increment(stats) { + this.client.increment(stats.stat, stats.value, stats.sample_rate, stats.tags); + } + + /** + * Decrements a stat by a specified amount + * @param {Stats} stats + * @returns + */ +
decrement(stats) { + this.client.decrement(stats.stat, stats.value, stats.sample_rate, stats.tags); + } + + /** + * Measure the value of a resource. It maintains its value until it is next set. + * @param {Stats} stats + * @returns + */ + gauge(stats) { + this.client.gauge(stats.stat, stats.value, stats.sample_rate, stats.tags); + } + + /** + * Measure the time taken by an operation. + * @param {Stats} stats + * @returns + */ + timing(stats) { + this.client.timing(stats.stat, stats.value, stats.sample_rate, stats.tags); + } + +} + +/** + * Add StatsD options + * @param {import('commander').Command} cmd + */ +function initOptions(cmd) { + return cmd + .option('--statsd-host <host>', 'StatsD server host, default 127.0.0.1', '127.0.0.1') + .option('--statsd-port <port>', 'StatsD server port, default 8125', (c) => Number(c), 8125) + .option('--statsd-protocol <protocol>', 'StatsD protocol, default UDP', 'udp') +} + +async function initialize(context, options) { + const client = new StatsDClient(options); + return client; +} + +module.exports = { + code: 'statsd', + initOptions, + initialize, +} diff --git a/package.json b/package.json index 0565b50..40240ec 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "aws-sdk": "^2.1185.0", "commander": "^9.4.0", "firebase-admin": "^11.0.1", + "hot-shots": "^9.2.0", "ioredis": "^5.2.2", "joi": "^17.6.0", "kafkajs": "2.1.0", diff --git a/services/connection-gateway/gateway.js b/services/connection-gateway/gateway.js index d870f37..d993f4f 100644 --- a/services/connection-gateway/gateway.js +++ b/services/connection-gateway/gateway.js @@ -92,14 +92,6 @@ class Gateway extends HttpServiceBase { this.publishEvent = async (event, eventArgs, key) => { await eventStore.emit(events[event], eventArgs, key); }; - this.userConnectedCounter = this.statsClient.counter({ - name: 'userConnected' - }); - this.newMessageMeter = this.statsClient.meter({ - name: 'newMessage/sec', - type: 'meter' - }); - this.userSocketMapping = new Map(); } @@ -141,6 +133,17 @@ class Gateway extends HttpServiceBase { const { format, ack } = req.query; const isbinary = format === 'binary'; const messages = req.payload; + + this.statsClient.increment({ + stat: 'message.received.count', + value: messages.length, + tags: { + channel: 'rest', + gateway: this.options.gatewayName, + user, + } + }); + + const promises = messages.map(async (msg) => { let event; const options = { @@ -170,6 +173,14 @@ } async onMessage(payload, isBinary, user) { + this.statsClient.increment({ + stat: 'message.received.count', + tags: { + channel: 'websocket', + gateway: this.options.gatewayName, + user, + } + }); const ws = this.userSocketMapping.get(user); if (!isBinary) { const msg = payload.toString(); @@ -180,7 +191,6 @@ } const trackId = shortuuid(); this.context.asyncStorage.run(trackId, async () => { - this.newMessageMeter.mark(); const options = { source: user } @@ -201,14 +211,30 @@ async onConnect(user, ws) { this.userSocketMapping.set(user, ws); - this.userConnectedCounter.inc(1); + this.statsClient.gauge({ + stat: 'user.connected.count', + value: '+1', + tags: { + service: 'gateway', + gateway: this.options.gatewayName, + user, + } + }); const message = ConnectionStateEvent.connect(user, this.options.gatewayName); await this.publishEvent(EVENT_TYPE.CONNECTION_EVENT, message, user); } async onDisconnect(user) { this.userSocketMapping.delete(user); - this.userConnectedCounter.dec(1); +
this.statsClient.gauge({ + stat: 'user.connected.count', + value: -1, + tags: { + service: 'gateway', + gateway: this.options.gatewayName, + user, + } + }); const message = ConnectionStateEvent.disconnect(user, this.options.gatewayName); await this.publishEvent(EVENT_TYPE.CONNECTION_EVENT, message, user); } @@ -221,17 +247,22 @@ class Gateway extends HttpServiceBase { if (!messages.length) return; const ws = this.userSocketMapping.get(receiver); if (ws) { - const latencies = [] const uError = [] messages.forEach((m) => { try { - const message = MessageEvent.fromBinary(Buffer.from(m.raw)) - this.sendWebsocketMessage(receiver, message) - latencies.push({ - retry: meta.retry, - saved: meta.saved, - sid: message.server_id, - latency: getUTCTime() - message.server_timestamp, + const message = MessageEvent.fromBinary(Buffer.from(m.raw)); + this.sendWebsocketMessage(receiver, message); + this.statsClient.timing({ + stat: 'message.delivery.latency', + value: getUTCTime() - message.server_timestamp, + tags: { + gateway: this.options.gatewayName, + channel: 'websocket', + user: receiver, + retry: meta.retry || 0, + saved: meta.saved || false, + sid: message.server_id, + } }) } catch (e) { uError.push({ @@ -245,13 +276,33 @@ class Gateway extends HttpServiceBase { receiver, messages: uError }) - this.log.info(`Message delivery to user`, { latencies }); + if (uError.length) { + this.statsClient.increment({ + stat: 'message.delivery.error_count', + value: uError.length, + tags: { + channel: 'websocket', + gateway: this.options.gatewayName, + user: receiver, + code: 500, + } + }) + } } else { errors.push({ code: 404, receiver, messages: messages.map((m) => ({ sid: m.sid })) }); + this.statsClient.increment({ + stat: 'message.delivery.error_count', + tags: { + channel: 'websocket', + gateway: this.options.gatewayName, + user: receiver, + code: 404, + } + }) } }); return { @@ -271,6 +322,16 @@ class Gateway extends HttpServiceBase { } else { ws.send(message.toString()); } + this.statsClient.increment({ + stat: 'message.delivery.count', + tags: { + serverAck: message.isServerAck, + channel: 'websocket', + gateway: this.options.gatewayName, + user, + format: ws.isbinary ? 
'binary' : 'text', + } + }); } async shutdown() { diff --git a/services/group-ms/group-message-router-ms.js b/services/group-ms/group-message-router-ms.js index efa135c..0224ed6 100644 --- a/services/group-ms/group-message-router-ms.js +++ b/services/group-ms/group-message-router-ms.js @@ -86,7 +86,6 @@ class GroupMessageRouterMS extends ServiceBase { * @param {string} key */ async redirectMessage(message) { - const start = Date.now(); const users = await this.getGroupUsers(message.destination, message.source); let event; switch (message.type) { @@ -100,7 +99,7 @@ class GroupMessageRouterMS extends ServiceBase { await this.publish(event, message, user); }) await Promise.all(promises) - this.log.info('Message redirected', { sid: message.server_id, latency: Date.now() - start }); + this.log.info('Message redirected', { sid: message.server_id }); } async publish(event, message, key) { diff --git a/services/message-delivery/message-delivery-ms.js b/services/message-delivery/message-delivery-ms.js index d08cd80..9f99c09 100644 --- a/services/message-delivery/message-delivery-ms.js +++ b/services/message-delivery/message-delivery-ms.js @@ -117,13 +117,6 @@ class MessageDeliveryMS extends ServiceBase { this.eventStore = this.context.eventStore; this.events = this.context.events; this.ackAlias = this.options.ackActionAlias; - this.userConnectedCounter = this.statsClient.counter({ - name: 'userconnected' - }); - this.getServerMeter = this.statsClient.meter({ - name: 'getServer/sec', - type: 'meter' - }); } init() { @@ -168,14 +161,28 @@ class MessageDeliveryMS extends ServiceBase { async onConnect(user, server) { await this.memCache.set(user, server); this.sendPendingMessage(user); - this.userConnectedCounter.inc(1); + this.statsClient.gauge({ + stat: 'user.connected.count', + value: '+1', + tags: { + service: 'delivery-ms', + user, + } + }); } async onDisconnect(user, server) { const exitingServer = await this.memCache.get(user); if (exitingServer === server) { await this.memCache.del(user); - this.userConnectedCounter.dec(1); + this.statsClient.gauge({ + stat: 'user.connected.count', + value: -1, + tags: { + service: 'delivery-ms', + user, + } + }); } } diff --git a/services/message-router-ms/message-router-ms.js b/services/message-router-ms/message-router-ms.js index 9893c92..2a6faf2 100644 --- a/services/message-router-ms/message-router-ms.js +++ b/services/message-router-ms/message-router-ms.js @@ -76,15 +76,10 @@ class MessageRouterMS extends ServiceBase { this.eventStore = this.context.eventStore; this.events = this.context.events; this.maxRetryCount = this.options.messageMaxRetries; - this.redirectMessageMeter = this.statsClient.meter({ - name: 'redirectMessage/sec', - type: 'meter' - }); } init() { this.eventStore.on = (_, message, key) => { - this.redirectMessageMeter.mark(); this.redirectMessage(message, key); }; } @@ -95,7 +90,6 @@ class MessageRouterMS extends ServiceBase { * @param {string} key */ async redirectMessage(message, key) { - const start = Date.now(); let event; switch (message.channel) { case CHANNEL_TYPE.GROUP: @@ -113,7 +107,7 @@ class MessageRouterMS extends ServiceBase { } await this.publish(event, message, key); - this.log.info('Message redirected', { sid: message.server_id, latency: Date.now() - start }); + this.log.info('Message redirected', { sid: message.server_id, }); } async publish(event, message, key) { diff --git a/services/notification-ms/notification-ms.js b/services/notification-ms/notification-ms.js index b946832..41f0872 100644 --- 
a/services/notification-ms/notification-ms.js +++ b/services/notification-ms/notification-ms.js @@ -75,15 +75,6 @@ class NotificationMS extends ServiceBase { /** @type {import('./pns/pn-service').IPushNotificationService} */ this.pns = context.pns; - this.notificationMeter = this.statsClient.meter({ - name: 'notificationMeter/sec', - type: 'meter' - }); - - this.failedNotificationMeter = this.statsClient.meter({ - name: 'failedNotification/sec', - type: 'meter' - }); /** @type {import('../../libs/event-store/iEventStore').IEventStore} */ this.eventStore = this.context.eventStore; this.events = this.context.events; @@ -126,10 +117,23 @@ class NotificationMS extends ServiceBase { const record = await this.notifDB.getToken(user, { deviceId: 'default' }); if (!record) return; const payload = (record.messageVersion || 2.1) < 3 ? message.toString() : message.toBinary() - await this.pns.push(record.notificationToken, payload).catch((err) => { - this.failedNotificationMeter.mark(); - this.log.error(`Error while sending push notification ${err}`, err); - }); + await this.pns.push(record.notificationToken, payload) + .then(() => { + this.statsClient.increment({ + stat: 'notificaton.delivery.count', + tags: { + user, + } + }); + }).catch((err) => { + this.statsClient.increment({ + stat: 'notificaton.delivery.error_count', + tags: { + user, + } + }); + this.log.error(`Error while sending push notification ${err}`, err); + }); } async shutdown() { diff --git a/yarn.lock b/yarn.lock index c487b19..dc1c6ac 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2007,6 +2007,13 @@ bignumber.js@^9.0.0: resolved "https://registry.yarnpkg.com/bignumber.js/-/bignumber.js-9.0.2.tgz#71c6c6bed38de64e24a65ebe16cfcf23ae693673" integrity sha512-GAcQvbpsM0pUb0zw1EI0KhQEZ+lRwR5fYaAp3vPOYuP7aDvGy6cVN6XHLauvF8SOga2y0dcLcjt3iQDTSEliyw== +bindings@^1.3.0: + version "1.5.0" + resolved "https://registry.yarnpkg.com/bindings/-/bindings-1.5.0.tgz#10353c9e945334bc0511a6d90b38fbc7c9c504df" + integrity sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ== + dependencies: + file-uri-to-path "1.0.0" + brace-expansion@^1.1.7: version "1.1.11" resolved "https://registry.yarnpkg.com/brace-expansion/-/brace-expansion-1.1.11.tgz#3c7fcbf529d87226f3d2f52b966ff5271eb441dd" @@ -2747,6 +2754,11 @@ file-entry-cache@^6.0.1: dependencies: flat-cache "^3.0.4" +file-uri-to-path@1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz#553a7b8446ff6f684359c445f1e37a05dacc33dd" + integrity sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw== + fill-range@^7.0.1: version "7.0.1" resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.0.1.tgz#1919a6a7c75fe38b2c7c77e5198535da9acdda40" @@ -3082,6 +3094,13 @@ has@^1.0.3: dependencies: function-bind "^1.1.1" +hot-shots@^9.2.0: + version "9.2.0" + resolved "https://registry.yarnpkg.com/hot-shots/-/hot-shots-9.2.0.tgz#b1460f63b7f2789d301bcd9f4bbfc46c3cb24777" + integrity sha512-MNn/562GiAMo3LIua/CLUHIkIr7Myh8ycGl/0mgbRh9w+iW+uDFme6+TbW2TAGeMMjJmRjvTAJdYetOZn3zGxw== + optionalDependencies: + unix-dgram "2.0.x" + http-parser-js@>=0.5.1: version "0.5.8" resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3" @@ -3746,6 +3765,11 @@ ms@^2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity 
sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +nan@^2.13.2: + version "2.16.0" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.16.0.tgz#664f43e45460fb98faf00edca0bb0d7b8dce7916" + integrity sha512-UdAqHyFngu7TfQKsCBgAA6pWDkT8MAO7d0jyOecVhN5354xbLqdn8mV9Tat9gepAupm0bt2DbeaSC8vS52MuFA== + nats@^2.7.1: version "2.7.1" resolved "https://registry.yarnpkg.com/nats/-/nats-2.7.1.tgz#d390a48df1a348a335143abc09afad03b2eb9687" @@ -4585,6 +4609,14 @@ unicode-property-aliases-ecmascript@^2.0.0: resolved "https://registry.yarnpkg.com/unicode-property-aliases-ecmascript/-/unicode-property-aliases-ecmascript-2.0.0.tgz#0a36cb9a585c4f6abd51ad1deddb285c165297c8" integrity sha512-5Zfuy9q/DFr4tfO7ZPeVXb1aPoeQSdeFMLpYuFebehDAhbuevLs5yxSZmIFN1tP5F9Wl4IpJrYojg85/zgyZHQ== +unix-dgram@2.0.x: + version "2.0.4" + resolved "https://registry.yarnpkg.com/unix-dgram/-/unix-dgram-2.0.4.tgz#14d4fc21e539742b8fb027de16eccd4e5503a344" + integrity sha512-7tpK6x7ls7J7pDrrAU63h93R0dVhRbPwiRRCawR10cl+2e1VOvF3bHlVJc6WI1dl/8qk5He673QU+Ogv7bPNaw== + dependencies: + bindings "^1.3.0" + nan "^2.13.2" + update-browserslist-db@^1.0.4: version "1.0.4" resolved "https://registry.yarnpkg.com/update-browserslist-db/-/update-browserslist-db-1.0.4.tgz#dbfc5a789caa26b1db8990796c2c8ebbce304824"
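
Reviewer note (not part of the patch): a minimal sketch of how the stats client introduced above gets wired and consumed, assuming libs/stats-client/index.js as added in this diff and a StatsD agent on the default 127.0.0.1:8125 (e.g. the one started by deployment/scripts/init-statsd.bash). The metric names, appName, and hard-coded option values are illustrative only; services normally receive them via the new --stats-client / --statsd-* flags in libs/service-base.js.

// sketch.js - illustrative only; run from the repo root after installing dependencies
const statsClient = require('./libs/stats-client');

async function main() {
  // Mirrors what initDefaultResources/initializeStatsClient do with parsed CLI options.
  const context = {
    options: {
      statsClient: 'statsd',        // selects libs/stats-client/statsd.js by its `code`
      appName: 'example-service',   // becomes the global `app` tag on every metric
      statsdHost: '127.0.0.1',
      statsdPort: 8125,
    },
  };
  await statsClient.initializeStatsClient(context);

  const start = new Date();
  // increment() maps onto hot-shots increment(stat, value, sample_rate, tags).
  context.statsClient.increment({ stat: 'example.count', tags: { broker: 'none' } });
  // timing() accepts a Date, as the kafka/nats/http changes above rely on;
  // hot-shots reports the elapsed time from that instant.
  context.statsClient.timing({ stat: 'example.latency', value: start, tags: {} });
}

main().catch((err) => { console.error(err); process.exit(1); });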