From e38a41a81eb9550c8acffe87cdffa5cf23892724 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Wed, 4 Jun 2025 16:14:18 +0700 Subject: [PATCH 1/4] chore: use threaded worker util from toolkit for msg parsing --- .vscode/launch.json | 2 + chunk-parser/src/recover-slot-pubkey.ts | 13 ++- package-lock.json | 8 +- package.json | 2 +- src/event-stream/event-stream.ts | 33 +++---- src/event-stream/msg-parser-worker-blocks.ts | 8 ++ .../msg-parser-worker-stackerdb.ts | 7 ++ src/event-stream/threaded-parser-worker.ts | 92 ------------------- src/event-stream/threaded-parser.ts | 82 ----------------- src/pg/pg-store.ts | 27 +++--- 10 files changed, 57 insertions(+), 217 deletions(-) create mode 100644 src/event-stream/msg-parser-worker-blocks.ts create mode 100644 src/event-stream/msg-parser-worker-stackerdb.ts delete mode 100644 src/event-stream/threaded-parser-worker.ts delete mode 100644 src/event-stream/threaded-parser.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index ded85c4..b0dfddb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -74,6 +74,8 @@ ], "outputCapture": "std", "console": "internalConsole", + "runtimeVersion": "22", + "nodeVersionHint": 22, }, { "name": "Chunk parser prototyping", diff --git a/chunk-parser/src/recover-slot-pubkey.ts b/chunk-parser/src/recover-slot-pubkey.ts index d9e4cb7..5bf3fdf 100644 --- a/chunk-parser/src/recover-slot-pubkey.ts +++ b/chunk-parser/src/recover-slot-pubkey.ts @@ -1,7 +1,18 @@ import crypto from 'node:crypto'; -import * as secp from '@noble/secp256k1'; +// import * as secp from '@noble/secp256k1'; import { ModifiedSlot, NewNakamotoBlockMessage, toU32BeBytes } from './common'; +let secp: typeof import('@noble/secp256k1'); + +async function getSecp() { + if (!secp) secp = await import('@noble/secp256k1'); + return secp; +} +getSecp().catch(error => { + console.error(`Failed to load secp256k1: ${error}`, error); + throw error; +}); + /** Get the digest to sign that authenticates this chunk data and metadata */ function authDigest(slot: ModifiedSlot): Buffer { const hasher = crypto.createHash('sha512-256'); diff --git a/package-lock.json b/package-lock.json index 2f7bcb4..2b449fb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", "@hirosystems/api-toolkit": "^1.9.0", - "@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1", + "@hirosystems/salt-n-pepper-client": "^1.1.1", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", "@stacks/transactions": "^7.0.6", @@ -1406,9 +1406,9 @@ } }, "node_modules/@hirosystems/salt-n-pepper-client": { - "version": "1.0.4-beta.1", - "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-1.0.4-beta.1.tgz", - "integrity": "sha512-znfzRNelipy2UbA0knCYo4S2hVipbzCcuKVUGaZTtO2iXIdUyQLj6ZbxMs8lZFwg+Bf0iY/CF5tfNCmPBSJSug==", + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-1.1.1.tgz", + "integrity": "sha512-xQqHAX0xIrpGFD9Hc4sqbSZ862vE1yDUT67RFv6ok7hd+OFJUNfCrfr+nMQJZK8qSt3jlmZJt/5R38+D6pQvvw==", "license": "GPL-3.0-only", "dependencies": { "@hirosystems/api-toolkit": "^1.7.2", diff --git a/package.json b/package.json index 1ddcb44..6d388e0 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", "@hirosystems/api-toolkit": "^1.9.0", - "@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1", + "@hirosystems/salt-n-pepper-client": "^1.1.1", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", "@stacks/transactions": "^7.0.6", diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts index f99e6cb..3674bbc 100644 --- a/src/event-stream/event-stream.ts +++ b/src/event-stream/event-stream.ts @@ -6,30 +6,21 @@ import { CoreNodeNakamotoBlockMessage, StackerDbChunk, } from './core-node-message'; -import { logger as defaultLogger, stopwatch } from '@hirosystems/api-toolkit'; +import { logger as defaultLogger, stopwatch, WorkerThreadManager } from '@hirosystems/api-toolkit'; import { ENV } from '../env'; -import { - ParsedNakamotoBlock, - ParsedStackerDbChunk, - parseNakamotoBlockMsg, - parseStackerDbChunk, -} from './msg-parsing'; +import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing'; import { SignerMessagesEventPayload } from '../pg/types'; -import { ThreadedParser } from './threaded-parser'; import { SERVER_VERSION } from '@hirosystems/api-toolkit'; import { EventEmitter } from 'node:events'; - -// TODO: move this into the @hirosystems/salt-n-pepper-client lib -function sanitizeRedisClientName(value: string): string { - const nameSanitizer = /[^!-~]+/g; - return value.trim().replace(nameSanitizer, '-'); -} +import * as msgParserWorkerBlocks from './msg-parser-worker-blocks'; +import * as msgParserWorkerStackerDb from './msg-parser-worker-stackerdb'; export class EventStreamHandler { db: PgStore; logger = defaultLogger.child({ name: 'EventStreamHandler' }); eventStream: StacksEventStream; - threadedParser: ThreadedParser; + threadedParserBlocks = new WorkerThreadManager(msgParserWorkerBlocks); + threadedParserStackerDb = new WorkerThreadManager(msgParserWorkerStackerDb); readonly events = new EventEmitter<{ processedMessage: [{ msgId: string }]; @@ -37,9 +28,7 @@ export class EventStreamHandler { constructor(opts: { db: PgStore; lastMessageId: string }) { this.db = opts.db; - const appName = sanitizeRedisClientName( - `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})` - ); + const appName = `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`; this.eventStream = new StacksEventStream({ redisUrl: ENV.REDIS_URL, redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX, @@ -47,7 +36,6 @@ export class EventStreamHandler { lastMessageId: opts.lastMessageId, appName, }); - this.threadedParser = new ThreadedParser(); } async start() { @@ -71,7 +59,7 @@ export class EventStreamHandler { ); } if ('signer_signature_hash' in blockMsg) { - const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg); + const parsed = await this.threadedParserBlocks.exec(nakamotoBlockMsg); await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed); } else { // ignore pre-Nakamoto blocks @@ -81,7 +69,7 @@ export class EventStreamHandler { case '/stackerdb_chunks': { const msg = body as StackerDbChunk; - const parsed = await this.threadedParser.parseStackerDbChunk(msg); + const parsed = await this.threadedParserStackerDb.exec(msg); await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed); break; } @@ -109,7 +97,8 @@ export class EventStreamHandler { async stop(): Promise { await this.eventStream.stop(); - await this.threadedParser.close(); + await this.threadedParserBlocks.close(); + await this.threadedParserStackerDb.close(); } async handleStackerDbMsg( diff --git a/src/event-stream/msg-parser-worker-blocks.ts b/src/event-stream/msg-parser-worker-blocks.ts new file mode 100644 index 0000000..8ff0905 --- /dev/null +++ b/src/event-stream/msg-parser-worker-blocks.ts @@ -0,0 +1,8 @@ +import type { CoreNodeNakamotoBlockMessage } from './core-node-message'; +import { ParsedNakamotoBlock, parseNakamotoBlockMsg } from './msg-parsing'; + +export function processTask(msg: CoreNodeNakamotoBlockMessage): ParsedNakamotoBlock { + return parseNakamotoBlockMsg(msg); +} + +export const workerModule = module; diff --git a/src/event-stream/msg-parser-worker-stackerdb.ts b/src/event-stream/msg-parser-worker-stackerdb.ts new file mode 100644 index 0000000..d97f2c1 --- /dev/null +++ b/src/event-stream/msg-parser-worker-stackerdb.ts @@ -0,0 +1,7 @@ +import type { StackerDbChunk } from './core-node-message'; +import { parseStackerDbChunk } from './msg-parsing'; + +export function processTask(msg: StackerDbChunk) { + return parseStackerDbChunk(msg); +} +export const workerModule = module; diff --git a/src/event-stream/threaded-parser-worker.ts b/src/event-stream/threaded-parser-worker.ts deleted file mode 100644 index 6cd2445..0000000 --- a/src/event-stream/threaded-parser-worker.ts +++ /dev/null @@ -1,92 +0,0 @@ -import * as WorkerThreads from 'node:worker_threads'; -import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message'; -import { - ParsedNakamotoBlock, - ParsedStackerDbChunk, - parseNakamotoBlockMsg, - parseStackerDbChunk, -} from './msg-parsing'; - -export const workerFile = __filename; - -export enum ThreadedParserMsgType { - NakamotoBlock = 'NakamotoBlock', - StackerDbChunk = 'StackerDbChunk', -} - -interface ThreadMsg { - type: ThreadedParserMsgType; - msgId: number; -} - -export interface NakamotoBlockMsgRequest extends ThreadMsg { - type: ThreadedParserMsgType.NakamotoBlock; - msgId: number; - block: CoreNodeNakamotoBlockMessage; -} - -export interface NakamotoBlockMsgReply extends ThreadMsg { - type: ThreadedParserMsgType.NakamotoBlock; - msgId: number; - block: ParsedNakamotoBlock; -} - -export interface StackerDbChunkMsgRequest extends ThreadMsg { - type: ThreadedParserMsgType.StackerDbChunk; - msgId: number; - chunk: StackerDbChunk; -} - -export interface StackerDbChunkMsgReply extends ThreadMsg { - type: ThreadedParserMsgType.StackerDbChunk; - msgId: number; - chunk: ParsedStackerDbChunk[]; -} - -export type ThreadedParserMsgRequest = NakamotoBlockMsgRequest | StackerDbChunkMsgRequest; -export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgReply; - -if (!WorkerThreads.isMainThread) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const mainThreadPort = WorkerThreads.parentPort!; - mainThreadPort.on('messageerror', err => { - console.error(`Worker thread message error`, err); - }); - mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => { - try { - handleWorkerMsg(msg); - } catch (err) { - console.error(`Failed to parse message: ${JSON.stringify(msg)}`); - console.error(`Error handling message from main thread`, err); - } - }); -} - -function handleWorkerMsg(msg: ThreadedParserMsgRequest) { - let reply: ThreadedParserMsgReply; - switch (msg.type) { - case ThreadedParserMsgType.NakamotoBlock: { - reply = { - type: ThreadedParserMsgType.NakamotoBlock, - msgId: msg.msgId, - block: parseNakamotoBlockMsg(msg.block), - } satisfies NakamotoBlockMsgReply; - break; - } - case ThreadedParserMsgType.StackerDbChunk: { - reply = { - type: ThreadedParserMsgType.StackerDbChunk, - msgId: msg.msgId, - chunk: parseStackerDbChunk(msg.chunk), - } satisfies StackerDbChunkMsgReply; - break; - } - default: { - const _exhaustiveCheck: never = msg; - throw new Error(`Unhandled message type: ${msg}`); - } - } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const mainThreadPort = WorkerThreads.parentPort!; - mainThreadPort.postMessage(reply); -} diff --git a/src/event-stream/threaded-parser.ts b/src/event-stream/threaded-parser.ts deleted file mode 100644 index bd954b9..0000000 --- a/src/event-stream/threaded-parser.ts +++ /dev/null @@ -1,82 +0,0 @@ -import * as WorkerThreads from 'node:worker_threads'; -import * as path from 'node:path'; -import { waiter, Waiter, logger as defaultLogger } from '@hirosystems/api-toolkit'; -import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message'; -import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing'; -import { - NakamotoBlockMsgReply, - NakamotoBlockMsgRequest, - StackerDbChunkMsgReply, - StackerDbChunkMsgRequest, - ThreadedParserMsgReply, - ThreadedParserMsgType, - workerFile, -} from './threaded-parser-worker'; - -export class ThreadedParser { - private readonly worker: WorkerThreads.Worker; - private readonly msgRequests: Map> = new Map(); - private readonly logger = defaultLogger.child({ module: 'ThreadedParser' }); - private lastMsgId = 0; - - constructor() { - if (!WorkerThreads.isMainThread) { - throw new Error('ThreadedParser must be instantiated in the main thread'); - } - const workerOpt: WorkerThreads.WorkerOptions = {}; - if (path.extname(workerFile) === '.ts') { - if (process.env.NODE_ENV !== 'test') { - throw new Error( - 'Worker threads are being created with ts-node outside of a test environment' - ); - } - workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only']; - } - this.worker = new WorkerThreads.Worker(workerFile, workerOpt); - this.worker.on('error', err => { - this.logger.error(err, 'Worker error'); - }); - this.worker.on('messageerror', err => { - this.logger.error(err, 'Worker message error'); - }); - this.worker.on('message', (msg: ThreadedParserMsgReply) => { - const waiter = this.msgRequests.get(msg.msgId); - if (waiter) { - waiter.finish(msg); - this.msgRequests.delete(msg.msgId); - } else { - this.logger.warn('Received unexpected message from worker', msg); - } - }); - } - - async parseNakamotoBlock(block: CoreNodeNakamotoBlockMessage): Promise { - const replyWaiter = waiter(); - const msg: NakamotoBlockMsgRequest = { - type: ThreadedParserMsgType.NakamotoBlock, - msgId: this.lastMsgId++, - block, - }; - this.msgRequests.set(msg.msgId, replyWaiter as Waiter); - this.worker.postMessage(msg); - const reply = await replyWaiter; - return reply.block; - } - - async parseStackerDbChunk(chunk: StackerDbChunk): Promise { - const replyWaiter = waiter(); - const msg: StackerDbChunkMsgRequest = { - type: ThreadedParserMsgType.StackerDbChunk, - msgId: this.lastMsgId++, - chunk, - }; - this.msgRequests.set(msg.msgId, replyWaiter as Waiter); - this.worker.postMessage(msg); - const reply = await replyWaiter; - return reply.chunk; - } - - async close() { - await this.worker.terminate(); - } -} diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 5995143..77f7520 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -10,7 +10,6 @@ import { import * as path from 'path'; import { PgWriteStore } from './ingestion/pg-write-store'; import { BlockIdParam, normalizeHexString, sleep } from '../helpers'; -import { Fragment } from 'postgres'; import { DbBlockProposalQueryResponse } from './types'; import { NotificationPgStore } from './notifications/pg-notifications'; @@ -491,20 +490,18 @@ export class PgStore extends BasePgStore { } async getSignerDataForBlock({ sql, blockId }: { sql: PgSqlClient; blockId: BlockIdParam }) { - let blockFilter: Fragment; - switch (blockId.type) { - case 'height': - blockFilter = sql`block_height = ${blockId.height}`; - break; - case 'hash': - blockFilter = sql`block_hash = ${normalizeHexString(blockId.hash)}`; - break; - case 'latest': - blockFilter = sql`block_height = (SELECT block_height FROM chain_tip)`; - break; - default: - throw new Error(`Invalid blockId type: ${blockId}`); - } + const blockFilter = (() => { + switch (blockId.type) { + case 'height': + return sql`block_height = ${blockId.height}`; + case 'hash': + return sql`block_hash = ${normalizeHexString(blockId.hash)}`; + case 'latest': + return sql`block_height = (SELECT block_height FROM chain_tip)`; + default: + throw new Error(`Invalid blockId type: ${blockId}`); + } + })(); const result = await sql< { From 311a11ba27cfb6c7d1d6ad6bfffc6376fa575e74 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Wed, 4 Jun 2025 17:40:06 +0700 Subject: [PATCH 2/4] chore: update toolkit lib --- package-lock.json | 25 ++++++++++++++----------- package.json | 5 ++++- tests/db/jest-global-setup.ts | 13 ++++++------- tests/db/jest-global-teardown.ts | 20 +++++++------------- 4 files changed, 31 insertions(+), 32 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2b449fb..f33c041 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@fastify/cors": "^9.0.1", "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", - "@hirosystems/api-toolkit": "^1.9.0", + "@hirosystems/api-toolkit": "^1.9.1", "@hirosystems/salt-n-pepper-client": "^1.1.1", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", @@ -52,6 +52,9 @@ "ts-node": "^10.9.2", "typescript": "^5.6.3", "typescript-eslint": "^8.13.0" + }, + "engines": { + "node": ">=22.0.0" } }, "client": { @@ -1227,9 +1230,9 @@ } }, "node_modules/@hirosystems/api-toolkit": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.9.0.tgz", - "integrity": "sha512-9SFStFtDohud0U0J8HPhUp5Br0La4BggqOPijUJgOSjmAE3mWK7SNOAcybbe7G3YP9bpf4QMWaxcwH5mAKjHpQ==", + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.9.1.tgz", + "integrity": "sha512-pV92h6c5tPnwE+kjro1U9fhhMrBSqsvGr6KGVYU66sJdzr9D3Id7vN9tET7GY+lzhPp97tHLGJ5k7NE2AZGbiQ==", "license": "Apache 2.0", "dependencies": { "@fastify/cors": "^8.0.0", @@ -2134,9 +2137,9 @@ } }, "node_modules/@redis/client": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.0.tgz", - "integrity": "sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==", + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.1.tgz", + "integrity": "sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==", "license": "MIT", "dependencies": { "cluster-key-slot": "1.1.2", @@ -10183,16 +10186,16 @@ } }, "node_modules/redis": { - "version": "4.7.0", - "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.0.tgz", - "integrity": "sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ==", + "version": "4.7.1", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.1.tgz", + "integrity": "sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==", "license": "MIT", "workspaces": [ "./packages/*" ], "dependencies": { "@redis/bloom": "1.2.0", - "@redis/client": "1.6.0", + "@redis/client": "1.6.1", "@redis/graph": "1.1.1", "@redis/json": "1.0.7", "@redis/search": "1.2.0", diff --git a/package.json b/package.json index 6d388e0..90ecc42 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,9 @@ "main": "index.js", "author": "Hiro Systems PBC (https://hiro.so)", "license": "GPL-3.0", + "engines": { + "node": ">=22.0.0" + }, "scripts": { "build": "rimraf ./dist && tsc --project tsconfig.build.json", "build:tests": "tsc -p tsconfig.json --noEmit", @@ -56,7 +59,7 @@ "@fastify/cors": "^9.0.1", "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", - "@hirosystems/api-toolkit": "^1.9.0", + "@hirosystems/api-toolkit": "^1.9.1", "@hirosystems/salt-n-pepper-client": "^1.1.1", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", diff --git a/tests/db/jest-global-setup.ts b/tests/db/jest-global-setup.ts index 77a7778..8118cec 100644 --- a/tests/db/jest-global-setup.ts +++ b/tests/db/jest-global-setup.ts @@ -36,9 +36,6 @@ async function pruneContainers(docker: Docker, label: string) { const containers = await docker.listContainers({ all: true, filters: { label: [label] } }); for (const container of containers) { const c = docker.getContainer(container.Id); - if (container.State !== 'exited') { - await c.stop().catch(_err => {}); - } await c.remove({ v: true, force: true }); } await docker.pruneContainers({ filters: { label: [label] } }); @@ -83,10 +80,11 @@ async function startContainer(args: { console.log(`${image} container started on ports ${JSON.stringify(ports)}`); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const containerIds = ((globalThis as any).__TEST_DOCKER_CONTAINER_IDS as string[]) ?? []; - containerIds.push(container.id); - Object.assign(globalThis, { __TEST_DOCKER_CONTAINER_IDS: containerIds }); + const containers: { id: string; image: string }[] = + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + (globalThis as any).__TEST_DOCKER_CONTAINERS ?? []; + containers.push({ id: container.id, image }); + Object.assign(globalThis, { __TEST_DOCKER_CONTAINERS: containers }); return { image, containerId: container.id }; } catch (error) { @@ -126,6 +124,7 @@ async function waitForPostgres(): Promise { }, }); await sql`SELECT 1`; + await sql.end(); console.log('Postgres is ready'); } diff --git a/tests/db/jest-global-teardown.ts b/tests/db/jest-global-teardown.ts index 05a2484..60490ac 100644 --- a/tests/db/jest-global-teardown.ts +++ b/tests/db/jest-global-teardown.ts @@ -2,20 +2,14 @@ import * as Docker from 'dockerode'; // Jest global teardown to stop and remove the container export default async function teardown(): Promise { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const containerIds = ((globalThis as any).__TEST_DOCKER_CONTAINER_IDS as string[]) ?? []; - for (const containerId of containerIds) { - console.log(`Stopping and removing container ${containerId}...`); + const containers: { id: string; image: string }[] = + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + (globalThis as any).__TEST_DOCKER_CONTAINERS ?? []; + for (const { id, image } of containers) { + console.log(`Stopping and removing container ${image} - ${id}...`); const docker = new Docker(); - const container = docker.getContainer(containerId); - const info = await container - .inspect() - .then(i => i.Image) - .catch(() => '?'); - await container.stop().catch(error => { - console.error(`Failed to stop container ${containerId}: ${error}`); - }); + const container = docker.getContainer(id); await container.remove({ v: true, force: true }); - console.log(`Test docker container ${info} ${containerId} stopped and removed`); + console.log(`Test docker container ${image} ${id} stopped and removed`); } } From 8471ecb6552f92a80fe6df187169ac1c30c002c4 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Wed, 4 Jun 2025 18:16:34 +0700 Subject: [PATCH 3/4] chore: use single WorkerThreadManager --- src/event-stream/event-stream.ts | 19 +++++++++---------- src/event-stream/msg-parser-worker-blocks.ts | 8 -------- .../msg-parser-worker-stackerdb.ts | 7 ------- src/event-stream/threaded-parser-worker.ts | 16 ++++++++++++++++ 4 files changed, 25 insertions(+), 25 deletions(-) delete mode 100644 src/event-stream/msg-parser-worker-blocks.ts delete mode 100644 src/event-stream/msg-parser-worker-stackerdb.ts create mode 100644 src/event-stream/threaded-parser-worker.ts diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts index 3674bbc..1510424 100644 --- a/src/event-stream/event-stream.ts +++ b/src/event-stream/event-stream.ts @@ -12,15 +12,13 @@ import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing'; import { SignerMessagesEventPayload } from '../pg/types'; import { SERVER_VERSION } from '@hirosystems/api-toolkit'; import { EventEmitter } from 'node:events'; -import * as msgParserWorkerBlocks from './msg-parser-worker-blocks'; -import * as msgParserWorkerStackerDb from './msg-parser-worker-stackerdb'; +import * as msgParserWorkerBlocks from './threaded-parser-worker'; export class EventStreamHandler { db: PgStore; logger = defaultLogger.child({ name: 'EventStreamHandler' }); eventStream: StacksEventStream; - threadedParserBlocks = new WorkerThreadManager(msgParserWorkerBlocks); - threadedParserStackerDb = new WorkerThreadManager(msgParserWorkerStackerDb); + threadedParser = new WorkerThreadManager(msgParserWorkerBlocks); readonly events = new EventEmitter<{ processedMessage: [{ msgId: string }]; @@ -59,8 +57,9 @@ export class EventStreamHandler { ); } if ('signer_signature_hash' in blockMsg) { - const parsed = await this.threadedParserBlocks.exec(nakamotoBlockMsg); - await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed); + const parsed = await this.threadedParser.exec({ kind: 'block', msg: nakamotoBlockMsg }); + const result = parsed.result as ParsedNakamotoBlock; + await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), result); } else { // ignore pre-Nakamoto blocks } @@ -69,8 +68,9 @@ export class EventStreamHandler { case '/stackerdb_chunks': { const msg = body as StackerDbChunk; - const parsed = await this.threadedParserStackerDb.exec(msg); - await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed); + const parsed = await this.threadedParser.exec({ kind: 'chunk', msg }); + const result = parsed.result as ParsedStackerDbChunk[]; + await this.handleStackerDbMsg(messageId, parseInt(timestamp), result); break; } @@ -97,8 +97,7 @@ export class EventStreamHandler { async stop(): Promise { await this.eventStream.stop(); - await this.threadedParserBlocks.close(); - await this.threadedParserStackerDb.close(); + await this.threadedParser.close(); } async handleStackerDbMsg( diff --git a/src/event-stream/msg-parser-worker-blocks.ts b/src/event-stream/msg-parser-worker-blocks.ts deleted file mode 100644 index 8ff0905..0000000 --- a/src/event-stream/msg-parser-worker-blocks.ts +++ /dev/null @@ -1,8 +0,0 @@ -import type { CoreNodeNakamotoBlockMessage } from './core-node-message'; -import { ParsedNakamotoBlock, parseNakamotoBlockMsg } from './msg-parsing'; - -export function processTask(msg: CoreNodeNakamotoBlockMessage): ParsedNakamotoBlock { - return parseNakamotoBlockMsg(msg); -} - -export const workerModule = module; diff --git a/src/event-stream/msg-parser-worker-stackerdb.ts b/src/event-stream/msg-parser-worker-stackerdb.ts deleted file mode 100644 index d97f2c1..0000000 --- a/src/event-stream/msg-parser-worker-stackerdb.ts +++ /dev/null @@ -1,7 +0,0 @@ -import type { StackerDbChunk } from './core-node-message'; -import { parseStackerDbChunk } from './msg-parsing'; - -export function processTask(msg: StackerDbChunk) { - return parseStackerDbChunk(msg); -} -export const workerModule = module; diff --git a/src/event-stream/threaded-parser-worker.ts b/src/event-stream/threaded-parser-worker.ts new file mode 100644 index 0000000..2110e1f --- /dev/null +++ b/src/event-stream/threaded-parser-worker.ts @@ -0,0 +1,16 @@ +import type { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message'; +import { parseNakamotoBlockMsg, parseStackerDbChunk } from './msg-parsing'; + +export function processTask( + args: + | { kind: 'block'; msg: CoreNodeNakamotoBlockMessage } + | { kind: 'chunk'; msg: StackerDbChunk } +) { + if (args.kind === 'block') { + return { kind: 'block', result: parseNakamotoBlockMsg(args.msg) }; + } else { + return { kind: 'chunk', result: parseStackerDbChunk(args.msg) }; + } +} + +export const workerModule = module; From f9b173d5f982974b30c785d59170f4dc5c6a9f5f Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Wed, 4 Jun 2025 18:17:15 +0700 Subject: [PATCH 4/4] chore: revert debugging changes --- chunk-parser/src/recover-slot-pubkey.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/chunk-parser/src/recover-slot-pubkey.ts b/chunk-parser/src/recover-slot-pubkey.ts index 5bf3fdf..d9e4cb7 100644 --- a/chunk-parser/src/recover-slot-pubkey.ts +++ b/chunk-parser/src/recover-slot-pubkey.ts @@ -1,18 +1,7 @@ import crypto from 'node:crypto'; -// import * as secp from '@noble/secp256k1'; +import * as secp from '@noble/secp256k1'; import { ModifiedSlot, NewNakamotoBlockMessage, toU32BeBytes } from './common'; -let secp: typeof import('@noble/secp256k1'); - -async function getSecp() { - if (!secp) secp = await import('@noble/secp256k1'); - return secp; -} -getSecp().catch(error => { - console.error(`Failed to load secp256k1: ${error}`, error); - throw error; -}); - /** Get the digest to sign that authenticates this chunk data and metadata */ function authDigest(slot: ModifiedSlot): Buffer { const hasher = crypto.createHash('sha512-256');