diff --git a/lib/apps/agent/index.ts b/lib/apps/agent/index.ts index b65ce3d..0eb6e74 100644 --- a/lib/apps/agent/index.ts +++ b/lib/apps/agent/index.ts @@ -20,11 +20,12 @@ import * as os from 'os' import UDPServer from 'lib/udp/server' import config from 'lib/config' import Logger from 'lib/logger' +import UDPClientFactory from 'lib/udp/clientFactory' const webServer = new WebServer(config) const discovery = new ServiceDiscovery(config, got) const metrics = new Metrics(config) const logger = new Logger('agent') - +const udpClientFactory = new UDPClientFactory(config) const udpServer = new UDPServer(config) const delay = (ms: number) => { return new Promise((resolve) => setTimeout(resolve, ms)) @@ -41,7 +42,14 @@ const delay = (ms: number) => { process.exit(1) } logger.info(`loaded metadata`, me) - const tester = new Tester(config, got, discovery, metrics, me) + const tester = new Tester( + config, + got, + discovery, + metrics, + me, + udpClientFactory + ) const handlerInit = (app: Application): Promise => { const indexController = new IndexController(metrics, tester, discovery) new IndexRoutes().applyRoutes(app, indexController) diff --git a/lib/tester/index.ts b/lib/tester/index.ts index f07ac9f..0180fc7 100644 --- a/lib/tester/index.ts +++ b/lib/tester/index.ts @@ -2,10 +2,11 @@ import { Got } from 'got/dist/source' import { IDiscovery, IAgent } from 'lib/discovery' import { PlainResponse } from 'got/dist/source/core' import { IMetrics } from 'lib/apps/agent/metrics' -import UDPClient, { IUDPClient, IUDPPingResult } from 'lib/udp/client' +import { IUDPPingResult } from 'lib/udp/client' import { IConfig } from 'lib/config' import Logger, { ILogger } from 'lib/logger' import * as dns from 'dns' +import { IUdpClientFactory as IUDPClientFactory } from 'lib/udp/clientFactory' export interface ITester { start() @@ -44,21 +45,23 @@ export default class Tester implements ITester { private me: IAgent private running = false private config: IConfig - private clients: { [key: string]: IUDPClient } = {} private resolver = new dns.promises.Resolver() + private readonly udpClientFactory: IUDPClientFactory constructor( config: IConfig, got: Got, discovery: IDiscovery, metrics: IMetrics, - me: IAgent + me: IAgent, + udpClientFactory: IUDPClientFactory ) { this.got = got this.discovery = discovery this.metrics = metrics this.me = me this.config = config + this.udpClientFactory = udpClientFactory } public async start(): Promise { @@ -145,39 +148,37 @@ export default class Tester implements ITester { } public async runUDPTests(agents: IAgent[]): Promise { - agents.forEach((agent) => { - if (!this.clients[agent.ip]) { - this.logger.info(`new udp client created for ${agent.ip}`) - this.clients[agent.ip] = new UDPClient(agent.ip, this.config.port) - } - }) - Object.keys(this.clients).forEach((ip) => { - const agent = agents.find((a) => a.ip === ip) - if (!agent) { - this.logger.info(`udp client removed for ${ip}`) - this.clients[ip].destroy() - delete this.clients[ip] - } - }) - const results: IUDPTestResult[] = [] + this.udpClientFactory.generateClientsForAgents(agents) + const testAgent = async (agent: IAgent): Promise => { - const client = this.clients[agent.ip] - const result = await client.ping( - this.config.testConfig.udp.timeout, - this.config.testConfig.udp.packets - ) - if (result.loss > 0) { - this.logger.warn('packet loss detected', result) - } - const testResult: IUDPTestResult = { - source: this.me, - destination: agent, - timings: result, - result: result.loss > 0 ? 'fail' : 'pass' + const client = this.udpClientFactory.clientFor(agent) + try { + const result = await client.ping( + this.config.testConfig.udp.timeout, + this.config.testConfig.udp.packets + ) + if (result.loss > 0) { + this.logger.warn('packet loss detected', result) + } + const testResult: IUDPTestResult = { + source: this.me, + destination: agent, + timings: result, + result: result.loss > 0 ? 'fail' : 'pass' + } + results.push(testResult) + this.metrics.handleUDPTestResult(testResult) + } catch (ex) { + this.logger.error('Failed to execute UDP test', ex) + const testResult: IUDPTestResult = { + source: this.me, + destination: agent, + result: 'fail' + } + results.push(testResult) + this.metrics.handleUDPTestResult(testResult) } - results.push(testResult) - this.metrics.handleUDPTestResult(testResult) } const promises = agents.map(testAgent) await Promise.allSettled(promises) diff --git a/lib/udp/clientFactory.ts b/lib/udp/clientFactory.ts new file mode 100644 index 0000000..cfec8d7 --- /dev/null +++ b/lib/udp/clientFactory.ts @@ -0,0 +1,39 @@ +import { IAgent } from 'lib/discovery' +import UDPClient, { IUDPClient } from 'lib/udp/client' +import Logger from 'lib/logger' +import { IConfig } from 'lib/config' + +export interface IUdpClientFactory { + generateClientsForAgents(agents: IAgent[]): void + clientFor(agent: IAgent): IUDPClient +} + +export default class UDPClientFactory implements IUdpClientFactory { + private clients: { [key: string]: IUDPClient } = {} + private readonly logger = new Logger('udp-client-factory') + private readonly config: IConfig + constructor(config: IConfig) { + this.config = config + } + + public generateClientsForAgents(agents: IAgent[]) { + agents.forEach((agent) => { + if (!this.clients[agent.ip]) { + this.logger.info(`new udp client created for ${agent.ip}`) + this.clients[agent.ip] = new UDPClient(agent.ip, this.config.port) + } + }) + Object.keys(this.clients).forEach((ip) => { + const agent = agents.find((a) => a.ip === ip) + if (!agent) { + this.logger.info(`udp client removed for ${ip}`) + this.clients[ip].destroy() + delete this.clients[ip] + } + }) + } + + public clientFor(agent: IAgent): IUDPClient { + return this.clients[agent.ip] + } +} diff --git a/test/tester.test.ts b/test/tester.test.ts index e3747ea..0ca944a 100644 --- a/test/tester.test.ts +++ b/test/tester.test.ts @@ -4,32 +4,31 @@ import { IConfig } from 'lib/config' import { IDiscovery, IAgent } from 'lib/discovery' import { IMetrics } from 'lib/apps/agent/metrics' import * as should from 'should' -import UDPServer, { IUDPServer } from 'lib/udp/server' import { Got } from 'got/dist/source' +import { IUdpClientFactory } from 'lib/udp/clientFactory' +import { IUDPClient, IUDPPingResult } from 'lib/udp/client' describe('Tester', () => { let sut: ITester let config: IConfig - let udpserver: IUDPServer let got: Got + let udpClientFactory: IUdpClientFactory + before(async () => { config = td.object() config.testConfig.udp.timeout = 500 config.testConfig.udp.packets = 1 config.testConfig.tcp.timeout = 500 config.port = 8080 - udpserver = new UDPServer(config) - await udpserver.start() }) + beforeEach(async () => { const discovery = td.object() const metrics = td.object() const me = td.object() got = td.function() - sut = new Tester(config, got, discovery, metrics, me) - }) - after(async () => { - await udpserver.stop() + udpClientFactory = td.object() + sut = new Tester(config, got, discovery, metrics, me, udpClientFactory) }) it('should do a dns test', async () => { @@ -39,15 +38,47 @@ describe('Tester', () => { }) it('should do a udp test', async () => { + const udpClient = td.object() + const udpPingResult = td.object() + udpPingResult.success = true + td.when( + udpClient.ping( + config.testConfig.udp.timeout, + config.testConfig.udp.packets + ) + ).thenResolve(udpPingResult) const agent = td.object() agent.ip = '127.0.0.1' agent.name = 'local' agent.nodeName = 'some-node' agent.zone = 'some-zone' + td.when(udpClientFactory.clientFor(agent)).thenReturn(udpClient) + const result = await sut.runUDPTests([agent]) should(result[0].result).eql('pass') }) + it('should should capture a failed ping as an error', async () => { + const udpClient = td.object() + const udpPingResult = td.object() + udpPingResult.success = true + td.when( + udpClient.ping( + config.testConfig.udp.timeout, + config.testConfig.udp.packets + ) + ).thenReject(new Error('boom')) + const agent = td.object() + agent.ip = '127.0.0.1' + agent.name = 'local' + agent.nodeName = 'some-node' + agent.zone = 'some-zone' + td.when(udpClientFactory.clientFor(agent)).thenReturn(udpClient) + + const result = await sut.runUDPTests([agent]) + should(result[0].result).eql('fail') + }) + it('should do a tcp test', async () => { td.when( got('http://127.0.0.1:8080/readiness', { timeout: 500 })