
Commit dc047d2

🐞 Fix issue when filter dataset is large
Now we query using batches. Improved sorting via a map.
1 parent dd5c77b commit dc047d2

1 file changed: +64, -54 lines

src/domain-services/flows/flow-service.ts

Lines changed: 64 additions & 54 deletions
@@ -2,6 +2,7 @@ import { type Database } from '@unocha/hpc-api-core/src/db';
 import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow';
 import { Op } from '@unocha/hpc-api-core/src/db/util/conditions';
 import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types';
+import { splitIntoChunks } from '@unocha/hpc-api-core/src/util';
 import {
   createBrandedValue,
   getTableColumns,
@@ -322,28 +323,39 @@ export class FlowService {
       const entityCondKey = orderBy.entity as unknown;
       const entityCondKeyFlowObjectType = entityCondKey as FlowObjectType;
 
-      flowObjects = await database.flowObject.find({
-        where: {
-          objectType: entityCondKeyFlowObjectType,
-          refDirection,
-          objectID: {
-            [Op.IN]: entityIDsSorted,
-          },
-        },
-        distinct: ['flowID', 'versionID'],
-      });
+      // Order map
+      const orderMap = new Map<number, number>();
+      for (const [index, entityID] of entityIDsSorted.entries()) {
+        orderMap.set(entityID, index);
+      }
 
+      const chunks = splitIntoChunks(entityIDsSorted, 1000);
+      // Instead of doing a single query that may end up on a 'Memory Error'
+      // we will do a progressive search
+      // by chunks of 1000
+      for (const chunk of chunks) {
+        const flowObjectsBatch = await database.flowObject.find({
+          where: {
+            objectType: entityCondKeyFlowObjectType,
+            refDirection,
+            objectID: {
+              [Op.IN]: chunk,
+            },
+          },
+          distinct: ['flowID', 'versionID'],
+        });
+        flowObjects.push(...flowObjectsBatch);
+      }
       // Then, we need to filter the results from the flowObject table
       // using the planVersions list as sorted reference
       // this is because we cannot apply the order of a given list
       // to the query directly
       flowObjects = flowObjects
         .map((flowObject) => ({
           ...flowObject,
-          sortingKey: entityIDsSorted.indexOf(flowObject.objectID.valueOf()),
+          sortingKey: orderMap.get(flowObject.objectID.valueOf()),
         }))
-        .sort((a, b) => a.sortingKey - b.sortingKey);
-
+        .sort((a, b) => (a.sortingKey ?? 0) - (b.sortingKey ?? 0));
       return this.mapFlowsToUniqueFlowEntities(flowObjects);
     }
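The hunk above combines two fixes: the `objectID IN (...)` lookup is issued in batches of 1,000 IDs via `splitIntoChunks`, so a large filter set no longer produces a single oversized query, and the per-row `indexOf` used for re-sorting is replaced by a `Map` from entity ID to its position in `entityIDsSorted`, built once. A minimal standalone sketch of the same pattern (the `Row` shape, the `chunk` helper, and the `fetchBatch` callback are illustrative stand-ins, not part of this codebase):

```ts
// Batch large `IN (...)` lookups and restore the caller's requested order
// with a precomputed index map instead of repeated indexOf scans.
type Row = { objectID: number; flowID: number; versionID: number };

function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

async function findInOrder(
  sortedIDs: number[],
  fetchBatch: (ids: number[]) => Promise<Row[]>
): Promise<Row[]> {
  // O(1) rank lookups; Array.prototype.indexOf would be O(n) per row.
  const rank = new Map(sortedIDs.map((id, index) => [id, index] as const));

  const rows: Row[] = [];
  for (const ids of chunk(sortedIDs, 1000)) {
    rows.push(...(await fetchBatch(ids)));
  }

  // Rows whose ID is somehow missing from the map sort first.
  return rows.sort(
    (a, b) => (rank.get(a.objectID) ?? 0) - (rank.get(b.objectID) ?? 0)
  );
}
```

In the service itself the batch callback is `database.flowObject.find` with each chunk placed in the `[Op.IN]` condition, as shown in the diff.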

@@ -454,17 +466,18 @@ export class FlowService {
     models: Database,
     flowObjectFilters: FlowObjectFilterGrouped
   ): Promise<UniqueFlowEntity[]> {
+    // 1. Retrieve the parked category
     const parkedCategory = await models.category.findOne({
       where: {
         name: 'Parked',
         group: 'flowType',
       },
     });
-
     if (!parkedCategory) {
       throw new Error('Parked category not found');
     }
 
+    // 2. Get all category references for parked flows
     const categoryRefs = await models.categoryRef.find({
       where: {
         categoryID: parkedCategory.id,
@@ -473,91 +486,88 @@ export class FlowService {
       distinct: ['objectID', 'versionID'],
     });
 
+    // Build list of parent IDs from categoryRefs
+    const parentIDs = categoryRefs.map((ref) =>
+      createBrandedValue(ref.objectID)
+    );
+
+    // 3. Retrieve flow links where the parent is among those references and depth > 0
     const flowLinks = await models.flowLink.find({
       where: {
-        depth: {
-          [Op.GT]: 0,
-        },
-        parentID: {
-          [Op.IN]: categoryRefs.map((categoryRef) =>
-            createBrandedValue(categoryRef.objectID)
-          ),
-        },
+        depth: { [Op.GT]: 0 },
+        parentID: { [Op.IN]: parentIDs },
       },
       distinct: ['parentID', 'childID'],
     });
 
+    // Create a reference list of parent flows from the flow links
     const parentFlowsRef: UniqueFlowEntity[] = flowLinks.map((flowLink) => ({
       id: createBrandedValue(flowLink.parentID),
       versionID: null,
     }));
 
-    // Since this list can be really large in size: ~42k flow links
-    // This can cause a performance issue when querying the database
-    // and even end up with a error like:
-    // could not resize shared memory segment "/PostgreSQL.2154039724"
-    // to 53727360 bytes: No space left on device
-
-    // We need to do this query by chunks
+    // 4. Query parent flows progressively in chunks
     const parentFlows = await this.progresiveSearch(
       models,
       parentFlowsRef,
       1000,
       0,
-      false, // Stop on batch size
+      false, // Do not stop on batch size
       [],
       { activeStatus: true }
     );
 
+    // 5. Retrieve flow objects using the flow object filters
     const flowObjectsWhere =
       buildWhereConditionsForFlowObjectFilters(flowObjectFilters);
-
     const flowObjects = await this.flowObjectService.getFlowFromFlowObjects(
       models,
       flowObjectsWhere
     );
 
-    // Once we get the flowObjects - we need to keep only those that are present in both lists
-    const filteredParentFlows = parentFlows.filter((parentFlow) =>
-      flowObjects.some(
-        (flowObject) =>
-          flowObject.id === parentFlow.id &&
-          flowObject.versionID === parentFlow.versionID
+    // 6. Build a Set for flowObjects for fast lookup (using a composite key of id and versionID)
+    const flowObjectsSet = new Set(
+      flowObjects.map(
+        (flowObject) => `${flowObject.id}|${flowObject.versionID}`
       )
     );
 
-    // Once we have the ParentFlows whose status are 'parked'
-    // We keep look for the flowLinks of those flows to obtain the child flows
-    // that are linked to them
-    const childFlowsIDs: FlowId[] = [];
+    // 7. Filter parent flows that are present in the flowObjects list
+    const filteredParentFlows = parentFlows.filter((parentFlow) => {
+      const key = `${parentFlow.id}|${parentFlow.versionID}`;
+      return flowObjectsSet.has(key);
+    });
+
+    // 8. Build a Set of filtered parent flow IDs for quick membership checking
+    const filteredParentFlowIds = new Set(
+      filteredParentFlows.map((flow) => flow.id)
+    );
+
+    // 9. Extract child flow IDs from flowLinks where the parent is in the filtered set
+    const childFlowsIDsSet = new Set<FlowId>();
     for (const flowLink of flowLinks) {
-      if (
-        filteredParentFlows.some(
-          (parentFlow) => parentFlow.id === flowLink.parentID
-        )
-      ) {
-        childFlowsIDs.push(flowLink.childID);
+      if (filteredParentFlowIds.has(flowLink.parentID)) {
+        childFlowsIDsSet.add(createBrandedValue(flowLink.childID));
       }
     }
 
+    // 10. Retrieve child flows
     const childFlows = await models.flow.find({
       where: {
         deletedAt: null,
         activeStatus: true,
-        id: {
-          [Op.IN]: childFlowsIDs,
-        },
+        id: { [Op.IN]: [...childFlowsIDsSet] },
       },
       distinct: ['id', 'versionID'],
    });
 
-    // Once we have the child flows, we need to filter them
-    // using the flowObjectFilters
-    // This search needs to be also done by chunks
-    return childFlows.map((ref) => ({
+    // 11. Map child flows to UniqueFlowEntity and return the result
+    const result = childFlows.map((ref) => ({
      id: createBrandedValue(ref.id),
      versionID: ref.versionID,
    }));
+
+    return result;
   }
 
   /**
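Steps 6 to 9 of the hunk above replace nested `.filter(...)` / `.some(...)` scans with Set lookups: parent flows and the flow objects returned by the filters are intersected on a composite `id|versionID` string key, and child IDs are then collected through a Set of parent IDs. A small self-contained sketch of that intersection pattern (the `Entity` shape and function names are illustrative, not the service's own types):

```ts
// Intersect two lists of { id, versionID } pairs via a Set of composite
// string keys: roughly O(n + m) instead of the O(n * m) nested-scan version.
type Entity = { id: number; versionID: number | null };

const compositeKey = (entity: Entity): string =>
  `${entity.id}|${entity.versionID}`;

function intersectByKey(left: Entity[], right: Entity[]): Entity[] {
  const rightKeys = new Set(right.map(compositeKey));
  return left.filter((entity) => rightKeys.has(compositeKey(entity)));
}

// Example: keep only the parent flows that also came back from the
// flow-object filter query.
const parentFlows: Entity[] = [
  { id: 1, versionID: 1 },
  { id: 2, versionID: 1 },
];
const fromFilters: Entity[] = [{ id: 2, versionID: 1 }];
console.log(intersectByKey(parentFlows, fromFilters)); // [ { id: 2, versionID: 1 } ]
```

One caveat with composite string keys: a `null` versionID serializes as the literal string 'null' (the diff's `parentFlowsRef` entries use `versionID: null`), so both sides of the intersection must build the key the same way for matches to line up.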

0 commit comments
