diff --git a/src/adminDataSource.ts b/src/adminDataSource.ts new file mode 100644 index 000000000..c6d2fc823 --- /dev/null +++ b/src/adminDataSource.ts @@ -0,0 +1,75 @@ +import { DataSource } from 'typeorm'; +import config from './config'; +import { getEntities } from './entities/entities'; +import { redisConfig } from './redis'; +import { logger } from './utils/logger'; + +/** + * AdminDataSource - Dedicated DataSource for AdminJS + * + * This DataSource ALWAYS uses the master database for both reads and writes. + * This ensures AdminJS operations work correctly without trying to write to read replicas. + * + * IMPORTANT: Do NOT change defaultMode to 'slave' for this DataSource + */ +export class AdminDataSource { + private static datasource: DataSource; + + static async initialize() { + if (!AdminDataSource.datasource) { + const entities = getEntities(); + const poolSize = Number(process.env.TYPEORM_DATABASE_POOL_SIZE) || 10; + + // AdminJS always uses master - no read replica routing + AdminDataSource.datasource = new DataSource({ + name: 'admin', // Unique name for AdminJS DataSource + schema: 'public', + type: 'postgres', + // Single master connection - no replication for AdminJS + host: config.get('TYPEORM_DATABASE_HOST') as string, + port: config.get('TYPEORM_DATABASE_PORT') as number, + database: config.get('TYPEORM_DATABASE_NAME') as string, + username: config.get('TYPEORM_DATABASE_USER') as string, + password: config.get('TYPEORM_DATABASE_PASSWORD') as string, + + entities, + synchronize: false, // Never auto-sync in admin + dropSchema: false, + logger: 'advanced-console', + logging: ['error', 'warn'], + cache: { + type: 'redis', + options: { + ...redisConfig, + db: 2, // Different Redis DB for admin cache + }, + }, + poolSize: Math.max(5, Math.floor(poolSize / 2)), // Smaller pool for admin + extra: { + maxWaitingClients: 5, + evictionRunIntervalMillis: 1000, + idleTimeoutMillis: 1000, + }, + }); + + await AdminDataSource.datasource.initialize(); + logger.info('āœ… AdminDataSource initialized (Master only)'); + } + } + + static getDataSource() { + if (!AdminDataSource.datasource) { + throw new Error( + 'AdminDataSource not initialized. Call initialize() first.', + ); + } + return AdminDataSource.datasource; + } + + static async close() { + if (AdminDataSource.datasource?.isInitialized) { + await AdminDataSource.datasource.destroy(); + logger.info('AdminDataSource closed'); + } + } +} diff --git a/src/orm.ts b/src/orm.ts index 7961e434e..adc94dd65 100644 --- a/src/orm.ts +++ b/src/orm.ts @@ -4,6 +4,7 @@ import config from './config'; import { CronJob } from './entities/CronJob'; import { getEntities } from './entities/entities'; import { redisConfig } from './redis'; +import { logger } from './utils/logger'; export class AppDataSource { private static datasource: DataSource; @@ -29,7 +30,7 @@ export class AppDataSource { schema: 'public', type: 'postgres', replication: { - defaultMode: 'master', + defaultMode: slaves.length > 0 ? 'slave' : 'master', master: { database: config.get('TYPEORM_DATABASE_NAME') as string, username: config.get('TYPEORM_DATABASE_USER') as string, @@ -60,6 +61,19 @@ export class AppDataSource { }, }); await AppDataSource.datasource.initialize(); + + // Log replication configuration + if (slaves.length > 0) { + logger.info( + `āœ… AppDataSource initialized with ${slaves.length} read replica(s)`, + ); + logger.info(` Master: ${config.get('TYPEORM_DATABASE_HOST')}`); + slaves.forEach((slave, idx) => { + logger.info(` Replica ${idx + 1}: ${slave.host}`); + }); + } else { + logger.info('āœ… AppDataSource initialized (No replicas configured)'); + } } } diff --git a/src/server/adminJs/adminJs.ts b/src/server/adminJs/adminJs.ts index f2fe4b3c8..97894f283 100644 --- a/src/server/adminJs/adminJs.ts +++ b/src/server/adminJs/adminJs.ts @@ -1,40 +1,41 @@ -import adminJs, { ActionContext, AdminJSOptions } from 'adminjs'; import adminJsExpress from '@adminjs/express'; import { Database, Resource } from '@adminjs/typeorm'; +import adminJs, { ActionContext, AdminJSOptions } from 'adminjs'; import { IncomingMessage } from 'connect'; -import { User } from '../../entities/user'; +import { AdminDataSource } from '../../adminDataSource'; import config from '../../config'; +import { User } from '../../entities/user'; import { redis } from '../../redis'; -import { logger } from '../../utils/logger'; import { findUserById } from '../../repositories/userRepository'; import { fetchAdminAndValidatePassword } from '../../services/userService'; -import { campaignsTab } from './tabs/campaignsTab'; +import { logger } from '../../utils/logger'; +import { AnchorContractAddressTab } from './tabs/anchorContractAddressTab'; import { broadcastNotificationTab } from './tabs/broadcastNotificationTab'; -import { mainCategoryTab } from './tabs/mainCategoryTab'; +import { campaignsTab } from './tabs/campaignsTab'; import { categoryTab } from './tabs/categoryTab'; -import { projectsTab } from './tabs/projectsTab'; +import { donationTab } from './tabs/donationTab'; +import { featuredUpdateTab } from './tabs/featuredUpdateTab'; +import { globalConfigurationTab } from './tabs/globalConfigurationTab'; +import { mainCategoryTab } from './tabs/mainCategoryTab'; import { organizationsTab } from './tabs/organizationsTab'; -import { usersTab } from './tabs/usersTab'; +import { projectAddressTab } from './tabs/projectAddressTab'; +import { ProjectFraudTab } from './tabs/projectFraudTab'; +import { projectQfRoundsTab } from './tabs/projectQfRoundsTab'; +import { projectSocialMediaTab } from './tabs/projectSocialMediaTab'; +import { projectsTab } from './tabs/projectsTab'; import { projectStatusHistoryTab } from './tabs/projectStatusHistoryTab'; import { projectStatusReasonTab } from './tabs/projectStatusReasonTab'; -import { projectAddressTab } from './tabs/projectAddressTab'; import { projectStatusTab } from './tabs/projectStatusTab'; import { projectUpdateTab } from './tabs/projectUpdateTab'; -import { thirdPartProjectImportTab } from './tabs/thirdPartProjectImportTab'; -import { featuredUpdateTab } from './tabs/featuredUpdateTab'; -import { generateTokenTab } from './tabs/tokenTab'; -import { donationTab } from './tabs/donationTab'; import { projectVerificationTab } from './tabs/projectVerificationTab'; -import { qfRoundTab } from './tabs/qfRoundTab'; import { qfRoundHistoryTab } from './tabs/qfRoundHistoryTab'; -import { SybilTab } from './tabs/sybilTab'; -import { ProjectFraudTab } from './tabs/projectFraudTab'; +import { qfRoundTab } from './tabs/qfRoundTab'; import { RecurringDonationTab } from './tabs/recurringDonationTab'; -import { AnchorContractAddressTab } from './tabs/anchorContractAddressTab'; -import { projectSocialMediaTab } from './tabs/projectSocialMediaTab'; import { SwapTransactionTab } from './tabs/swapTransactionTab'; -import { projectQfRoundsTab } from './tabs/projectQfRoundsTab'; -import { globalConfigurationTab } from './tabs/globalConfigurationTab'; +import { SybilTab } from './tabs/sybilTab'; +import { thirdPartProjectImportTab } from './tabs/thirdPartProjectImportTab'; +import { generateTokenTab } from './tabs/tokenTab'; +import { usersTab } from './tabs/usersTab'; // use redis for session data instead of in-memory storage // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -132,6 +133,8 @@ export const getCurrentAdminJsSession = async (request: IncomingMessage) => { type AdminJsResources = AdminJSOptions['resources']; const getResources = async (): Promise => { + const adminDataSource = AdminDataSource.getDataSource(); + const resources: AdminJsResources = [ projectVerificationTab, projectQfRoundsTab, @@ -162,6 +165,28 @@ const getResources = async (): Promise => { globalConfigurationTab, ]; + // Ensure all resources use the AdminJS-specific DataSource + const resourcesWithAdminDataSource: AdminJsResources = resources.map( + (res: any) => { + const resourceDef = res?.resource ?? res; + // If resource is an Entity (constructor function), wrap it with a TypeORM Resource bound to dataSource + if (typeof resourceDef === 'function') { + return { + ...res, + resource: new (Resource as any)(resourceDef, adminDataSource), + }; + } + // If resource already an object descriptor, construct a TypeORM Resource using provided model + if (resourceDef && typeof resourceDef === 'object' && resourceDef.model) { + return { + ...res, + resource: new (Resource as any)(resourceDef.model, adminDataSource), + }; + } + return res; + }, + ); + const loggingHook = async (response, request, context) => { const { action, currentAdmin, resource } = context; const { method, params } = request; @@ -180,7 +205,7 @@ const getResources = async (): Promise => { return response; }; // Add logging hook to all resources - resources.forEach(resource => { + (resourcesWithAdminDataSource as any[]).forEach(resource => { const options = resource.options || {}; const actions = options.actions || {}; const resourceActionList = Object.keys(actions); @@ -203,7 +228,7 @@ const getResources = async (): Promise => { resource.options = options; }); - return resources; + return resourcesWithAdminDataSource; }; const getadminJsInstance = async () => { diff --git a/src/server/bootstrap.ts b/src/server/bootstrap.ts index d8b66fdec..cf3484d0f 100644 --- a/src/server/bootstrap.ts +++ b/src/server/bootstrap.ts @@ -37,6 +37,7 @@ import { import { logger } from '../utils/logger'; import { adminJsRootPath, getAdminJsRouter } from './adminJs/adminJs'; // import { apiGivRouter } from '../routers/apiGivRoutes'; +import { AdminDataSource } from '../adminDataSource'; import { AppDataSource, CronDataSource } from '../orm'; import { dropDbCronExtension, @@ -103,6 +104,9 @@ export async function bootstrap() { await CronDataSource.initialize(); logger.debug('bootstrap() after CronDataSource.initialize()', new Date()); + // Initialize dedicated AdminJS DataSource (master-only) + await AdminDataSource.initialize(); + Container.set(DataSource, AppDataSource.getDataSource()); await setDatabaseParameters(AppDataSource.getDataSource()); diff --git a/src/utils/dbHelpers.ts b/src/utils/dbHelpers.ts new file mode 100644 index 000000000..b308e38cd --- /dev/null +++ b/src/utils/dbHelpers.ts @@ -0,0 +1,198 @@ +import { + EntityTarget, + ObjectLiteral, + Repository, + SelectQueryBuilder, +} from 'typeorm'; +import { PostgresConnectionOptions } from 'typeorm/driver/postgres/PostgresConnectionOptions'; +import { AdminDataSource } from '../adminDataSource'; +import { AppDataSource } from '../orm'; + +/** + * Database Routing Helpers + * + * These utilities help control whether queries go to master or read replicas. + * + * IMPORTANT CONCEPTS: + * - With replication.defaultMode: 'slave', SELECT queries automatically go to replicas + * - INSERT, UPDATE, DELETE always go to master regardless of defaultMode + * - Transactions always use master connection + * + * USE CASES FOR FORCING MASTER: + * 1. Read-after-write scenarios (need immediate consistency) + * 2. Critical reads that need absolute latest data + * 3. Authentication/Authorization checks + * 4. Financial operations that need strong consistency + */ + +/** + * Execute a SELECT query on the master database with proper connection cleanup + * Use this for critical reads that need absolute consistency (read-after-write scenarios) + * + * @example + * const user = await queryMaster( + * User.createQueryBuilder('user').where('user.id = :id', { id: userId }) + * ); + * + * @example + * const users = await queryMaster( + * User.createQueryBuilder('user').where('user.isActive = true'), + * 'getMany' + * ); + */ +export async function queryMaster( + queryBuilder: SelectQueryBuilder, + method: + | 'getOne' + | 'getMany' + | 'getRawOne' + | 'getRawMany' + | 'getCount' = 'getOne', +): Promise { + const dataSource = AppDataSource.getDataSource(); + const queryRunner = dataSource.createQueryRunner('master'); + + try { + await queryRunner.connect(); + const result = await queryBuilder.setQueryRunner(queryRunner)[method](); + return result; + } finally { + // CRITICAL: Always release the query runner to return connection to pool + await queryRunner.release(); + } +} + +/** + * Get a repository that always uses master connection + * Useful for AdminJS or critical write operations + * + * @example + * const userRepo = getMasterRepository(User); + * const user = await userRepo.findOne({ where: { id: userId } }); + */ +export function getMasterRepository( + entity: EntityTarget, +): Repository { + // AdminDataSource always uses master + return AdminDataSource.getDataSource().getRepository(entity); +} + +/** + * Get a repository using the standard AppDataSource + * With defaultMode: 'slave', SELECTs will use replicas + * + * @example + * const projectRepo = getRepository(Project); + * const projects = await projectRepo.find(); // Uses replica + */ +export function getRepository( + entity: EntityTarget, +): Repository { + return AppDataSource.getDataSource().getRepository(entity); +} + +/** + * Execute a callback with a master query runner + * Properly manages connection lifecycle + * Use for read-after-write scenarios or when you need guaranteed master access + * + * @example + * const donation = await withMasterQueryRunner(async (queryRunner) => { + * // Write operation + * const donation = Donation.create(data); + * await queryRunner.manager.save(donation); + * + * // Read from master immediately after write + * const verified = await queryRunner.manager.findOne(Donation, { + * where: { id: donation.id } + * }); + * + * return verified; + * }); + */ +export async function withMasterQueryRunner( + callback: (queryRunner: any) => Promise, +): Promise { + const dataSource = AppDataSource.getDataSource(); + const queryRunner = dataSource.createQueryRunner('master'); + + try { + await queryRunner.connect(); + return await callback(queryRunner); + } finally { + // CRITICAL: Always release the query runner to return connection to pool + await queryRunner.release(); + } +} + +/** + * Create a transaction that always uses master + * (Transactions already use master, but this makes it explicit) + * + * @example + * await withTransaction(async (entityManager) => { + * const user = await entityManager.save(User, userData); + * const project = await entityManager.save(Project, projectData); + * return { user, project }; + * }); + */ +export async function withTransaction( + callback: (entityManager: any) => Promise, +): Promise { + const dataSource = AppDataSource.getDataSource(); + const queryRunner = dataSource.createQueryRunner(); + + await queryRunner.connect(); + await queryRunner.startTransaction(); + + try { + const result = await callback(queryRunner.manager); + await queryRunner.commitTransaction(); + return result; + } catch (error) { + await queryRunner.rollbackTransaction(); + throw error; + } finally { + await queryRunner.release(); + } +} + +/** + * Check if read replicas are configured + */ +export function hasReadReplicas(): boolean { + const dataSource = AppDataSource.getDataSource(); + const replication = (dataSource.options as PostgresConnectionOptions) + .replication as any; + + if (!replication || !replication.slaves) { + return false; + } + + return replication.slaves.length > 0; +} + +/** + * Get replication configuration info + */ +export function getReplicationInfo() { + const dataSource = AppDataSource.getDataSource(); + const replication = (dataSource.options as PostgresConnectionOptions) + .replication as any; + + if (!replication) { + return { + enabled: false, + master: null, + replicas: [], + defaultMode: null, + }; + } + + return { + enabled: true, + master: replication.master?.host || 'unknown', + replicas: (replication.slaves || []).map((s: any) => s.host), + defaultMode: replication.defaultMode || 'master', + }; +} diff --git a/test/test-replication.ts b/test/test-replication.ts new file mode 100644 index 000000000..706d4863d --- /dev/null +++ b/test/test-replication.ts @@ -0,0 +1,164 @@ +/* eslint-disable no-console */ +/** + * Test Script for Database Replication Setup + * + * Run this script to verify your replication configuration is working correctly: + * + * Usage: + * npx ts-node test-replication.ts + * + * This will test: + * 1. DataSource initialization + * 2. Replica configuration detection + * 3. Basic query routing + * 4. AdminDataSource connection + */ + +import { AdminDataSource } from '../src/adminDataSource'; +import { Project } from '../src/entities/project'; +import { User } from '../src/entities/user'; +import { AppDataSource, CronDataSource } from '../src/orm'; +import { + getMasterRepository, + getReplicationInfo, + getRepository, + hasReadReplicas, +} from '../src/utils/dbHelpers'; + +async function testReplicationSetup() { + console.log('\nšŸ” Testing Database Replication Setup\n'); + console.log('='.repeat(60)); + + try { + // Test 1: Initialize DataSources + console.log('\nšŸ“Š Test 1: Initializing DataSources...'); + + console.log(' - Initializing AppDataSource...'); + await AppDataSource.initialize(); + console.log(' āœ… AppDataSource initialized'); + + console.log(' - Initializing CronDataSource...'); + await CronDataSource.initialize(); + console.log(' āœ… CronDataSource initialized'); + + console.log(' - Initializing AdminDataSource...'); + await AdminDataSource.initialize(); + console.log(' āœ… AdminDataSource initialized'); + + // Test 2: Check Replication Configuration + console.log('\nšŸ“Š Test 2: Checking Replication Configuration...'); + + const hasReplicas = hasReadReplicas(); + console.log(` - Has Read Replicas: ${hasReplicas ? 'āœ… YES' : 'āš ļø NO'}`); + + const replicationInfo = getReplicationInfo(); + console.log(' - Replication Info:'); + console.log(` • Enabled: ${replicationInfo.enabled}`); + console.log(` • Default Mode: ${replicationInfo.defaultMode}`); + console.log(` • Master: ${replicationInfo.master}`); + console.log(` • Replicas: ${replicationInfo.replicas.length}`); + replicationInfo.replicas.forEach((replica, idx) => { + console.log(` ${idx + 1}. ${replica}`); + }); + + // Test 3: Test Query Execution + console.log('\nšŸ“Š Test 3: Testing Query Execution...'); + + // Test with standard repository (should use replica for reads) + console.log(' - Testing standard repository (Project)...'); + const projectRepo = getRepository(Project); + const projectCount = await projectRepo.count(); + console.log(` āœ… Found ${projectCount} projects`); + + // Test with master repository + console.log(' - Testing master repository (User)...'); + const userMasterRepo = getMasterRepository(User); + const userCount = await userMasterRepo.count(); + console.log(` āœ… Found ${userCount} users`); + + // Test 4: Verify AdminDataSource + console.log('\nšŸ“Š Test 4: Verifying AdminDataSource...'); + + const adminDs = AdminDataSource.getDataSource(); + console.log( + ` - AdminDataSource is initialized: ${adminDs.isInitialized ? 'āœ…' : 'āŒ'}`, + ); + console.log( + ` - AdminDataSource host: ${adminDs.options['host'] || (adminDs.options as any).replication?.master?.host}`, + ); + + // Check if AdminDataSource has replication (it shouldn't) + const hasAdminReplication = !!(adminDs.options as any).replication; + console.log( + ` - AdminDataSource uses replication: ${hasAdminReplication ? 'āŒ (Should be NO)' : 'āœ… NO'}`, + ); + + // Test 5: Connection Health Check + console.log('\nšŸ“Š Test 5: Testing Connection Health...'); + + console.log(' - Testing AppDataSource query...'); + const start1 = Date.now(); + await AppDataSource.getDataSource().query('SELECT 1'); + const time1 = Date.now() - start1; + console.log(` āœ… Query executed in ${time1}ms`); + + console.log(' - Testing AdminDataSource query...'); + const start2 = Date.now(); + await AdminDataSource.getDataSource().query('SELECT 1'); + const time2 = Date.now() - start2; + console.log(` āœ… Query executed in ${time2}ms`); + + // Summary + console.log('\n' + '='.repeat(60)); + console.log('āœ… All Tests Passed!\n'); + + if (hasReplicas) { + console.log('šŸ“Œ Summary:'); + console.log(' • Read replicas are configured and active'); + console.log(' • GraphQL queries will use read replicas for SELECT'); + console.log(' • AdminJS will use master for all operations'); + console.log(' • Write operations always use master'); + } else { + console.log('āš ļø Summary:'); + console.log(' • No read replicas configured'); + console.log(' • All queries will use master database'); + console.log(' • To enable replicas, add TYPEORM_DATABASE_HOST_READONLY'); + console.log(' and related environment variables'); + } + + console.log('\n✨ Replication setup is working correctly!\n'); + } catch (error) { + console.error('\nāŒ Error during testing:'); + console.error(error); + console.log('\nšŸ“‹ Troubleshooting:'); + console.log(' 1. Check your .env file has correct database credentials'); + console.log(' 2. Verify database servers are accessible'); + console.log(' 3. Check if read replica credentials are correct'); + console.log(' 4. Review logs above for specific error details\n'); + process.exit(1); + } finally { + // Cleanup + try { + await AppDataSource.getDataSource()?.destroy(); + await CronDataSource.getDataSource()?.destroy(); + await AdminDataSource.close(); + console.log('šŸ”Œ Connections closed\n'); + } catch (e) { + // Ignore cleanup errors + } + } +} + +// Run the test +if (require.main === module) { + testReplicationSetup() + .then(() => { + process.exit(0); + }) + .catch(error => { + console.error('Fatal error:', error); + process.exit(1); + }); +} + +export { testReplicationSetup };