Skip to content

Commit e170a03

Browse files
authored
Merge pull request #352 from UN-OCHA/HPC-10152
HPC-10152: Fix PostgreSQL query params limit
2 parents 054ba4b + 9ca18cb commit e170a03

File tree

3 files changed

+115 additions
-93 deletions

src/domain-services/flow-link/flow-link-service.ts

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow';
22
import { type Database } from '@unocha/hpc-api-core/src/db/type';
3-
import { Op } from '@unocha/hpc-api-core/src/db/util/conditions';
3+
import { Cond, Op } from '@unocha/hpc-api-core/src/db/util/conditions';
44
import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types';
55
import { getOrCreate } from '@unocha/hpc-api-core/src/util';
66
import { Service } from 'typedi';
@@ -14,26 +14,40 @@ export class FlowLinkService {
1414
// Fetch all flow links in one go
1515
const flowLinks = await models.flowLink.find({
1616
where: {
17-
parentID: {
18-
[Op.IN]: flowIds,
19-
},
17+
[Cond.AND]: [
18+
{
19+
[Cond.OR]: [
20+
{
21+
parentID: {
22+
[Op.IN]: flowIds,
23+
},
24+
},
25+
{
26+
childID: {
27+
[Op.IN]: flowIds,
28+
},
29+
},
30+
],
31+
},
32+
{
33+
depth: { [Op.GTE]: 1 },
34+
},
35+
],
2036
},
2137
});
2238

2339
// Initialize the map with empty arrays for each flowId
2440
const flowLinksMap = new Map<
25-
number,
41+
FlowId,
2642
Array<InstanceOfModel<Database['flowLink']>>
2743
>();
2844

29-
// Group flow links by parentID in one pass
45+
// Group flow links by `parentID` and `childID` in one pass
3046
for (const link of flowLinks) {
31-
const flowLinksForFlow = getOrCreate(
32-
flowLinksMap,
33-
link.parentID,
34-
() => []
35-
);
36-
flowLinksForFlow.push(link);
47+
const childFlowLink = getOrCreate(flowLinksMap, link.childID, () => []);
48+
const parentFlowLink = getOrCreate(flowLinksMap, link.parentID, () => []);
49+
childFlowLink.push(link);
50+
parentFlowLink.push(link);
3751
}
3852

3953
return flowLinksMap;

src/domain-services/flows/flow-search-service.ts

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ export class FlowSearchService {
223223
]);
224224

