Skip to content

Commit 2aa3a47

Browse files
authored
feat: support sql transactions for multiple databases (#49)
* feat: support sql transactions for multiple dbs * fix: test
1 parent 9ff424f commit 2aa3a47

File tree

3 files changed

+28
-14
lines changed

3 files changed

+28
-14
lines changed

src/helpers/values.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,19 @@ export function toEnumValue<T>(enm: { [s: string]: T }, value: string): T | unde
142142
? (value as unknown as T)
143143
: undefined;
144144
}
145+
146+
/**
147+
* Unwraps a value that may be undefined or null.
148+
* @param val - The value to unwrap
149+
* @param onNullish - Callback to throw an error if the value is null or undefined
150+
* @returns The unwrapped value
151+
*/
152+
export function unwrap<T>(val: T | null, onNullish?: () => string): Exclude<T, undefined | null> {
153+
if (val === undefined) {
154+
throw new Error(onNullish?.() ?? 'value is undefined');
155+
}
156+
if (val === null) {
157+
throw new Error(onNullish?.() ?? 'value is null');
158+
}
159+
return val as Exclude<T, undefined | null>;
160+
}

src/postgres/__tests__/base-pg-store.test.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,18 @@ describe('BasePgStore', () => {
9292
});
9393

9494
test('postgres transaction connection integrity', async () => {
95-
const usageName = 'postgres:test;datastore-crud';
9695
const obj = db.sql;
96+
const dbName = obj.options.database;
9797

9898
expect(sqlTransactionContext.getStore()).toBeUndefined();
9999
await db.sqlTransaction(async sql => {
100-
// Transaction flag is open.
101-
expect(sqlTransactionContext.getStore()?.usageName).toBe(usageName);
102100
// New connection object.
103101
const newObj = sql;
104102
expect(obj).not.toEqual(newObj);
105-
expect(sqlTransactionContext.getStore()?.sql).toEqual(newObj);
103+
expect(sqlTransactionContext.getStore()?.[dbName]).toEqual(newObj);
106104

107105
// Nested tx uses the same connection object.
108106
await db.sqlTransaction(sql => {
109-
expect(sqlTransactionContext.getStore()?.usageName).toBe(usageName);
110107
expect(newObj).toEqual(sql);
111108
});
112109

src/postgres/base-pg-store.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ import { isProdEnv } from '../helpers/values';
88
*/
99
export const sqlTransactionContext = new AsyncLocalStorage<SqlTransactionContext>();
1010
type SqlTransactionContext = {
11-
usageName: string;
12-
sql: PgSqlClient;
11+
[dbName: string]: PgSqlClient;
1312
};
1413
type UnwrapPromiseArray<T> = T extends any[]
1514
? {
@@ -26,8 +25,9 @@ export abstract class BasePgStore {
2625
* async context will be returned to guarantee transaction consistency.
2726
*/
2827
get sql(): PgSqlClient {
28+
const dbName = this._sql.options.database?.toString() ?? 'default';
2929
const sqlContext = sqlTransactionContext.getStore();
30-
return sqlContext ? sqlContext.sql : this._sql;
30+
return sqlContext ? sqlContext[dbName] : this._sql;
3131
}
3232
private readonly _sql: PgSqlClient;
3333

@@ -52,15 +52,16 @@ export abstract class BasePgStore {
5252
callback: (sql: PgSqlClient) => T | Promise<T>,
5353
readOnly = true
5454
): Promise<UnwrapPromiseArray<T>> {
55-
// Do we have a scoped client already? Use it directly.
56-
const sqlContext = sqlTransactionContext.getStore();
57-
if (sqlContext) {
58-
return callback(sqlContext.sql) as UnwrapPromiseArray<T>;
55+
// Do we have a scoped client already? Use it directly. Key is the database name.
56+
const dbName = this._sql.options.database?.toString() ?? 'default';
57+
const sql = sqlTransactionContext.getStore()?.[dbName];
58+
if (sql) {
59+
return callback(sql) as UnwrapPromiseArray<T>;
5960
}
6061
// Otherwise, start a transaction and store the scoped connection in the current async context.
61-
const usageName = this._sql.options.connection.application_name ?? '';
6262
return this._sql.begin(readOnly ? 'read only' : 'read write', sql => {
63-
return sqlTransactionContext.run({ usageName, sql }, () => callback(sql));
63+
const currentStore = sqlTransactionContext.getStore() ?? {};
64+
return sqlTransactionContext.run({ ...currentStore, [dbName]: sql }, () => callback(sql));
6465
});
6566
}
6667

0 commit comments

Comments
 (0)