diff --git a/src/domain-services/flow-link/flow-link-service.ts b/src/domain-services/flow-link/flow-link-service.ts index 5c68f6d3..03a0fdce 100644 --- a/src/domain-services/flow-link/flow-link-service.ts +++ b/src/domain-services/flow-link/flow-link-service.ts @@ -1,6 +1,6 @@ import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow'; import { type Database } from '@unocha/hpc-api-core/src/db/type'; -import { Op } from '@unocha/hpc-api-core/src/db/util/conditions'; +import { Cond, Op } from '@unocha/hpc-api-core/src/db/util/conditions'; import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types'; import { getOrCreate } from '@unocha/hpc-api-core/src/util'; import { Service } from 'typedi'; @@ -14,26 +14,40 @@ export class FlowLinkService { // Fetch all flow links in one go const flowLinks = await models.flowLink.find({ where: { - parentID: { - [Op.IN]: flowIds, - }, + [Cond.AND]: [ + { + [Cond.OR]: [ + { + parentID: { + [Op.IN]: flowIds, + }, + }, + { + childID: { + [Op.IN]: flowIds, + }, + }, + ], + }, + { + depth: { [Op.GTE]: 1 }, + }, + ], }, }); // Initialize the map with empty arrays for each flowId const flowLinksMap = new Map< - number, + FlowId, Array> >(); - // Group flow links by parentID in one pass + // Group flow links by `parentID` and `childID` in one pass for (const link of flowLinks) { - const flowLinksForFlow = getOrCreate( - flowLinksMap, - link.parentID, - () => [] - ); - flowLinksForFlow.push(link); + const childFlowLink = getOrCreate(flowLinksMap, link.childID, () => []); + const parentFlowLink = getOrCreate(flowLinksMap, link.parentID, () => []); + childFlowLink.push(link); + parentFlowLink.push(link); } return flowLinksMap; diff --git a/src/domain-services/flows/flow-search-service.ts b/src/domain-services/flows/flow-search-service.ts index 095f0811..5395bce8 100644 --- a/src/domain-services/flows/flow-search-service.ts +++ b/src/domain-services/flows/flow-search-service.ts @@ -223,8 +223,6 @@ export class FlowSearchService { ]); const promises = flows.map(async (flow) => { - const flowLink = getOrCreate(flowLinksMap, flow.id, () => []); - // Categories Map follows the structure: // flowID: { versionID: [categories]} // So we need to get the categories for the flow version @@ -247,33 +245,32 @@ export class FlowSearchService { } let parkedParentSource: FlowParkedParentSource | null = null; - const shouldLookAfterParentSource = - flowLink.length > 0 && shouldIncludeChildrenOfParkedFlows; - - if (shouldLookAfterParentSource) { - parkedParentSource = await this.flowService.getParketParents( - flow, - flowLink, - models - ); - } - const childIDs: number[] = + const childIDs: FlowId[] = flowLinksMap .get(flow.id) ?.filter( (flowLink) => flowLink.parentID === flow.id && flowLink.depth > 0 ) - .map((flowLink) => flowLink.childID.valueOf()) ?? []; + .map((flowLink) => flowLink.childID) ?? []; - const parentIDs: number[] = + const parentLinks = flowLinksMap .get(flow.id) ?.filter( (flowLink) => flowLink.childID === flow.id && flowLink.depth > 0 - ) - .map((flowLink) => flowLink.parentID.valueOf()) ?? []; + ) ?? []; + const parentIDs: FlowId[] = parentLinks.map( + (flowLink) => flowLink.parentID + ); + if (shouldIncludeChildrenOfParkedFlows) { + parkedParentSource = await this.flowService.getParkedParents( + flow, + parentLinks, + models + ); + } const parsedFlow: Flow = this.buildFlowDTO( flow, categoriesByVersion, diff --git a/src/domain-services/flows/flow-service.ts b/src/domain-services/flows/flow-service.ts index dc1e259a..095c0268 100644 --- a/src/domain-services/flows/flow-service.ts +++ b/src/domain-services/flows/flow-service.ts @@ -2,6 +2,8 @@ import { type Database } from '@unocha/hpc-api-core/src/db'; import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow'; import { Op } from '@unocha/hpc-api-core/src/db/util/conditions'; import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types'; +import { splitIntoChunks } from '@unocha/hpc-api-core/src/util'; +import { PG_MAX_QUERY_PARAMS } from '@unocha/hpc-api-core/src/util/consts'; import { createBrandedValue, getTableColumns, @@ -24,7 +26,6 @@ import type { UniqueFlowEntity, } from './model'; import { buildSearchFlowsConditions } from './strategy/impl/utils'; - @Service() export class FlowService { constructor(private readonly flowObjectService: FlowObjectService) {} @@ -94,7 +95,6 @@ export class FlowService { const refDirection = orderBy.direction ?? 'source'; - let flowObjects = []; let entityIDsSorted: number[] = []; switch (entity) { @@ -322,29 +322,44 @@ export class FlowService { const entityCondKey = orderBy.entity as unknown; const entityCondKeyFlowObjectType = entityCondKey as FlowObjectType; - flowObjects = await database.flowObject.find({ - where: { - objectType: entityCondKeyFlowObjectType, - refDirection, - objectID: { - [Op.IN]: entityIDsSorted, - }, - }, - distinct: ['flowID', 'versionID'], - }); + // Order map + const orderMap = new Map(); + for (const [index, entityID] of entityIDsSorted.entries()) { + orderMap.set(entityID, index); + } + + // Instead of doing a single query that may end up on a 'Memory Error' + // we will do a progressive search + // by chunks of PG_MAX_QUERY_PARAMS - 2 => ( (2 ** 16 - 1) - 2 = 65533 ) + const flowObjects = ( + await Promise.all( + splitIntoChunks(entityIDsSorted, PG_MAX_QUERY_PARAMS - 2).map( + (entityIds) => + database.flowObject.find({ + where: { + objectType: entityCondKeyFlowObjectType, + refDirection, + objectID: { + [Op.IN]: entityIds, + }, + }, + distinct: ['flowID', 'versionID'], + }) + ) + ) + ).flat(); // Then, we need to filter the results from the flowObject table // using the planVersions list as sorted reference // this is because we cannot apply the order of a given list // to the query directly - flowObjects = flowObjects + const sortedFlowObjects = flowObjects .map((flowObject) => ({ ...flowObject, - sortingKey: entityIDsSorted.indexOf(flowObject.objectID.valueOf()), + sortingKey: orderMap.get(flowObject.objectID), })) - .sort((a, b) => a.sortingKey - b.sortingKey); - - return this.mapFlowsToUniqueFlowEntities(flowObjects); + .toSorted((a, b) => (a.sortingKey ?? 0) - (b.sortingKey ?? 0)); + return this.mapFlowsToUniqueFlowEntities(sortedFlowObjects); } private mapFlowsToUniqueFlowEntities( @@ -359,7 +374,7 @@ export class FlowService { ); } - async getParketParents( + async getParkedParents( flow: FlowInstance, flowLinkArray: Array>, models: Database @@ -374,7 +389,6 @@ export class FlowService { if (flowLinksParentsIDs.length === 0) { return null; } - const parkedCategory = await models.category.findOne({ where: { group: 'flowType', @@ -454,17 +468,18 @@ export class FlowService { models: Database, flowObjectFilters: FlowObjectFilterGrouped ): Promise { + // 1. Retrieve the parked category const parkedCategory = await models.category.findOne({ where: { name: 'Parked', group: 'flowType', }, }); - if (!parkedCategory) { throw new Error('Parked category not found'); } + // 2. Get all category references for parked flows const categoryRefs = await models.categoryRef.find({ where: { categoryID: parkedCategory.id, @@ -473,91 +488,87 @@ export class FlowService { distinct: ['objectID', 'versionID'], }); + // Build list of parent IDs from categoryRefs + const parentIDs: FlowId[] = categoryRefs.map((ref) => + createBrandedValue(ref.objectID) + ); + + // 3. Retrieve flow links where the parent is among those references and depth > 0 const flowLinks = await models.flowLink.find({ where: { - depth: { - [Op.GT]: 0, - }, - parentID: { - [Op.IN]: categoryRefs.map((categoryRef) => - createBrandedValue(categoryRef.objectID) - ), - }, + depth: { [Op.GT]: 0 }, + parentID: { [Op.IN]: parentIDs }, }, distinct: ['parentID', 'childID'], }); + // Create a reference list of parent flows from the flow links const parentFlowsRef: UniqueFlowEntity[] = flowLinks.map((flowLink) => ({ - id: createBrandedValue(flowLink.parentID), + id: flowLink.parentID, versionID: null, })); - // Since this list can be really large in size: ~42k flow links - // This can cause a performance issue when querying the database - // and even end up with a error like: - // could not resize shared memory segment \"/PostgreSQL.2154039724\" - // to 53727360 bytes: No space left on device - - // We need to do this query by chunks + // 4. Query parent flows progressively in chunks const parentFlows = await this.progresiveSearch( models, parentFlowsRef, - 1000, + PG_MAX_QUERY_PARAMS - 2, // Use a batch size of PG_MAX_QUERY_PARAMS - 2 to avoid hitting the limit 0, - false, // Stop on batch size + false, // Do not stop on batch size [], { activeStatus: true } ); + // 5. Retrieve flow objects using the flow object filters const flowObjectsWhere = buildWhereConditionsForFlowObjectFilters(flowObjectFilters); - const flowObjects = await this.flowObjectService.getFlowFromFlowObjects( models, flowObjectsWhere ); - // Once we get the flowObjects - we need to keep only those that are present in both lists - const filteredParentFlows = parentFlows.filter((parentFlow) => - flowObjects.some( - (flowObject) => - flowObject.id === parentFlow.id && - flowObject.versionID === parentFlow.versionID + // 6. Build a Set for flowObjects for fast lookup (using a composite key of id and versionID) + const flowObjectsSet = new Set( + flowObjects.map( + (flowObject) => `${flowObject.id}|${flowObject.versionID}` ) ); - // Once we have the ParentFlows whose status are 'parked' - // We keep look for the flowLinks of those flows to obtain the child flows - // that are linked to them - const childFlowsIDs: FlowId[] = []; + // 7. Filter parent flows that are present in the flowObjects list + const filteredParentFlows = parentFlows.filter((parentFlow) => { + const key = `${parentFlow.id}|${parentFlow.versionID}`; + return flowObjectsSet.has(key); + }); + + // 8. Build a Set of filtered parent flow IDs for quick membership checking + const filteredParentFlowIds = new Set( + filteredParentFlows.map((flow) => flow.id) + ); + + // 9. Extract child flow IDs from flowLinks where the parent is in the filtered set + const childFlowsIDsSet = new Set(); for (const flowLink of flowLinks) { - if ( - filteredParentFlows.some( - (parentFlow) => parentFlow.id === flowLink.parentID - ) - ) { - childFlowsIDs.push(flowLink.childID); + if (filteredParentFlowIds.has(flowLink.parentID)) { + childFlowsIDsSet.add(flowLink.childID); } } + // 10. Retrieve child flows const childFlows = await models.flow.find({ where: { - deletedAt: null, activeStatus: true, - id: { - [Op.IN]: childFlowsIDs, - }, + id: { [Op.IN]: childFlowsIDsSet }, }, distinct: ['id', 'versionID'], }); - // Once we have the child flows, we need to filter them - // using the flowObjectFilters - // This search needs to be also done by chunks - return childFlows.map((ref) => ({ - id: createBrandedValue(ref.id), + // 11. Map child flows to UniqueFlowEntity and return the result + const result = childFlows.map((ref) => ({ + id: ref.id, versionID: ref.versionID, })); + + return result; } /**