diff --git a/package.json b/package.json index 9a4259c..695c011 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@clickup/ent-framework", "description": "A PostgreSQL graph-database-alike library with microsharding and row-level security", - "version": "2.13.7", + "version": "2.13.8", "license": "MIT", "keywords": [ "postgresql", diff --git a/src/abstract/Cluster.ts b/src/abstract/Cluster.ts index 5c5efed..8acf642 100644 --- a/src/abstract/Cluster.ts +++ b/src/abstract/Cluster.ts @@ -17,6 +17,7 @@ import { objectHash, maybeCall, jsonHash, + jitter, } from "../internal/misc"; import { Registry } from "../internal/Registry"; import type { Client } from "./Client"; @@ -45,6 +46,8 @@ export interface ClusterOptions { localCache?: LocalCache | null; /** How often to run Shards rediscovery in normal circumstances. */ shardsDiscoverIntervalMs?: MaybeCallable; + /** Jitter for shardsDiscoverIntervalMs. */ + shardsDiscoverIntervalJitter?: MaybeCallable; /** How often to recheck for changes in options.islands (typically, often, * since it's assumed that options.islands calculation is cheap). If the * Cluster configuration is changed, then we trigger rediscovery ASAP. */ @@ -97,6 +100,7 @@ export class Cluster { > = { localCache: null, shardsDiscoverIntervalMs: 10000, + shardsDiscoverIntervalJitter: 0.2, shardsDiscoverRecheckIslandsIntervalMs: 500, locateIslandErrorRetryCount: 2, locateIslandErrorRediscoverClusterDelayMs: 1000, @@ -183,7 +187,11 @@ export class Cluster { this.shardNoByID = client.shardNoByID.bind(client); this.shardsDiscoverCache = new CachedRefreshedValue({ - delayMs: () => maybeCall(this.options.shardsDiscoverIntervalMs), + delayMs: () => + Math.round( + maybeCall(this.options.shardsDiscoverIntervalMs) * + jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)), + ), warningTimeoutMs: () => maybeCall(this.options.shardsDiscoverIntervalMs), deps: { delayMs: () => @@ -206,16 +214,38 @@ export class Cluster { * Signals the Cluster to keep the Clients pre-warmed, e.g. open. (It's up to * the particular Client's implementation, what does a "pre-warmed Client" * mean; typically, it's keeping some minimal number of pooled connections.) + * + * Except when `randomizedDelayMs` is passed as 0, the actual prewarm (and + * Islands discovery) queries will run with a randomized delay between N/2 and + * N ms. It is better to operate in such mode: if multiple Node processes + * start simultaneously in the cluster, then the randomization helps to avoid + * new connections burst (new connections establishment is expensive for e.g. + * pgbouncer or when DB is accessed over SSL). */ - prewarm(): void { + prewarm( + randomizedDelayMs: number = 5000, + onInitialPrewarm?: (delayMs: number) => void, + ): void { + if (this.prewarmEnabled) { + return; + } + this.prewarmEnabled = true; - runInVoid(async () => { - for (const island of await this.islands()) { - for (const client of island.clients) { - client.prewarm(); - } - } - }); + const initialDelayMs = randomizedDelayMs + ? Math.round(random(randomizedDelayMs / 2, randomizedDelayMs)) + : 0; + setTimeout( + () => + runInVoid(async () => { + onInitialPrewarm?.(initialDelayMs); + for (const island of await this.islands()) { + for (const client of island.clients) { + client.prewarm(); + } + } + }), + initialDelayMs, + ); } /** @@ -402,7 +432,11 @@ export class Cluster { const startTime = performance.now(); await pTimeout( this.shardsDiscoverCache.waitRefresh(), - maybeCall(this.options.shardsDiscoverIntervalMs) * 2, + Math.round( + maybeCall(this.options.shardsDiscoverIntervalMs) * + jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)) * + 2, + ), "Timed out while waiting for whole-Cluster Shards discovery.", ).catch((error) => this.options.loggers.swallowedErrorLogger({ @@ -433,7 +467,11 @@ export class Cluster { const startTime = performance.now(); await pTimeout( island.rediscover(), - maybeCall(this.options.shardsDiscoverIntervalMs) * 2, + Math.round( + maybeCall(this.options.shardsDiscoverIntervalMs) * + jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)) * + 2, + ), `Timed out while waiting for Island ${island.no} Shards discovery.`, ).catch((error) => this.options.loggers.swallowedErrorLogger({ diff --git a/src/abstract/LocalCache.ts b/src/abstract/LocalCache.ts index 74c3215..410f1f0 100644 --- a/src/abstract/LocalCache.ts +++ b/src/abstract/LocalCache.ts @@ -65,7 +65,10 @@ export class LocalCache { this.options = defaults({}, options, LocalCache.DEFAULT_OPTIONS); this.cleanupTimeout = setTimeout( () => this.onCleanupTimer(), - this.options.cleanupFirstRunDelayMs * jitter(this.options.cleanupJitter), + Math.round( + this.options.cleanupFirstRunDelayMs * + jitter(this.options.cleanupJitter), + ), ); } @@ -170,8 +173,10 @@ export class LocalCache { ); this.cleanupTimeout = setTimeout( () => this.onCleanupTimer(), - (this.options.expirationMs / this.options.cleanupRoundsPerExpiration) * - jitter(this.options.cleanupJitter), + Math.round( + (this.options.expirationMs / this.options.cleanupRoundsPerExpiration) * + jitter(this.options.cleanupJitter), + ), ); } diff --git a/src/ent/Inverse.ts b/src/ent/Inverse.ts index 67d66be..0006440 100644 --- a/src/ent/Inverse.ts +++ b/src/ent/Inverse.ts @@ -119,7 +119,7 @@ export class Inverse { }), ); if (row) { - await this.run(vc, this.shard(id1), this.inverseSchema.delete(row.id)); + await this.run(vc, this.shard(id1), this.inverseSchema.delete(row[ID])); } } @@ -143,7 +143,7 @@ export class Inverse { * Creates an Inverse schema which derives its id field's autoInsert from the * passed id2 schema. The returned schema is heavily cached, so batching for * it works efficiently even for different id2 schemas and different Inverse - * types (actually, it would work the same way even without @Memoize since + * types (actually, it would work the same way even without `@Memoize` since * Runner batches by schema hash, not by schema object instance, but anyways). */ @Memoize( diff --git a/src/ent/QueryCache.ts b/src/ent/QueryCache.ts index 6a7fb29..48200b1 100644 --- a/src/ent/QueryCache.ts +++ b/src/ent/QueryCache.ts @@ -6,6 +6,7 @@ import { VCWithQueryCache } from "./VCFlavor"; const OPS = [ "loadNullable", "loadByNullable", + "selectBy", "select", "count", "exists", @@ -86,9 +87,14 @@ export class QueryCache { } /** - * Deletes cache slots or keys for an Ent. + * Deletes cache slots or keys for an Ent. If key is null, skips the deletion. + * If key is undefined (i.e. not passed), then deletes all slots. */ - delete(EntClass: AnyClass, ops: Op[], key?: string): this { + delete(EntClass: AnyClass, ops: readonly Op[], key?: string | null): this { + if (key === null) { + return this; + } + const byOp = this.byEntClass?.get(EntClass); if (!byOp) { return this; diff --git a/src/ent/VCTrace.ts b/src/ent/VCTrace.ts index e018c92..3dedf06 100644 --- a/src/ent/VCTrace.ts +++ b/src/ent/VCTrace.ts @@ -4,7 +4,6 @@ * part of the trace value. */ const RANDOM_BITS = 10; - const RANDOM_BITS_MASK = Math.pow(2, RANDOM_BITS) - 1; /** @@ -16,16 +15,34 @@ export class VCTrace { readonly trace: string; constructor(trace?: string) { - this.trace = trace ?? createRandomTrace(); + this.trace = trace ?? this.createRandomTrace(); } -} -/** - * Returns a stringified uint63 (0 - 9223372036854775807). - */ -function createRandomTrace(): string { - return ( - (BigInt(Date.now()) << BigInt(RANDOM_BITS)) | - BigInt(Math.trunc(Math.random() * RANDOM_BITS_MASK) & RANDOM_BITS_MASK) - ).toString(); + /** + * In case the trace was created by this tool, tries to extract the date of + * its creation. As a sanity check, verifies that this date is not too far + * away from the present time. + */ + tryExtractCreationDate(): Date | null { + try { + const ts = BigInt(this.trace) >> BigInt(RANDOM_BITS); + const minTs = Date.now() - 1000 * 3600 * 24 * 365; + const maxTs = Date.now() + 1000 * 3600 * 24; + return BigInt(minTs) < ts && ts < BigInt(maxTs) + ? new Date(Number(ts)) + : null; + } catch { + return null; + } + } + + /** + * Returns a stringified uint63 (0 - 9223372036854775807). + */ + private createRandomTrace(): string { + return ( + (BigInt(Date.now()) << BigInt(RANDOM_BITS)) | + BigInt(Math.trunc(Math.random() * RANDOM_BITS_MASK) & RANDOM_BITS_MASK) + ).toString(); + } } diff --git a/src/ent/__tests__/Ent.composite-pk.test.ts b/src/ent/__tests__/Ent.composite-pk.test.ts index dfb87f1..e5614cb 100644 --- a/src/ent/__tests__/Ent.composite-pk.test.ts +++ b/src/ent/__tests__/Ent.composite-pk.test.ts @@ -10,7 +10,7 @@ import { Require } from "../rules/Require"; import { GLOBAL_SHARD } from "../ShardAffinity"; import { createVC } from "./test-utils"; -export class EntTestUser extends BaseEnt( +class EntTestUser extends BaseEnt( testCluster, new PgSchema( 'ent.composite-pk"test_user', @@ -39,7 +39,7 @@ export class EntTestUser extends BaseEnt( } } -export class EntTestComposite extends BaseEnt( +class EntTestComposite extends BaseEnt( testCluster, new PgSchema( 'ent.composite-pk"test_composite', diff --git a/src/ent/__tests__/Ent.generic.test.ts b/src/ent/__tests__/Ent.generic.test.ts index 102c630..f7cd753 100644 --- a/src/ent/__tests__/Ent.generic.test.ts +++ b/src/ent/__tests__/Ent.generic.test.ts @@ -19,7 +19,7 @@ import { createVC, expectToMatchSnapshot } from "./test-utils"; /** * Company */ -export class EntTestCompany extends BaseEnt( +class EntTestCompany extends BaseEnt( testCluster, new PgSchema( 'ent.generic"company', @@ -58,7 +58,7 @@ export class EntTestCompany extends BaseEnt( /** * User -> Company */ -export class EntTestUser extends BaseEnt( +class EntTestUser extends BaseEnt( testCluster, new PgSchema( 'ent.generic"user', @@ -110,7 +110,7 @@ export class EntTestUser extends BaseEnt( /** * Post -> User -> Company */ -export class EntTestPost extends BaseEnt( +class EntTestPost extends BaseEnt( testCluster, new PgSchema( 'ent.generic"post', @@ -165,7 +165,7 @@ export class EntTestPost extends BaseEnt( /** * Comment -> Post -> User -> Company */ -export class EntTestComment extends BaseEnt( +class EntTestComment extends BaseEnt( testCluster, new PgSchema( 'ent.generic"comment', @@ -210,7 +210,7 @@ export class EntTestComment extends BaseEnt( /** * Like -> Post -> User -> Company */ -export class EntTestLike extends BaseEnt( +class EntTestLike extends BaseEnt( testCluster, new PgSchema( 'ent.generic"like', diff --git a/src/ent/__tests__/QueryCache.test.ts b/src/ent/__tests__/QueryCache.test.ts index ef478ab..9feeec1 100644 --- a/src/ent/__tests__/QueryCache.test.ts +++ b/src/ent/__tests__/QueryCache.test.ts @@ -7,7 +7,7 @@ import { AllowIf } from "../rules/AllowIf"; import { GLOBAL_SHARD } from "../ShardAffinity"; import { createVC } from "./test-utils"; -export class EntTestCompany extends BaseEnt( +class EntTestCompany extends BaseEnt( testCluster, new PgSchema( 'query-cache"company', diff --git a/src/ent/__tests__/Triggers.test.ts b/src/ent/__tests__/Triggers.test.ts index 4e2968e..534ec93 100644 --- a/src/ent/__tests__/Triggers.test.ts +++ b/src/ent/__tests__/Triggers.test.ts @@ -19,7 +19,7 @@ const $EPHEMERAL2 = Symbol("$EPHEMERAL2"); /** * User */ -export class EntTestUser extends BaseEnt( +class EntTestUser extends BaseEnt( testCluster, new PgSchema( 'ent.triggers"user', @@ -65,7 +65,7 @@ export class EntTestUser extends BaseEnt( /** * Headline -> User -> Company */ -export class EntTestHeadline extends BaseEnt( +class EntTestHeadline extends BaseEnt( testCluster, new PgSchema( 'ent.triggers"headline', @@ -269,7 +269,7 @@ export class EntTestHeadline extends BaseEnt( /** * Country */ -export class EntTestCountry extends BaseEnt( +class EntTestCountry extends BaseEnt( testCluster, new PgSchema( 'ent.triggers"country', diff --git a/src/ent/mixins/CacheMixin.ts b/src/ent/mixins/CacheMixin.ts index 458a76b..fc0dba4 100644 --- a/src/ent/mixins/CacheMixin.ts +++ b/src/ent/mixins/CacheMixin.ts @@ -1,5 +1,8 @@ import type { Client } from "../../abstract/Client"; +import { hasKey } from "../../internal/misc"; +import { ID } from "../../types"; import type { + SelectByInput, CountInput, ExistsInput, InsertInput, @@ -14,6 +17,14 @@ import type { UpdateOriginalInput } from "../types"; import type { VC } from "../VC"; import type { PrimitiveClass, PrimitiveInstance } from "./PrimitiveMixin"; +const MULTI_ROW_AFFECTING_OPS = [ + "loadByNullable", + "selectBy", + "select", + "count", + "exists", +] as const; + /** * Modifies the passed class adding VC-stored cache layer to it. */ @@ -31,23 +42,22 @@ export function CacheMixin< vc: VC, input: InsertInput, ): Promise { - const res = await super.insertIfNotExists(vc, input); - vc.cache(QueryCache).delete(this, ["loadByNullable", "select", "count"]); - return res; + const resPromise = super.insertIfNotExists(vc, input); + vc.cache(QueryCache) + .delete(this, ["loadNullable"], hasKey(ID, input) ? input[ID] : null) + .delete(this, MULTI_ROW_AFFECTING_OPS); + return resPromise; } static override async upsert( vc: VC, input: InsertInput, ): Promise { - const res = super.upsert(vc, input); - vc.cache(QueryCache).delete(this, [ - "loadNullable", - "loadByNullable", - "select", - "count", - ]); - return res; + const resPromise = super.upsert(vc, input); + vc.cache(QueryCache) + .delete(this, ["loadNullable"], hasKey(ID, input) ? input[ID] : null) + .delete(this, MULTI_ROW_AFFECTING_OPS); + return resPromise; } static override async loadNullable>( @@ -76,6 +86,18 @@ export function CacheMixin< ) as Promise; } + static override async selectBy>( + this: new () => TEnt, + vc: VC, + input: SelectByInput, + ): Promise { + return vc + .cache(QueryCache) + .through(this, "selectBy", JSON.stringify(input), async () => + super.selectBy(vc, input), + ) as Promise; + } + static override async select>( this: new () => TEnt, vc: VC, @@ -119,21 +141,21 @@ export function CacheMixin< override async updateOriginal( input: UpdateOriginalInput, ): Promise { - const res = await super.updateOriginal(input); + const resPromise = super.updateOriginal(input); this.vc .cache(QueryCache) - .delete(this.constructor, ["loadNullable"], this.id) - .delete(this.constructor, ["loadByNullable", "select", "count"]); - return res; + .delete(this.constructor, ["loadNullable"], this[ID]) + .delete(this.constructor, MULTI_ROW_AFFECTING_OPS); + return resPromise; } override async deleteOriginal(): Promise { - const res = await super.deleteOriginal(); + const resPromise = super.deleteOriginal(); this.vc .cache(QueryCache) - .delete(this.constructor, ["loadNullable"], this.id) - .delete(this.constructor, ["loadByNullable", "select", "count"]); - return res; + .delete(this.constructor, ["loadNullable"], this[ID]) + .delete(this.constructor, MULTI_ROW_AFFECTING_OPS); + return resPromise; } } diff --git a/src/ent/mixins/__tests__/CacheMixin.test.ts b/src/ent/mixins/__tests__/CacheMixin.test.ts new file mode 100644 index 0000000..661ece8 --- /dev/null +++ b/src/ent/mixins/__tests__/CacheMixin.test.ts @@ -0,0 +1,148 @@ +import { MASTER } from "../../../abstract/Shard"; +import type { TestPgClient } from "../../../pg/__tests__/test-utils"; +import { + recreateTestTables, + testCluster, +} from "../../../pg/__tests__/test-utils"; +import { escapeIdent } from "../../../pg/helpers/escapeIdent"; +import { PgSchema } from "../../../pg/PgSchema"; +import { createVC } from "../../__tests__/test-utils"; +import { BaseEnt } from "../../BaseEnt"; +import { True } from "../../predicates/True"; +import { AllowIf } from "../../rules/AllowIf"; +import { Require } from "../../rules/Require"; +import { GLOBAL_SHARD } from "../../ShardAffinity"; +import type { VC } from "../../VC"; +import { VCWithQueryCache } from "../../VCFlavor"; + +class EntTest extends BaseEnt( + testCluster, + new PgSchema( + 'cache-mixin"test', + { + id: { type: String, autoInsert: "id_gen()" }, + k1: { type: String }, + k2: { type: String }, + }, + ["k1", "k2"], + ), +) { + static readonly CREATE = [ + `CREATE TABLE %T( + id bigint NOT NULL PRIMARY KEY, + k1 text NOT NULL, + k2 text NOT NULL, + UNIQUE (k1, k2) + )`, + ]; + + static override configure() { + return new this.Configuration({ + shardAffinity: GLOBAL_SHARD, + privacyInferPrincipal: null, + privacyLoad: [new AllowIf(new True())], + privacyInsert: [new Require(new True())], + }); + } +} + +const STATIC_ID = "42"; +const ROW = { k1: "test", k2: "test" }; + +let vc: VC; +let master: TestPgClient; + +beforeEach(async () => { + await recreateTestTables([EntTest]); + vc = createVC().withFlavor(new VCWithQueryCache({ maxQueries: 1000 })); + master = await EntTest.CLUSTER.globalShard().client(MASTER); +}); + +test.each([ + [ + "cache is cleaned on insert with id", + async (vc: VC) => EntTest.insert(vc, { id: STATIC_ID, ...ROW }), + ], + [ + "cache is cleaned on upsert with id", + async (vc: VC) => EntTest.upsert(vc, { id: STATIC_ID, ...ROW }), + ], + [ + "cache is cleaned on insert without id", + async (vc: VC) => EntTest.insert(vc, ROW), + ], + [ + "cache is cleaned on upsert without id", + async (vc: VC) => EntTest.upsert(vc, ROW), + ], + [ + "cache is cleaned on update", + async (_vc: VC, existing: EntTest) => + existing.updateOriginal(ROW).then(() => existing.id), + ], +])("%s", async (_name, mutate) => { + const existing = await EntTest.insertReturning(vc, { + ...ROW, + k1: "test-existing", + }); + + // Prewarm cache. + expect(await EntTest.loadNullable(vc, STATIC_ID)).toEqual(null); + expect(await EntTest.loadNullable(vc, existing.id)).not.toMatchObject(ROW); + expect(await EntTest.loadByNullable(vc, ROW)).toEqual(null); + expect(await EntTest.selectBy(vc, { k1: ROW.k1 })).toHaveLength(0); + expect(await EntTest.select(vc, ROW, 1)).toHaveLength(0); + expect(await EntTest.count(vc, ROW)).toEqual(0); + expect(await EntTest.exists(vc, ROW)).toEqual(false); + + const mutatedID = await mutate(vc, existing); + + // Mutation should be reflected in all read calls. + expect(await EntTest.loadNullable(vc, mutatedID)).toMatchObject(ROW); + expect(await EntTest.loadByNullable(vc, ROW)).toMatchObject(ROW); + expect(await EntTest.selectBy(vc, { k1: ROW.k1 })).toHaveLength(1); + expect(await EntTest.select(vc, ROW, 1)).toHaveLength(1); + expect(await EntTest.count(vc, ROW)).toEqual(1); + expect(await EntTest.exists(vc, ROW)).toEqual(true); + + // Direct DB modification must not affect cache. + await master.query({ + query: [ + `DELETE FROM ${escapeIdent(EntTest.SCHEMA.name)} WHERE id=?`, + mutatedID, + ], + isWrite: true, + annotations: [], + op: "", + table: EntTest.SCHEMA.name, + batchFactor: 1, + }); + + expect(await EntTest.loadNullable(vc, mutatedID)).toMatchObject(ROW); + expect(await EntTest.loadByNullable(vc, ROW)).toMatchObject(ROW); + expect(await EntTest.select(vc, ROW, 1)).toHaveLength(1); + expect(await EntTest.count(vc, ROW)).toEqual(1); + expect(await EntTest.exists(vc, ROW)).toEqual(true); +}); + +test("cache is cleaned on delete", async () => { + const existing = await EntTest.insertReturning(vc, ROW); + + // Prewarm cache. + expect(await EntTest.loadNullable(vc, existing.id)).toMatchObject(ROW); + expect(await EntTest.loadByNullable(vc, ROW)).toMatchObject(ROW); + expect(await EntTest.selectBy(vc, { k1: ROW.k1 })).toHaveLength(1); + expect(await EntTest.select(vc, ROW, 1)).toHaveLength(1); + expect(await EntTest.count(vc, ROW)).toEqual(1); + expect(await EntTest.exists(vc, ROW)).toEqual(true); + + await existing.deleteOriginal(); + + // Deletion must be reflected in all read calls. + expect(await EntTest.loadNullable(vc, existing.id)).toEqual(null); + expect(await EntTest.loadByNullable(vc, ROW)).toEqual(null); + expect(await EntTest.selectBy(vc, { k1: ROW.k1 })).toHaveLength(0); + expect(await EntTest.select(vc, ROW, 1)).toHaveLength(0); + expect(await EntTest.count(vc, ROW)).toEqual(0); + expect(await EntTest.exists(vc, ROW)).toEqual(false); +}); diff --git a/src/ent/predicates/IncomingEdgeFromVCExists.ts b/src/ent/predicates/IncomingEdgeFromVCExists.ts index 1625737..0c09f05 100644 --- a/src/ent/predicates/IncomingEdgeFromVCExists.ts +++ b/src/ent/predicates/IncomingEdgeFromVCExists.ts @@ -37,10 +37,9 @@ export class IncomingEdgeFromVCExists "(" + this.EntEdge.name + "[" + - this.entEdgeVCField + - "=vc, " + - this.entEdgeFKField + - "=row.id]" + + `${this.entEdgeVCField}=vc, ` + + `${this.entEdgeFKField}=row.${ID}` + + "]" + ")"; } diff --git a/src/pg/PgClientPool.ts b/src/pg/PgClientPool.ts index 1cb7fd7..6a2f33f 100644 --- a/src/pg/PgClientPool.ts +++ b/src/pg/PgClientPool.ts @@ -23,11 +23,16 @@ export interface PgClientPoolOptions extends PgClientOptions { Pool?: typeof Pool; /** Close the connection after the query if it was opened long time ago. */ maxConnLifetimeMs?: MaybeCallable; - /** Jitter for old connections closure. */ + /** Jitter for maxConnLifetimeMs. */ maxConnLifetimeJitter?: MaybeCallable; + /** Add not more than this number of connections in each prewarm interval. New + * connections are expensive to establish (especially when SSL is enabled). */ + prewarmIntervalStep?: MaybeCallable; /** How often to send bursts of prewarm queries to all Clients to keep the * minimal number of open connections. */ prewarmIntervalMs?: MaybeCallable; + /** Jitter for prewarmIntervalMs. */ + prewarmIntervalJitter?: MaybeCallable; /** What prewarm query to send. */ prewarmQuery?: MaybeCallable; } @@ -48,8 +53,10 @@ export class PgClientPool extends PgClient { ...super.DEFAULT_OPTIONS, Pool, maxConnLifetimeMs: 0, - maxConnLifetimeJitter: 0.2, + maxConnLifetimeJitter: 0.5, + prewarmIntervalStep: 1, prewarmIntervalMs: 10000, + prewarmIntervalJitter: 0.5, prewarmQuery: 'SELECT 1 AS "prewarmQuery"', }; @@ -81,8 +88,10 @@ export class PgClientPool extends PgClient { if (maxConnLifetimeMs > 0) { client.closeAt = Date.now() + - maxConnLifetimeMs * - jitter(maybeCall(this.options.maxConnLifetimeJitter)); + Math.round( + maxConnLifetimeMs * + jitter(maybeCall(this.options.maxConnLifetimeJitter)), + ); } // Sets a "default error" handler to not let errors leak to e.g. Jest @@ -168,6 +177,7 @@ export class PgClientPool extends PgClient { const min = Math.min( this.options.config.min, this.options.config.max ?? Infinity, + this.pool.totalCount + (maybeCall(this.options.prewarmIntervalStep) || 1), ); const toPrewarm = min - this.pool.waitingCount; if (toPrewarm > 0) { @@ -186,9 +196,15 @@ export class PgClientPool extends PgClient { ); } - this.prewarmTimeout.current = setTimeout(() => { - this.prewarmTimeout.current = null; - this.prewarm(); - }, maybeCall(this.options.prewarmIntervalMs)); + this.prewarmTimeout.current = setTimeout( + () => { + this.prewarmTimeout.current = null; + this.prewarm(); + }, + Math.round( + maybeCall(this.options.prewarmIntervalMs) * + jitter(maybeCall(this.options.prewarmIntervalJitter)), + ), + ); } } diff --git a/src/pg/PgQueryDeleteWhere.ts b/src/pg/PgQueryDeleteWhere.ts index f77e3c7..aff04e8 100644 --- a/src/pg/PgQueryDeleteWhere.ts +++ b/src/pg/PgQueryDeleteWhere.ts @@ -63,6 +63,6 @@ class PgRunnerDeleteWhere extends PgRunner< annotations, input[ID].length, ); - return rows.map((row) => row.id); + return rows.map((row) => row[ID]); } } diff --git a/src/pg/__tests__/test-utils.ts b/src/pg/__tests__/test-utils.ts index 2fbca46..9543db4 100644 --- a/src/pg/__tests__/test-utils.ts +++ b/src/pg/__tests__/test-utils.ts @@ -313,6 +313,7 @@ beforeEach(() => { */ export const testCluster = new Cluster({ shardsDiscoverIntervalMs: 500, + shardsDiscoverIntervalJitter: 0.01, islands: TEST_ISLANDS, createClient: ({ nameSuffix, isAlwaysLaggingReplica, loggers, ...config }) => new TestPgClient( diff --git a/src/tools/ToolScoreboard.ts b/src/tools/ToolScoreboard.ts index 8d32722..461857e 100644 --- a/src/tools/ToolScoreboard.ts +++ b/src/tools/ToolScoreboard.ts @@ -66,7 +66,7 @@ interface ToolScoreboardQueryError { } const ROTATING_CHARS = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"; -const FACES = ["▲", "⏺", "◼"]; +const FACES = ["➀", "➁", "➂", "➃", "➄", "➅", "➆", "➇", "➈"]; /** * A tool which plays the role of Linux `top` command, but for the Cluster. @@ -88,6 +88,7 @@ export class ToolScoreboard { private launchedPollers: Set = new Set(); private renderCallCount = 0; + private renderCallFirstAt?: number; private queryPollDefers: Array> = []; /** Options of this tool. */ @@ -302,6 +303,7 @@ export class ToolScoreboard { */ render(): string { this.renderCallCount++; + this.renderCallFirstAt ??= Date.now(); const queriesWidth = this.options.maxQueries * 2; const rows: string[][] = []; @@ -311,7 +313,7 @@ export class ToolScoreboard { "Client", "Role", "Pool Conns", - `Queries (ms or ${FACES[0]} - pings; D - discovery; red - error)`, + "Queries (ms or Ⓝ ×10 - pings; D - discovery; red - error)", "Health", ]); for (const [islandNo, { clients }] of this.islands) { @@ -377,7 +379,14 @@ export class ToolScoreboard { const lines: string[] = []; - lines.push(`[${formatTimeWithMs(new Date())}]`); + lines.push( + "[" + + formatTimeWithMs(new Date()) + + (this.renderCallFirstAt + ? `, ${((Date.now() - this.renderCallFirstAt) / 1000).toFixed(1)} sec elapsed` + : "") + + "]", + ); lines.push( table(rows, {