diff --git a/.vscode/launch.json b/.vscode/launch.json
index ded85c4..b0dfddb 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -74,6 +74,7 @@
       ],
       "outputCapture": "std",
       "console": "internalConsole",
+      "runtimeVersion": "22",
     },
     {
       "name": "Chunk parser prototyping",
diff --git a/package-lock.json b/package-lock.json
index 2f7bcb4..f33c041 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -12,8 +12,8 @@
         "@fastify/cors": "^9.0.1",
         "@fastify/swagger": "^8.15.0",
         "@fastify/type-provider-typebox": "^4.1.0",
-        "@hirosystems/api-toolkit": "^1.9.0",
-        "@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1",
+        "@hirosystems/api-toolkit": "^1.9.1",
+        "@hirosystems/salt-n-pepper-client": "^1.1.1",
         "@noble/secp256k1": "^2.2.3",
         "@sinclair/typebox": "^0.28.17",
         "@stacks/transactions": "^7.0.6",
@@ -52,6 +52,9 @@
         "ts-node": "^10.9.2",
         "typescript": "^5.6.3",
         "typescript-eslint": "^8.13.0"
+      },
+      "engines": {
+        "node": ">=22.0.0"
       }
     },
     "client": {
@@ -1227,9 +1230,9 @@
       }
     },
     "node_modules/@hirosystems/api-toolkit": {
-      "version": "1.9.0",
-      "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.9.0.tgz",
-      "integrity": "sha512-9SFStFtDohud0U0J8HPhUp5Br0La4BggqOPijUJgOSjmAE3mWK7SNOAcybbe7G3YP9bpf4QMWaxcwH5mAKjHpQ==",
+      "version": "1.9.1",
+      "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.9.1.tgz",
+      "integrity": "sha512-pV92h6c5tPnwE+kjro1U9fhhMrBSqsvGr6KGVYU66sJdzr9D3Id7vN9tET7GY+lzhPp97tHLGJ5k7NE2AZGbiQ==",
       "license": "Apache 2.0",
       "dependencies": {
         "@fastify/cors": "^8.0.0",
@@ -1406,9 +1409,9 @@
       }
     },
     "node_modules/@hirosystems/salt-n-pepper-client": {
-      "version": "1.0.4-beta.1",
-      "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-1.0.4-beta.1.tgz",
-      "integrity": "sha512-znfzRNelipy2UbA0knCYo4S2hVipbzCcuKVUGaZTtO2iXIdUyQLj6ZbxMs8lZFwg+Bf0iY/CF5tfNCmPBSJSug==",
+      "version": "1.1.1",
+      "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-1.1.1.tgz",
+      "integrity": "sha512-xQqHAX0xIrpGFD9Hc4sqbSZ862vE1yDUT67RFv6ok7hd+OFJUNfCrfr+nMQJZK8qSt3jlmZJt/5R38+D6pQvvw==",
       "license": "GPL-3.0-only",
       "dependencies": {
         "@hirosystems/api-toolkit": "^1.7.2",
@@ -2134,9 +2137,9 @@
       }
     },
     "node_modules/@redis/client": {
-      "version": "1.6.0",
-      "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.0.tgz",
-      "integrity": "sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==",
+      "version": "1.6.1",
+      "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.1.tgz",
+      "integrity": "sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==",
       "license": "MIT",
       "dependencies": {
         "cluster-key-slot": "1.1.2",
@@ -10183,16 +10186,16 @@
       }
     },
     "node_modules/redis": {
-      "version": "4.7.0",
-      "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.0.tgz",
-      "integrity": "sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ==",
+      "version": "4.7.1",
+      "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.1.tgz",
+      "integrity": "sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==",
       "license": "MIT",
       "workspaces": [
         "./packages/*"
       ],
       "dependencies": {
         "@redis/bloom": "1.2.0",
-        "@redis/client": "1.6.0",
+        "@redis/client": "1.6.1",
         "@redis/graph": "1.1.1",
         "@redis/json": "1.0.7",
         "@redis/search": "1.2.0",
diff --git a/package.json b/package.json
index 1ddcb44..90ecc42 100644
--- a/package.json
+++ b/package.json
@@ -5,6 +5,9 @@
   "main": "index.js",
   "author": "Hiro Systems PBC (https://hiro.so)",
   "license": "GPL-3.0",
+  "engines": {
+    "node": ">=22.0.0"
+  },
   "scripts": {
     "build": "rimraf ./dist && tsc --project tsconfig.build.json",
     "build:tests": "tsc -p tsconfig.json --noEmit",
@@ -56,8 +59,8 @@
     "@fastify/cors": "^9.0.1",
     "@fastify/swagger": "^8.15.0",
     "@fastify/type-provider-typebox": "^4.1.0",
-    "@hirosystems/api-toolkit": "^1.9.0",
-    "@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1",
+    "@hirosystems/api-toolkit": "^1.9.1",
+    "@hirosystems/salt-n-pepper-client": "^1.1.1",
     "@noble/secp256k1": "^2.2.3",
     "@sinclair/typebox": "^0.28.17",
     "@stacks/transactions": "^7.0.6",
diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts
index f99e6cb..1510424 100644
--- a/src/event-stream/event-stream.ts
+++ b/src/event-stream/event-stream.ts
@@ -6,30 +6,19 @@ import {
   CoreNodeNakamotoBlockMessage,
   StackerDbChunk,
 } from './core-node-message';
-import { logger as defaultLogger, stopwatch } from '@hirosystems/api-toolkit';
+import { logger as defaultLogger, stopwatch, WorkerThreadManager } from '@hirosystems/api-toolkit';
 import { ENV } from '../env';
-import {
-  ParsedNakamotoBlock,
-  ParsedStackerDbChunk,
-  parseNakamotoBlockMsg,
-  parseStackerDbChunk,
-} from './msg-parsing';
+import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
 import { SignerMessagesEventPayload } from '../pg/types';
-import { ThreadedParser } from './threaded-parser';
 import { SERVER_VERSION } from '@hirosystems/api-toolkit';
 import { EventEmitter } from 'node:events';
-
-// TODO: move this into the @hirosystems/salt-n-pepper-client lib
-function sanitizeRedisClientName(value: string): string {
-  const nameSanitizer = /[^!-~]+/g;
-  return value.trim().replace(nameSanitizer, '-');
-}
+import * as msgParserWorkerBlocks from './threaded-parser-worker';
 
 export class EventStreamHandler {
   db: PgStore;
   logger = defaultLogger.child({ name: 'EventStreamHandler' });
   eventStream: StacksEventStream;
-  threadedParser: ThreadedParser;
+  threadedParser = new WorkerThreadManager(msgParserWorkerBlocks);
 
   readonly events = new EventEmitter<{
     processedMessage: [{ msgId: string }];
   }>();
@@ -37,9 +26,7 @@
   constructor(opts: { db: PgStore; lastMessageId: string }) {
     this.db = opts.db;
-    const appName = sanitizeRedisClientName(
-      `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`
-    );
+    const appName = `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`;
     this.eventStream = new StacksEventStream({
       redisUrl: ENV.REDIS_URL,
       redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX,
@@ -47,7 +34,6 @@
       lastMessageId: opts.lastMessageId,
       appName,
     });
-    this.threadedParser = new ThreadedParser();
   }
 
   async start() {
@@ -71,8 +57,9 @@
           );
         }
         if ('signer_signature_hash' in blockMsg) {
-          const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg);
-          await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed);
+          const parsed = await this.threadedParser.exec({ kind: 'block', msg: nakamotoBlockMsg });
+          const result = parsed.result as ParsedNakamotoBlock;
+          await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), result);
         } else {
           // ignore pre-Nakamoto blocks
         }
@@ -81,8 +68,9 @@
       case '/stackerdb_chunks': {
         const msg = body as StackerDbChunk;
-        const parsed = await this.threadedParser.parseStackerDbChunk(msg);
-        await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed);
+        const parsed = await this.threadedParser.exec({ kind: 'chunk', msg });
+        const result = parsed.result as ParsedStackerDbChunk[];
+        await this.handleStackerDbMsg(messageId, parseInt(timestamp), result);
         break;
       }
diff --git a/src/event-stream/threaded-parser-worker.ts b/src/event-stream/threaded-parser-worker.ts
index 6cd2445..2110e1f 100644
--- a/src/event-stream/threaded-parser-worker.ts
+++ b/src/event-stream/threaded-parser-worker.ts
@@ -1,92 +1,16 @@
-import * as WorkerThreads from 'node:worker_threads';
-import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
-import {
-  ParsedNakamotoBlock,
-  ParsedStackerDbChunk,
-  parseNakamotoBlockMsg,
-  parseStackerDbChunk,
-} from './msg-parsing';
-
-export const workerFile = __filename;
-
-export enum ThreadedParserMsgType {
-  NakamotoBlock = 'NakamotoBlock',
-  StackerDbChunk = 'StackerDbChunk',
-}
-
-interface ThreadMsg {
-  type: ThreadedParserMsgType;
-  msgId: number;
-}
-
-export interface NakamotoBlockMsgRequest extends ThreadMsg {
-  type: ThreadedParserMsgType.NakamotoBlock;
-  msgId: number;
-  block: CoreNodeNakamotoBlockMessage;
-}
-
-export interface NakamotoBlockMsgReply extends ThreadMsg {
-  type: ThreadedParserMsgType.NakamotoBlock;
-  msgId: number;
-  block: ParsedNakamotoBlock;
-}
-
-export interface StackerDbChunkMsgRequest extends ThreadMsg {
-  type: ThreadedParserMsgType.StackerDbChunk;
-  msgId: number;
-  chunk: StackerDbChunk;
-}
-
-export interface StackerDbChunkMsgReply extends ThreadMsg {
-  type: ThreadedParserMsgType.StackerDbChunk;
-  msgId: number;
-  chunk: ParsedStackerDbChunk[];
-}
-
-export type ThreadedParserMsgRequest = NakamotoBlockMsgRequest | StackerDbChunkMsgRequest;
-export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgReply;
-
-if (!WorkerThreads.isMainThread) {
-  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-  const mainThreadPort = WorkerThreads.parentPort!;
-  mainThreadPort.on('messageerror', err => {
-    console.error(`Worker thread message error`, err);
-  });
-  mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => {
-    try {
-      handleWorkerMsg(msg);
-    } catch (err) {
-      console.error(`Failed to parse message: ${JSON.stringify(msg)}`);
-      console.error(`Error handling message from main thread`, err);
-    }
-  });
-}
-
-function handleWorkerMsg(msg: ThreadedParserMsgRequest) {
-  let reply: ThreadedParserMsgReply;
-  switch (msg.type) {
-    case ThreadedParserMsgType.NakamotoBlock: {
-      reply = {
-        type: ThreadedParserMsgType.NakamotoBlock,
-        msgId: msg.msgId,
-        block: parseNakamotoBlockMsg(msg.block),
-      } satisfies NakamotoBlockMsgReply;
-      break;
-    }
-    case ThreadedParserMsgType.StackerDbChunk: {
-      reply = {
-        type: ThreadedParserMsgType.StackerDbChunk,
-        msgId: msg.msgId,
-        chunk: parseStackerDbChunk(msg.chunk),
-      } satisfies StackerDbChunkMsgReply;
-      break;
-    }
-    default: {
-      const _exhaustiveCheck: never = msg;
-      throw new Error(`Unhandled message type: ${msg}`);
-    }
+import type { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
+import { parseNakamotoBlockMsg, parseStackerDbChunk } from './msg-parsing';
+
+export function processTask(
+  args:
+    | { kind: 'block'; msg: CoreNodeNakamotoBlockMessage }
+    | { kind: 'chunk'; msg: StackerDbChunk }
+) {
+  if (args.kind === 'block') {
+    return { kind: 'block', result: parseNakamotoBlockMsg(args.msg) };
+  } else {
+    return { kind: 'chunk', result: parseStackerDbChunk(args.msg) };
   }
-  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-  const mainThreadPort = WorkerThreads.parentPort!;
-  mainThreadPort.postMessage(reply);
 }
+
+export const workerModule = module;
diff --git a/src/event-stream/threaded-parser.ts b/src/event-stream/threaded-parser.ts
deleted file mode 100644
index bd954b9..0000000
--- a/src/event-stream/threaded-parser.ts
+++ /dev/null
@@ -1,82 +0,0 @@
-import * as WorkerThreads from 'node:worker_threads';
-import * as path from 'node:path';
-import { waiter, Waiter, logger as defaultLogger } from '@hirosystems/api-toolkit';
-import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
-import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
-import {
-  NakamotoBlockMsgReply,
-  NakamotoBlockMsgRequest,
-  StackerDbChunkMsgReply,
-  StackerDbChunkMsgRequest,
-  ThreadedParserMsgReply,
-  ThreadedParserMsgType,
-  workerFile,
-} from './threaded-parser-worker';
-
-export class ThreadedParser {
-  private readonly worker: WorkerThreads.Worker;
-  private readonly msgRequests: Map<number, Waiter<ThreadedParserMsgReply>> = new Map();
-  private readonly logger = defaultLogger.child({ module: 'ThreadedParser' });
-  private lastMsgId = 0;
-
-  constructor() {
-    if (!WorkerThreads.isMainThread) {
-      throw new Error('ThreadedParser must be instantiated in the main thread');
-    }
-    const workerOpt: WorkerThreads.WorkerOptions = {};
-    if (path.extname(workerFile) === '.ts') {
-      if (process.env.NODE_ENV !== 'test') {
-        throw new Error(
-          'Worker threads are being created with ts-node outside of a test environment'
-        );
-      }
-      workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only'];
-    }
-    this.worker = new WorkerThreads.Worker(workerFile, workerOpt);
-    this.worker.on('error', err => {
-      this.logger.error(err, 'Worker error');
-    });
-    this.worker.on('messageerror', err => {
-      this.logger.error(err, 'Worker message error');
-    });
-    this.worker.on('message', (msg: ThreadedParserMsgReply) => {
-      const waiter = this.msgRequests.get(msg.msgId);
-      if (waiter) {
-        waiter.finish(msg);
-        this.msgRequests.delete(msg.msgId);
-      } else {
-        this.logger.warn('Received unexpected message from worker', msg);
-      }
-    });
-  }
-
-  async parseNakamotoBlock(block: CoreNodeNakamotoBlockMessage): Promise<ParsedNakamotoBlock> {
-    const replyWaiter = waiter<NakamotoBlockMsgReply>();
-    const msg: NakamotoBlockMsgRequest = {
-      type: ThreadedParserMsgType.NakamotoBlock,
-      msgId: this.lastMsgId++,
-      block,
-    };
-    this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
-    this.worker.postMessage(msg);
-    const reply = await replyWaiter;
-    return reply.block;
-  }
-
-  async parseStackerDbChunk(chunk: StackerDbChunk): Promise<ParsedStackerDbChunk[]> {
-    const replyWaiter = waiter<StackerDbChunkMsgReply>();
-    const msg: StackerDbChunkMsgRequest = {
-      type: ThreadedParserMsgType.StackerDbChunk,
-      msgId: this.lastMsgId++,
-      chunk,
-    };
-    this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
-    this.worker.postMessage(msg);
-    const reply = await replyWaiter;
-    return reply.chunk;
-  }
-
-  async close() {
-    await this.worker.terminate();
-  }
-}
diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts
index 5995143..77f7520 100644
--- a/src/pg/pg-store.ts
+++ b/src/pg/pg-store.ts
@@ -10,7 +10,6 @@ import {
 import * as path from 'path';
 import { PgWriteStore } from './ingestion/pg-write-store';
 import { BlockIdParam, normalizeHexString, sleep } from '../helpers';
-import { Fragment } from 'postgres';
 import { DbBlockProposalQueryResponse } from './types';
 import { NotificationPgStore } from './notifications/pg-notifications';
@@ -491,20 +490,18 @@
   }
 
   async getSignerDataForBlock({ sql, blockId }: { sql: PgSqlClient; blockId: BlockIdParam }) {
-    let blockFilter: Fragment;
-    switch (blockId.type) {
-      case 'height':
-        blockFilter = sql`block_height = ${blockId.height}`;
-        break;
-      case 'hash':
-        blockFilter = sql`block_hash = ${normalizeHexString(blockId.hash)}`;
-        break;
-      case 'latest':
-        blockFilter = sql`block_height = (SELECT block_height FROM chain_tip)`;
-        break;
-      default:
-        throw new Error(`Invalid blockId type: ${blockId}`);
-    }
+    const blockFilter = (() => {
+      switch (blockId.type) {
+        case 'height':
+          return sql`block_height = ${blockId.height}`;
+        case 'hash':
+          return sql`block_hash = ${normalizeHexString(blockId.hash)}`;
+        case 'latest':
+          return sql`block_height = (SELECT block_height FROM chain_tip)`;
+        default:
+          throw new Error(`Invalid blockId type: ${blockId}`);
+      }
+    })();
     const result = await sql<
       {
diff --git a/tests/db/jest-global-setup.ts b/tests/db/jest-global-setup.ts
index 77a7778..8118cec 100644
--- a/tests/db/jest-global-setup.ts
+++ b/tests/db/jest-global-setup.ts
@@ -36,9 +36,6 @@ async function pruneContainers(docker: Docker, label: string) {
   const containers = await docker.listContainers({ all: true, filters: { label: [label] } });
   for (const container of containers) {
     const c = docker.getContainer(container.Id);
-    if (container.State !== 'exited') {
-      await c.stop().catch(_err => {});
-    }
     await c.remove({ v: true, force: true });
   }
   await docker.pruneContainers({ filters: { label: [label] } });
@@ -83,10 +80,11 @@
     console.log(`${image} container started on ports ${JSON.stringify(ports)}`);
 
-    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
-    const containerIds = ((globalThis as any).__TEST_DOCKER_CONTAINER_IDS as string[]) ?? [];
-    containerIds.push(container.id);
-    Object.assign(globalThis, { __TEST_DOCKER_CONTAINER_IDS: containerIds });
+    const containers: { id: string; image: string }[] =
+      // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+      (globalThis as any).__TEST_DOCKER_CONTAINERS ?? [];
+    containers.push({ id: container.id, image });
+    Object.assign(globalThis, { __TEST_DOCKER_CONTAINERS: containers });
 
     return { image, containerId: container.id };
   } catch (error) {
@@ -126,6 +124,7 @@
       },
     });
     await sql`SELECT 1`;
+    await sql.end();
     console.log('Postgres is ready');
   }
diff --git a/tests/db/jest-global-teardown.ts b/tests/db/jest-global-teardown.ts
index 05a2484..60490ac 100644
--- a/tests/db/jest-global-teardown.ts
+++ b/tests/db/jest-global-teardown.ts
@@ -2,20 +2,14 @@ import * as Docker from 'dockerode';
 
 // Jest global teardown to stop and remove the container
 export default async function teardown(): Promise<void> {
-  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
-  const containerIds = ((globalThis as any).__TEST_DOCKER_CONTAINER_IDS as string[]) ?? [];
-  for (const containerId of containerIds) {
-    console.log(`Stopping and removing container ${containerId}...`);
+  const containers: { id: string; image: string }[] =
+    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+    (globalThis as any).__TEST_DOCKER_CONTAINERS ?? [];
+  for (const { id, image } of containers) {
+    console.log(`Stopping and removing container ${image} - ${id}...`);
     const docker = new Docker();
-    const container = docker.getContainer(containerId);
-    const info = await container
-      .inspect()
-      .then(i => i.Image)
-      .catch(() => '?');
-    await container.stop().catch(error => {
-      console.error(`Failed to stop container ${containerId}: ${error}`);
-    });
+    const container = docker.getContainer(id);
     await container.remove({ v: true, force: true });
-    console.log(`Test docker container ${info} ${containerId} stopped and removed`);
+    console.log(`Test docker container ${image} ${id} stopped and removed`);
   }
 }
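
Note: a minimal, self-contained sketch of the worker-module convention this diff adopts, for readers unfamiliar with it. The WorkerThreadManager surface (a constructor that takes the worker module, and a promise-returning exec()) is assumed from its usage in the event-stream changes above; square-worker.ts, its task shape, and demo() are hypothetical.

// square-worker.ts (hypothetical) — processTask() runs inside the worker thread.
// WorkerThreadManager loads this module in a node:worker_threads Worker and
// routes each exec() payload to processTask(), returning its result.
export function processTask(args: { value: number }) {
  return { result: args.value * args.value };
}
// Exporting the CommonJS module handle tells the manager which file to load in the worker.
export const workerModule = module;

// main thread (hypothetical demo) — same call pattern as EventStreamHandler above.
import { WorkerThreadManager } from '@hirosystems/api-toolkit';
import * as squareWorker from './square-worker';

async function demo() {
  const manager = new WorkerThreadManager(squareWorker);
  const reply = await manager.exec({ value: 7 });
  console.log(reply.result); // 49
}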