|
1 | 1 | import type { Row } from '@clickhouse/client';
|
| 2 | +import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse'; |
| 3 | +import { getMetadata } from '@hyperdx/common-utils/dist/metadata'; |
| 4 | +import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig'; |
2 | 5 | import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
|
3 | 6 | import express from 'express';
|
4 |
| -import { isNumber, parseInt } from 'lodash'; |
| 7 | +import { parseInt } from 'lodash'; |
5 | 8 | import { serializeError } from 'serialize-error';
|
| 9 | +import { z } from 'zod'; |
| 10 | +import { validateRequest } from 'zod-express-middleware'; |
6 | 11 |
|
7 |
| -import * as clickhouse from '@/clickhouse'; |
8 |
| -import { getTeam } from '@/controllers/team'; |
| 12 | +import { getConnectionById } from '@/controllers/connection'; |
| 13 | +import { getNonNullUserWithTeam } from '@/middleware/auth'; |
| 14 | +import { Source } from '@/models/source'; |
9 | 15 | import logger from '@/utils/logger';
|
| 16 | +import { objectIdSchema } from '@/utils/zod'; |
10 | 17 |
|
11 | 18 | const router = express.Router();
|
12 | 19 |
|
13 |
| -router.get('/', async (req, res, next) => { |
14 |
| - try { |
15 |
| - const teamId = req.user?.team; |
16 |
| - const { endTime, q, startTime } = req.query; |
17 |
| - if (teamId == null) { |
18 |
| - return res.sendStatus(403); |
19 |
| - } |
20 |
| - const startTimeNum = parseInt(startTime as string); |
21 |
| - const endTimeNum = parseInt(endTime as string); |
22 |
| - if (!isNumber(startTimeNum) || !isNumber(endTimeNum)) { |
23 |
| - return res.sendStatus(400); |
24 |
| - } |
| 20 | +router.get( |
| 21 | + '/:sessionId/rrweb', |
| 22 | + validateRequest({ |
| 23 | + params: z.object({ |
| 24 | + sessionId: z.string(), |
| 25 | + }), |
| 26 | + query: z.object({ |
| 27 | + sourceId: objectIdSchema, |
| 28 | + startTime: z.string().regex(/^\d+$/, 'Must be an integer string'), |
| 29 | + endTime: z.string().regex(/^\d+$/, 'Must be an integer string'), |
| 30 | + limit: z.string().regex(/^\d+$/, 'Must be an integer string'), |
| 31 | + offset: z.string().regex(/^\d+$/, 'Must be an integer string'), |
| 32 | + }), |
| 33 | + }), |
| 34 | + async (req, res, next) => { |
| 35 | + try { |
| 36 | + const { sessionId } = req.params; |
| 37 | + const { endTime, sourceId, limit, offset, startTime } = req.query; |
25 | 38 |
|
26 |
| - const team = await getTeam(teamId); |
27 |
| - if (team == null) { |
28 |
| - return res.sendStatus(403); |
29 |
| - } |
| 39 | + const { teamId } = getNonNullUserWithTeam(req); |
30 | 40 |
|
31 |
| - res.json( |
32 |
| - await clickhouse.getSessions({ |
33 |
| - endTime: endTimeNum, |
34 |
| - limit: 500, // fixed limit for now |
35 |
| - offset: 0, // fixed offset for now |
36 |
| - q: q as string, |
37 |
| - startTime: startTimeNum, |
38 |
| - tableVersion: team.logStreamTableVersion, |
39 |
| - teamId: teamId.toString(), |
40 |
| - }), |
41 |
| - ); |
42 |
| - } catch (e) { |
43 |
| - const span = opentelemetry.trace.getActiveSpan(); |
44 |
| - span?.recordException(e as Error); |
45 |
| - span?.setStatus({ code: SpanStatusCode.ERROR }); |
46 |
| - |
47 |
| - next(e); |
48 |
| - } |
49 |
| -}); |
50 |
| - |
51 |
| -router.get('/:sessionId/rrweb', async (req, res, next) => { |
52 |
| - try { |
53 |
| - const teamId = req.user?.team; |
54 |
| - const { sessionId } = req.params; |
55 |
| - const { endTime, limit, offset, startTime } = req.query; |
56 |
| - if (teamId == null) { |
57 |
| - return res.sendStatus(403); |
58 |
| - } |
59 |
| - const startTimeNum = parseInt(startTime as string); |
60 |
| - const endTimeNum = parseInt(endTime as string); |
61 |
| - const limitNum = parseInt(limit as string); |
62 |
| - const offsetNum = parseInt(offset as string); |
63 |
| - if ( |
64 |
| - !isNumber(startTimeNum) || |
65 |
| - !isNumber(endTimeNum) || |
66 |
| - !isNumber(limitNum) || |
67 |
| - !isNumber(offsetNum) |
68 |
| - ) { |
69 |
| - return res.sendStatus(400); |
70 |
| - } |
| 41 | + const source = await Source.findById(sourceId); |
| 42 | + |
| 43 | + if (!source) { |
| 44 | + res.status(404).send('Source not found'); |
| 45 | + return; |
| 46 | + } |
| 47 | + |
| 48 | + const connection = await getConnectionById( |
| 49 | + teamId.toString(), |
| 50 | + source.connection.toString(), |
| 51 | + true, |
| 52 | + ); |
| 53 | + |
| 54 | + if (!connection) { |
| 55 | + res.status(404).send('Connection not found'); |
| 56 | + return; |
| 57 | + } |
71 | 58 |
|
72 |
| - const MAX_LIMIT = 1e6; |
73 |
| - |
74 |
| - res.setHeader('Cache-Control', 'no-cache'); |
75 |
| - res.setHeader('Content-Type', 'text/event-stream'); |
76 |
| - res.setHeader('Connection', 'keep-alive'); |
77 |
| - res.flushHeaders(); // flush the headers to establish SSE with client |
78 |
| - |
79 |
| - const stream = await clickhouse.getRrwebEvents({ |
80 |
| - sessionId: sessionId as string, |
81 |
| - startTime: startTimeNum, |
82 |
| - endTime: endTimeNum, |
83 |
| - limit: Math.min(MAX_LIMIT, limitNum), |
84 |
| - offset: offsetNum, |
85 |
| - }); |
86 |
| - |
87 |
| - stream.on('data', (rows: Row[]) => { |
88 |
| - res.write(`${rows.map(row => `data: ${row.text}`).join('\n')}\n\n`); |
89 |
| - res.flush(); |
90 |
| - }); |
91 |
| - stream.on('end', () => { |
92 |
| - logger.info('Stream ended'); |
93 |
| - |
94 |
| - res.write('event: end\ndata:\n\n'); |
| 59 | + const MAX_LIMIT = 1e6; |
| 60 | + |
| 61 | + res.setHeader('Cache-Control', 'no-cache'); |
| 62 | + res.setHeader('Content-Type', 'text/event-stream'); |
| 63 | + res.setHeader('Connection', 'keep-alive'); |
| 64 | + res.flushHeaders(); // flush the headers to establish SSE with client |
| 65 | + |
| 66 | + const clickhouseClient = new ClickhouseClient({ |
| 67 | + host: connection.host, |
| 68 | + username: connection.username, |
| 69 | + password: connection.password, |
| 70 | + }); |
| 71 | + |
| 72 | + const metadata = getMetadata(clickhouseClient); |
| 73 | + const query = await renderChartConfig( |
| 74 | + { |
| 75 | + // FIXME: add mappings to session source |
| 76 | + select: [ |
| 77 | + { |
| 78 | + valueExpression: `${source.implicitColumnExpression}`, |
| 79 | + alias: 'b', |
| 80 | + }, |
| 81 | + { |
| 82 | + valueExpression: `JSONExtractRaw(${source.implicitColumnExpression}, CAST('type', 'String'))`, |
| 83 | + alias: 't', |
| 84 | + }, |
| 85 | + { |
| 86 | + valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`, |
| 87 | + alias: 'ck', |
| 88 | + }, |
| 89 | + { |
| 90 | + valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`, |
| 91 | + alias: 'tcks', |
| 92 | + }, |
| 93 | + ], |
| 94 | + dateRange: [ |
| 95 | + new Date(parseInt(startTime)), |
| 96 | + new Date(parseInt(endTime)), |
| 97 | + ], |
| 98 | + from: source.from, |
| 99 | + whereLanguage: 'lucene', |
| 100 | + where: `${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`, |
| 101 | + timestampValueExpression: source.timestampValueExpression, |
| 102 | + implicitColumnExpression: source.implicitColumnExpression, |
| 103 | + connection: connection.id, |
| 104 | + orderBy: `${source.timestampValueExpression}, ck ASC`, |
| 105 | + limit: { |
| 106 | + limit: Math.min(MAX_LIMIT, parseInt(limit)), |
| 107 | + offset: parseInt(offset), |
| 108 | + }, |
| 109 | + }, |
| 110 | + metadata, |
| 111 | + ); |
| 112 | + |
| 113 | + const resultSet = await clickhouseClient.query({ |
| 114 | + query: query.sql, |
| 115 | + query_params: query.params, |
| 116 | + format: 'JSONEachRow', |
| 117 | + clickhouse_settings: { |
| 118 | + wait_end_of_query: 0, |
| 119 | + send_progress_in_http_headers: 0, |
| 120 | + }, |
| 121 | + }); |
| 122 | + const stream = resultSet.stream(); |
| 123 | + |
| 124 | + stream.on('data', (rows: Row[]) => { |
| 125 | + res.write(`${rows.map(row => `data: ${row.text}`).join('\n')}\n\n`); |
| 126 | + res.flush(); |
| 127 | + }); |
| 128 | + stream.on('end', () => { |
| 129 | + logger.info('Stream ended'); |
| 130 | + |
| 131 | + res.write('event: end\ndata:\n\n'); |
| 132 | + res.end(); |
| 133 | + }); |
| 134 | + } catch (e) { |
| 135 | + const span = opentelemetry.trace.getActiveSpan(); |
| 136 | + span?.recordException(e as Error); |
| 137 | + span?.setStatus({ code: SpanStatusCode.ERROR }); |
| 138 | + // WARNING: no need to call next(e) here, as the stream will be closed |
| 139 | + logger.error({ |
| 140 | + message: 'Error while streaming rrweb events', |
| 141 | + error: serializeError(e), |
| 142 | + teamId: req.user?.team, |
| 143 | + query: req.query, |
| 144 | + }); |
95 | 145 | res.end();
|
96 |
| - }); |
97 |
| - } catch (e) { |
98 |
| - const span = opentelemetry.trace.getActiveSpan(); |
99 |
| - span?.recordException(e as Error); |
100 |
| - span?.setStatus({ code: SpanStatusCode.ERROR }); |
101 |
| - // WARNING: no need to call next(e) here, as the stream will be closed |
102 |
| - logger.error({ |
103 |
| - message: 'Error while streaming rrweb events', |
104 |
| - error: serializeError(e), |
105 |
| - teamId: req.user?.team, |
106 |
| - query: req.query, |
107 |
| - }); |
108 |
| - res.end(); |
109 |
| - } |
110 |
| -}); |
| 146 | + } |
| 147 | + }, |
| 148 | +); |
111 | 149 |
|
112 | 150 | export default router;
|
0 commit comments