diff --git a/src/lib/ws/gateway.ts b/src/lib/ws/gateway.ts index b6c7bed5..c7e71fca 100644 --- a/src/lib/ws/gateway.ts +++ b/src/lib/ws/gateway.ts @@ -38,10 +38,10 @@ io // Handlers subscribeWithHandler(socket, 'probe:status:update', handleStatusUpdate(probe)); - socket.on('probe:isIPv6Supported:update', handleIsIPv6SupportedUpdate(probe)); - socket.on('probe:isIPv4Supported:update', handleIsIPv4SupportedUpdate(probe)); - socket.on('probe:dns:update', handleDnsUpdate(probe)); - socket.on('probe:stats:report', handleStatsReport(probe)); + subscribeWithHandler(socket, 'probe:isIPv6Supported:update', handleIsIPv6SupportedUpdate(probe)); + subscribeWithHandler(socket, 'probe:isIPv4Supported:update', handleIsIPv4SupportedUpdate(probe)); + subscribeWithHandler(socket, 'probe:dns:update', handleDnsUpdate(probe)); + subscribeWithHandler(socket, 'probe:stats:report', handleStatsReport(probe)); socket.onAnyOutgoing(listenMeasurementRequest(probe)); subscribeWithHandler(socket, 'probe:measurement:ack', handleMeasurementAck(probe)); subscribeWithHandler(socket, 'probe:measurement:progress', handleMeasurementProgress(probe)); diff --git a/src/lib/ws/helper/subscribe-handler.ts b/src/lib/ws/helper/subscribe-handler.ts index 4dc910fd..e123a867 100644 --- a/src/lib/ws/helper/subscribe-handler.ts +++ b/src/lib/ws/helper/subscribe-handler.ts @@ -5,7 +5,7 @@ import type { ServerSocket } from '../server.js'; const logger = scopedLogger('ws:handler:error'); const isError = (error: unknown): error is Error => Boolean(error as Error['message']); -type HandlerMethod = (...args: never[]) => Promise; +type HandlerMethod = (...args: never[]) => Promise | void; export const subscribeWithHandler = (socket: ServerSocket, event: string, method: HandlerMethod) => { socket.on(event, async (...args) => { @@ -21,11 +21,7 @@ export const subscribeWithHandler = (socket: ServerSocket, event: string, method } if (Joi.isError(error)) { - const messages = error.details.map(({ message, context }) => `${message}. Received: "${context?.value}"`); - - if (messages.length) { - details = messages.join('\n'); - } + details = formatJoiError(error); } logger.info(`Event "${event}" failed to handle for (${details})`, { @@ -36,3 +32,17 @@ export const subscribeWithHandler = (socket: ServerSocket, event: string, method } }); }; + +const formatJoiError = (error: Joi.ValidationError) => { + const messages = error.details.map(({ message, context }) => { + let str = `${message}.`; + + if (context?.value) { + str += `Received: "${context?.value}".`; + } + + return str; + }); + + return messages.length ? error.message : messages.join('\n'); +}; diff --git a/src/measurement/handler/progress.ts b/src/measurement/handler/progress.ts index 5aab8cc4..73942b26 100644 --- a/src/measurement/handler/progress.ts +++ b/src/measurement/handler/progress.ts @@ -1,11 +1,29 @@ +import Joi from 'joi'; import type { Probe } from '../../probe/types.js'; import type { MeasurementProgressMessage } from '../types.js'; import { getMeasurementRunner } from '../runner.js'; import { getProbeValidator } from '../../lib/probe-validator.js'; +const schema = Joi.object({ + testId: Joi.string().required(), + measurementId: Joi.string().required(), + overwrite: Joi.boolean(), + result: Joi.object({ + rawOutput: Joi.string().required(), + rawHeaders: Joi.string(), + rawBody: Joi.string(), + }).required(), +}).required(); + const runner = getMeasurementRunner(); export const handleMeasurementProgress = (probe: Probe) => async (data: MeasurementProgressMessage): Promise => { - await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid); - await runner.recordProgress(data); + const validation = schema.validate(data); + + if (validation.error) { + throw validation.error; + } + + await getProbeValidator().validateProbe(validation.value.measurementId, validation.value.testId, probe.uuid); + await runner.recordProgress(validation.value); }; diff --git a/src/measurement/handler/result.ts b/src/measurement/handler/result.ts index ca447753..7a0171f3 100644 --- a/src/measurement/handler/result.ts +++ b/src/measurement/handler/result.ts @@ -1,11 +1,47 @@ +import Joi from 'joi'; import type { Probe } from '../../probe/types.js'; -import type { MeasurementResultMessage } from '../types.js'; +import type { MeasurementResultMessage, PingResult } from '../types.js'; import { getMeasurementRunner } from '../runner.js'; import { getProbeValidator } from '../../lib/probe-validator.js'; +const pingResultSchema = Joi.object({ + status: Joi.string().required(), + rawOutput: Joi.string().required(), + resolvedAddress: Joi.string().required().allow(null), + resolvedHostname: Joi.string().required().allow(null), + timings: Joi.array().items(Joi.object({ + rtt: Joi.number().required(), + ttl: Joi.number().required(), + })).required(), + stats: Joi.object({ + min: Joi.number().required().allow(null), + max: Joi.number().required().allow(null), + avg: Joi.number().required().allow(null), + total: Joi.number().required().allow(null), + loss: Joi.number().required().allow(null), + rcv: Joi.number().required().allow(null), + drop: Joi.number().required().allow(null), + }).required(), +}); + +const schema = Joi.object({ + testId: Joi.string().required(), + measurementId: Joi.string().required(), + overwrite: Joi.boolean(), + result: Joi.alternatives([ + pingResultSchema, + ]).required(), +}).required(); + const runner = getMeasurementRunner(); export const handleMeasurementResult = (probe: Probe) => async (data: MeasurementResultMessage): Promise => { + const validation = schema.validate(data); + + if (validation.error) { + throw validation.error; + } + await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid); await runner.recordResult(data); }; diff --git a/src/measurement/types.ts b/src/measurement/types.ts index efaeb610..b3bde808 100644 --- a/src/measurement/types.ts +++ b/src/measurement/types.ts @@ -20,13 +20,17 @@ type PingTiming = { }; export type PingResult = TestResult & { + resolvedAddress: string | null, + resolvedHostname: string | null, timings: PingTiming[]; stats: { - min: number; - avg: number; - max: number; - stddev: number; - packetLoss: number; + min: number | null, + max: number | null, + avg: number | null, + total: number | null, + loss: number | null, + rcv: number | null, + drop: number | null, }; }; diff --git a/src/probe/handler/dns.ts b/src/probe/handler/dns.ts index 00719ac1..f4064e08 100644 --- a/src/probe/handler/dns.ts +++ b/src/probe/handler/dns.ts @@ -1,5 +1,14 @@ +import Joi from 'joi'; import type { Probe } from '../types.js'; +const schema = Joi.array().items(Joi.string()).required(); + export const handleDnsUpdate = (probe: Probe) => (list: string[]): void => { - probe.resolvers = list; + const validation = schema.validate(list); + + if (validation.error) { + throw validation.error; + } + + probe.resolvers = validation.value; }; diff --git a/src/probe/handler/ip-version.ts b/src/probe/handler/ip-version.ts index 82bd0342..0d7ded4f 100644 --- a/src/probe/handler/ip-version.ts +++ b/src/probe/handler/ip-version.ts @@ -1,9 +1,24 @@ +import Joi from 'joi'; import type { Probe } from '../types.js'; +const schema = Joi.boolean().required(); + export const handleIsIPv4SupportedUpdate = (probe: Probe) => (isIPv4Supported: boolean): void => { - probe.isIPv4Supported = isIPv4Supported; + const validation = schema.validate(isIPv4Supported); + + if (validation.error) { + throw validation.error; + } + + probe.isIPv4Supported = validation.value; }; export const handleIsIPv6SupportedUpdate = (probe: Probe) => (isIPv6Supported: boolean): void => { - probe.isIPv6Supported = isIPv6Supported; + const validation = schema.validate(isIPv6Supported); + + if (validation.error) { + throw validation.error; + } + + probe.isIPv6Supported = validation.value; }; diff --git a/src/probe/handler/stats.ts b/src/probe/handler/stats.ts index 39e8ab9d..6a34d79e 100644 --- a/src/probe/handler/stats.ts +++ b/src/probe/handler/stats.ts @@ -1,5 +1,23 @@ +import Joi from 'joi'; import type { Probe, ProbeStats } from '../types.js'; +const schema = Joi.object({ + cpu: Joi.object({ + load: Joi.array().items(Joi.object({ + usage: Joi.number().required(), + })).required(), + }).required(), + jobs: Joi.object({ + count: Joi.number().required(), + }).required(), +}).required(); + export const handleStatsReport = (probe: Probe) => (report: ProbeStats): void => { - probe.stats = report; + const validation = schema.validate(report); + + if (validation.error) { + throw validation.error; + } + + probe.stats = validation.value; }; diff --git a/src/probe/handler/status.ts b/src/probe/handler/status.ts index b7e2ea1d..f4fa64a7 100644 --- a/src/probe/handler/status.ts +++ b/src/probe/handler/status.ts @@ -1,10 +1,10 @@ import Joi from 'joi'; import type { Probe } from '../types.js'; -const schema = Joi.string().valid('initializing', 'ready', 'unbuffer-missing', 'ping-test-failed', 'sigterm'); +const schema = Joi.string().valid('initializing', 'ready', 'unbuffer-missing', 'ping-test-failed', 'sigterm').required(); -export const handleStatusUpdate = (probe: Probe) => async (input: unknown) => { - const validation = schema.validate(input); +export const handleStatusUpdate = (probe: Probe) => (status: Probe['status']) => { + const validation = schema.validate(status); if (validation.error) { throw validation.error;