225225
const promises = flows.map(async (flow) => {
226-
const flowLink = getOrCreate(flowLinksMap, flow.id, () => []);
227-
228226
// Categories Map follows the structure:
229227
// flowID: { versionID: [categories]}
230228
// So we need to get the categories for the flow version
@@ -247,33 +245,32 @@ export class FlowSearchService {
247245
}
248246

249247
let parkedParentSource: FlowParkedParentSource | null = null;
250-
const shouldLookAfterParentSource =
251-
flowLink.length > 0 && shouldIncludeChildrenOfParkedFlows;
252-
253-
if (shouldLookAfterParentSource) {
254-
parkedParentSource = await this.flowService.getParketParents(
255-
flow,
256-
flowLink,
257-
models
258-
);
259-
}
260248

261-
const childIDs: number[] =
249+
const childIDs: FlowId[] =
262250
flowLinksMap
263251
.get(flow.id)
264252
?.filter(
265253
(flowLink) => flowLink.parentID === flow.id && flowLink.depth > 0
266254
)
267-
.map((flowLink) => flowLink.childID.valueOf()) ?? [];
255+
.map((flowLink) => flowLink.childID) ?? [];
268256

269-
const parentIDs: number[] =
257+
const parentLinks =
270258
flowLinksMap
271259
.get(flow.id)
272260
?.filter(
273261
(flowLink) => flowLink.childID === flow.id && flowLink.depth > 0
274-
)
275-
.map((flowLink) => flowLink.parentID.valueOf()) ?? [];
262+
) ?? [];
263+
const parentIDs: FlowId[] = parentLinks.map(
264+
(flowLink) => flowLink.parentID
265+
);
276266

267+
if (shouldIncludeChildrenOfParkedFlows) {
268+
parkedParentSource = await this.flowService.getParkedParents(
269+
flow,
270+
parentLinks,
271+
models
272+
);
273+
}
277274
const parsedFlow: Flow = this.buildFlowDTO(
278275
flow,
279276
categoriesByVersion,

src/domain-services/flows/flow-service.ts

Lines changed: 75 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import { type Database } from '@unocha/hpc-api-core/src/db';
22
import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow';
33
import { Op } from '@unocha/hpc-api-core/src/db/util/conditions';
44
import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types';
5+
import { splitIntoChunks } from '@unocha/hpc-api-core/src/util';
6+
import { PG_MAX_QUERY_PARAMS } from '@unocha/hpc-api-core/src/util/consts';
57
import {
68
createBrandedValue,
79
getTableColumns,
@@ -24,7 +26,6 @@ import type {
2426
UniqueFlowEntity,
2527
} from './model';
2628
import { buildSearchFlowsConditions } from './strategy/impl/utils';
27-
2829
@Service()
2930
export class FlowService {
3031
constructor(private readonly flowObjectService: FlowObjectService) {}
@@ -94,7 +95,6 @@ export class FlowService {
9495

9596
const refDirection = orderBy.direction ?? 'source';
9697

97-
let flowObjects = [];
9898
let entityIDsSorted: number[] = [];
9999

100100
switch (entity) {
@@ -322,29 +322,44 @@ export class FlowService {
322322
const entityCondKey = orderBy.entity as unknown;
323323
const entityCondKeyFlowObjectType = entityCondKey as FlowObjectType;
324324

325-
flowObjects = await database.flowObject.find({
326-
where: {
327-
objectType: entityCondKeyFlowObjectType,
328-
refDirection,
329-
objectID: {
330-
[Op.IN]: entityIDsSorted,
331-
},
332-
},
333-
distinct: ['flowID', 'versionID'],
334-
});
325+
// Order map
326+
const orderMap = new Map<number, number>();
327+
for (const [index, entityID] of entityIDsSorted.entries()) {
328+
orderMap.set(entityID, index);
329+
}
330+
331+
// Instead of doing a single query that may end up on a 'Memory Error'
332+
// we will do a progressive search
333+
// by chunks of PG_MAX_QUERY_PARAMS - 2 => ( (2 ** 16 - 1) - 2 = 65533 )
334+
const flowObjects = (
335+
await Promise.all(
336+
splitIntoChunks(entityIDsSorted, PG_MAX_QUERY_PARAMS - 2).map(
337+
(entityIds) =>
338+
database.flowObject.find({
339+
where: {
340+
objectType: entityCondKeyFlowObjectType,
341+
refDirection,
342+
objectID: {
343+
[Op.IN]: entityIds,
344+
},
345+
},
346+
distinct: ['flowID', 'versionID'],
347+
})
348+
)
349+
)
350+
).flat();
335351

336352
// Then, we need to filter the results from the flowObject table
337353
// using the planVersions list as sorted reference
338354
// this is because we cannot apply the order of a given list
339355
// to the query directly
340-
flowObjects = flowObjects
356+
const sortedFlowObjects = flowObjects
341357
.map((flowObject) => ({
342358
...flowObject,
343-
sortingKey: entityIDsSorted.indexOf(flowObject.objectID.valueOf()),
359+
sortingKey: orderMap.get(flowObject.objectID),
344360
}))
345-
.sort((a, b) => a.sortingKey - b.sortingKey);
346-
347-
return this.mapFlowsToUniqueFlowEntities(flowObjects);
361+
.toSorted((a, b) => (a.sortingKey ?? 0) - (b.sortingKey ?? 0));
362+
return this.mapFlowsToUniqueFlowEntities(sortedFlowObjects);
348363
}
349364

350365
private mapFlowsToUniqueFlowEntities(
@@ -359,7 +374,7 @@ export class FlowService {
359374
);
360375
}
361376

362-
async getParketParents(
377+
async getParkedParents(
363378
flow: FlowInstance,
364379
flowLinkArray: Array<InstanceOfModel<Database['flowLink']>>,
365380
models: Database
@@ -374,7 +389,6 @@ export class FlowService {
374389
if (flowLinksParentsIDs.length === 0) {
375390
return null;
376391
}
377-
378392
const parkedCategory = await models.category.findOne({
379393
where: {
380394
group: 'flowType',
@@ -454,17 +468,18 @@ export class FlowService {
454468
models: Database,
455469
flowObjectFilters: FlowObjectFilterGrouped
456470
): Promise<UniqueFlowEntity[]> {
471+
// 1. Retrieve the parked category
457472
const parkedCategory = await models.category.findOne({
458473
where: {
459474
name: 'Parked',
460475
group: 'flowType',
461476
},
462477
});
463-
464478
if (!parkedCategory) {
465479
throw new Error('Parked category not found');
466480
}
467481

482+
// 2. Get all category references for parked flows
468483
const categoryRefs = await models.categoryRef.find({
469484
where: {
470485
categoryID: parkedCategory.id,
@@ -473,91 +488,87 @@ export class FlowService {
473488
distinct: ['objectID', 'versionID'],
474489
});
475490

491+
// Build list of parent IDs from categoryRefs
492+
const parentIDs: FlowId[] = categoryRefs.map((ref) =>
493+
createBrandedValue(ref.objectID)
494+
);
495+
496+
// 3. Retrieve flow links where the parent is among those references and depth > 0
476497
const flowLinks = await models.flowLink.find({
477498
where: {
478-
depth: {
479-
[Op.GT]: 0,
480-
},
481-
parentID: {
482-
[Op.IN]: categoryRefs.map((categoryRef) =>
483-
createBrandedValue(categoryRef.objectID)
484-
),
485-
},
499+
depth: { [Op.GT]: 0 },
500+
parentID: { [Op.IN]: parentIDs },
486501
},
487502
distinct: ['parentID', 'childID'],
488503
});
489504

505+
// Create a reference list of parent flows from the flow links
490506
const parentFlowsRef: UniqueFlowEntity[] = flowLinks.map((flowLink) => ({
491-
id: createBrandedValue(flowLink.parentID),
507+
id: flowLink.parentID,
492508
versionID: null,
493509
}));
494510

495-
// Since this list can be really large in size: ~42k flow links
496-
// This can cause a performance issue when querying the database
497-
// and even end up with a error like:
498-
// could not resize shared memory segment \"/PostgreSQL.2154039724\"
499-
// to 53727360 bytes: No space left on device
500-
501-
// We need to do this query by chunks
511+
// 4. Query parent flows progressively in chunks
502512
const parentFlows = await this.progresiveSearch(
503513
models,
504514
parentFlowsRef,
505-
1000,
515+
PG_MAX_QUERY_PARAMS - 2, // Use a batch size of PG_MAX_QUERY_PARAMS - 2 to avoid hitting the limit
506516
0,
507-
false, // Stop on batch size
517+
false, // Do not stop on batch size
508518
[],
509519
{ activeStatus: true }
510520
);
511521

522+
// 5. Retrieve flow objects using the flow object filters
512523
const flowObjectsWhere =
513524
buildWhereConditionsForFlowObjectFilters(flowObjectFilters);
514-
515525
const flowObjects = await this.flowObjectService.getFlowFromFlowObjects(
516526
models,
517527
flowObjectsWhere
518528
);
519529

520-
// Once we get the flowObjects - we need to keep only those that are present in both lists
521-
const filteredParentFlows = parentFlows.filter((parentFlow) =>
522-
flowObjects.some(
523-
(flowObject) =>
524-
flowObject.id === parentFlow.id &&
525-
flowObject.versionID === parentFlow.versionID
530+
// 6. Build a Set for flowObjects for fast lookup (using a composite key of id and versionID)
531+
const flowObjectsSet = new Set(
532+
flowObjects.map(
533+
(flowObject) => `${flowObject.id}|${flowObject.versionID}`
526534
)
527535
);
528536

529-
// Once we have the ParentFlows whose status are 'parked'
530-
// We keep look for the flowLinks of those flows to obtain the child flows
531-
// that are linked to them
532-
const childFlowsIDs: FlowId[] = [];
537+
// 7. Filter parent flows that are present in the flowObjects list
538+
const filteredParentFlows = parentFlows.filter((parentFlow) => {
539+
const key = `${parentFlow.id}|${parentFlow.versionID}`;
540+
return flowObjectsSet.has(key);
541+
});
542+
543+
// 8. Build a Set of filtered parent flow IDs for quick membership checking
544+
const filteredParentFlowIds = new Set(
545+
filteredParentFlows.map((flow) => flow.id)
546+
);
547+
548+
// 9. Extract child flow IDs from flowLinks where the parent is in the filtered set
549+
const childFlowsIDsSet = new Set<FlowId>();
533550
for (const flowLink of flowLinks) {
534-
if (
535-
filteredParentFlows.some(
536-
(parentFlow) => parentFlow.id === flowLink.parentID
537-
)
538-
) {
539-
childFlowsIDs.push(flowLink.childID);
551+
if (filteredParentFlowIds.has(flowLink.parentID)) {
552+
childFlowsIDsSet.add(flowLink.childID);
540553
}
541554
}
542555

556+
// 10. Retrieve child flows
543557
const childFlows = await models.flow.find({
544558
where: {
545-
deletedAt: null,
546559
activeStatus: true,
547-
id: {
548-
[Op.IN]: childFlowsIDs,
549-
},
560+
id: { [Op.IN]: childFlowsIDsSet },
550561
},
551562
distinct: ['id', 'versionID'],
552563
});
553564

554-
// Once we have the child flows, we need to filter them
555-
// using the flowObjectFilters
556-
// This search needs to be also done by chunks
557-
return childFlows.map((ref) => ({
558-
id: createBrandedValue(ref.id),
565+
// 11. Map child flows to UniqueFlowEntity and return the result
566+
const result = childFlows.map((ref) => ({
567+
id: ref.id,
559568
versionID: ref.versionID,
560569
}));
570+
571+
return result;
561572
}
562573

563574
/**

0 commit comments

Comments
 (0)