Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions src/domain-services/flow-link/flow-link-service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow';
import { type Database } from '@unocha/hpc-api-core/src/db/type';
import { Op } from '@unocha/hpc-api-core/src/db/util/conditions';
import { Cond, Op } from '@unocha/hpc-api-core/src/db/util/conditions';
import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types';
import { getOrCreate } from '@unocha/hpc-api-core/src/util';
import { Service } from 'typedi';
Expand All @@ -14,26 +14,40 @@ export class FlowLinkService {
// Fetch all flow links in one go
const flowLinks = await models.flowLink.find({
where: {
parentID: {
[Op.IN]: flowIds,
},
[Cond.AND]: [
{
[Cond.OR]: [
{
parentID: {
[Op.IN]: flowIds,
},
},
{
childID: {
[Op.IN]: flowIds,
},
},
],
},
{
depth: { [Op.GTE]: 1 },
},
],
},
});

// Initialize the map with empty arrays for each flowId
const flowLinksMap = new Map<
number,
FlowId,
Array<InstanceOfModel<Database['flowLink']>>
>();

// Group flow links by parentID in one pass
// Group flow links by `parentID` and `childID` in one pass
for (const link of flowLinks) {
const flowLinksForFlow = getOrCreate(
flowLinksMap,
link.parentID,
() => []
);
flowLinksForFlow.push(link);
const childFlowLink = getOrCreate(flowLinksMap, link.childID, () => []);
const parentFlowLink = getOrCreate(flowLinksMap, link.parentID, () => []);
childFlowLink.push(link);
parentFlowLink.push(link);
}

return flowLinksMap;
Expand Down
31 changes: 14 additions & 17 deletions src/domain-services/flows/flow-search-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ export class FlowSearchService {
]);

const promises = flows.map(async (flow) => {
const flowLink = getOrCreate(flowLinksMap, flow.id, () => []);

// Categories Map follows the structure:
// flowID: { versionID: [categories]}
// So we need to get the categories for the flow version
Expand All @@ -247,33 +245,32 @@ export class FlowSearchService {
}

let parkedParentSource: FlowParkedParentSource | null = null;
const shouldLookAfterParentSource =
flowLink.length > 0 && shouldIncludeChildrenOfParkedFlows;

if (shouldLookAfterParentSource) {
parkedParentSource = await this.flowService.getParketParents(
flow,
flowLink,
models
);
}

const childIDs: number[] =
const childIDs: FlowId[] =
flowLinksMap
.get(flow.id)
?.filter(
(flowLink) => flowLink.parentID === flow.id && flowLink.depth > 0
)
.map((flowLink) => flowLink.childID.valueOf()) ?? [];
.map((flowLink) => flowLink.childID) ?? [];

const parentIDs: number[] =
const parentLinks =
flowLinksMap
.get(flow.id)
?.filter(
(flowLink) => flowLink.childID === flow.id && flowLink.depth > 0
)
.map((flowLink) => flowLink.parentID.valueOf()) ?? [];
) ?? [];
const parentIDs: FlowId[] = parentLinks.map(
(flowLink) => flowLink.parentID
);

if (shouldIncludeChildrenOfParkedFlows) {
parkedParentSource = await this.flowService.getParkedParents(
flow,
parentLinks,
models
);
}
const parsedFlow: Flow = this.buildFlowDTO(
flow,
categoriesByVersion,
Expand Down
139 changes: 75 additions & 64 deletions src/domain-services/flows/flow-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { type Database } from '@unocha/hpc-api-core/src/db';
import { type FlowId } from '@unocha/hpc-api-core/src/db/models/flow';
import { Op } from '@unocha/hpc-api-core/src/db/util/conditions';
import { type InstanceOfModel } from '@unocha/hpc-api-core/src/db/util/types';
import { splitIntoChunks } from '@unocha/hpc-api-core/src/util';
import { PG_MAX_QUERY_PARAMS } from '@unocha/hpc-api-core/src/util/consts';
import {
createBrandedValue,
getTableColumns,
Expand All @@ -24,7 +26,6 @@ import type {
UniqueFlowEntity,
} from './model';
import { buildSearchFlowsConditions } from './strategy/impl/utils';

@Service()
export class FlowService {
constructor(private readonly flowObjectService: FlowObjectService) {}
Expand Down Expand Up @@ -94,7 +95,6 @@ export class FlowService {

const refDirection = orderBy.direction ?? 'source';

let flowObjects = [];
let entityIDsSorted: number[] = [];

switch (entity) {
Expand Down Expand Up @@ -322,29 +322,44 @@ export class FlowService {
const entityCondKey = orderBy.entity as unknown;
const entityCondKeyFlowObjectType = entityCondKey as FlowObjectType;

flowObjects = await database.flowObject.find({
where: {
objectType: entityCondKeyFlowObjectType,
refDirection,
objectID: {
[Op.IN]: entityIDsSorted,
},
},
distinct: ['flowID', 'versionID'],
});
// Order map
const orderMap = new Map<number, number>();
for (const [index, entityID] of entityIDsSorted.entries()) {
orderMap.set(entityID, index);
}

// Instead of doing a single query that may end up on a 'Memory Error'
// we will do a progressive search
// by chunks of PG_MAX_QUERY_PARAMS - 2 => ( (2 ** 16 - 1) - 2 = 65533 )
const flowObjects = (
await Promise.all(
splitIntoChunks(entityIDsSorted, PG_MAX_QUERY_PARAMS - 2).map(
(entityIds) =>
database.flowObject.find({
where: {
objectType: entityCondKeyFlowObjectType,
refDirection,
objectID: {
[Op.IN]: entityIds,
},
},
distinct: ['flowID', 'versionID'],
})
)
)
).flat();

// Then, we need to filter the results from the flowObject table
// using the planVersions list as sorted reference
// this is because we cannot apply the order of a given list
// to the query directly
flowObjects = flowObjects
const sortedFlowObjects = flowObjects
.map((flowObject) => ({
...flowObject,
sortingKey: entityIDsSorted.indexOf(flowObject.objectID.valueOf()),
sortingKey: orderMap.get(flowObject.objectID),
}))
.sort((a, b) => a.sortingKey - b.sortingKey);

return this.mapFlowsToUniqueFlowEntities(flowObjects);
.toSorted((a, b) => (a.sortingKey ?? 0) - (b.sortingKey ?? 0));
return this.mapFlowsToUniqueFlowEntities(sortedFlowObjects);
}

private mapFlowsToUniqueFlowEntities(
Expand All @@ -359,7 +374,7 @@ export class FlowService {
);
}

async getParketParents(
async getParkedParents(
flow: FlowInstance,
flowLinkArray: Array<InstanceOfModel<Database['flowLink']>>,
models: Database
Expand All @@ -374,7 +389,6 @@ export class FlowService {
if (flowLinksParentsIDs.length === 0) {
return null;
}

const parkedCategory = await models.category.findOne({
where: {
group: 'flowType',
Expand Down Expand Up @@ -454,17 +468,18 @@ export class FlowService {
models: Database,
flowObjectFilters: FlowObjectFilterGrouped
): Promise<UniqueFlowEntity[]> {
// 1. Retrieve the parked category
const parkedCategory = await models.category.findOne({
where: {
name: 'Parked',
group: 'flowType',
},
});

if (!parkedCategory) {
throw new Error('Parked category not found');
}

// 2. Get all category references for parked flows
const categoryRefs = await models.categoryRef.find({
where: {
categoryID: parkedCategory.id,
Expand All @@ -473,91 +488,87 @@ export class FlowService {
distinct: ['objectID', 'versionID'],
});

// Build list of parent IDs from categoryRefs
const parentIDs: FlowId[] = categoryRefs.map((ref) =>
createBrandedValue(ref.objectID)
);

// 3. Retrieve flow links where the parent is among those references and depth > 0
const flowLinks = await models.flowLink.find({
where: {
depth: {
[Op.GT]: 0,
},
parentID: {
[Op.IN]: categoryRefs.map((categoryRef) =>
createBrandedValue(categoryRef.objectID)
),
},
depth: { [Op.GT]: 0 },
parentID: { [Op.IN]: parentIDs },
},
distinct: ['parentID', 'childID'],
});

// Create a reference list of parent flows from the flow links
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that you create this lookup array and pass it to progresiveSearch() shows that it is an impure method, which I don't like.

const parentFlowsRef: UniqueFlowEntity[] = flowLinks.map((flowLink) => ({
id: createBrandedValue(flowLink.parentID),
id: flowLink.parentID,
versionID: null,
}));

// Since this list can be really large in size: ~42k flow links
// This can cause a performance issue when querying the database
// and even end up with an error like:
// could not resize shared memory segment \"/PostgreSQL.2154039724\"
// to 53727360 bytes: No space left on device

// We need to do this query by chunks
// 4. Query parent flows progressively in chunks
const parentFlows = await this.progresiveSearch(
models,
parentFlowsRef,
1000,
PG_MAX_QUERY_PARAMS - 2, // Use a batch size of PG_MAX_QUERY_PARAMS - 2 to avoid hitting the limit
0,
false, // Stop on batch size
false, // Do not stop on batch size
[],
{ activeStatus: true }
);

// 5. Retrieve flow objects using the flow object filters
const flowObjectsWhere =
buildWhereConditionsForFlowObjectFilters(flowObjectFilters);

const flowObjects = await this.flowObjectService.getFlowFromFlowObjects(
models,
flowObjectsWhere
);

// Once we get the flowObjects - we need to keep only those that are present in both lists
const filteredParentFlows = parentFlows.filter((parentFlow) =>
flowObjects.some(
(flowObject) =>
flowObject.id === parentFlow.id &&
flowObject.versionID === parentFlow.versionID
// 6. Build a Set for flowObjects for fast lookup (using a composite key of id and versionID)
const flowObjectsSet = new Set(
flowObjects.map(
(flowObject) => `${flowObject.id}|${flowObject.versionID}`
)
);

// Once we have the ParentFlows whose status are 'parked'
// We keep look for the flowLinks of those flows to obtain the child flows
// that are linked to them
const childFlowsIDs: FlowId[] = [];
// 7. Filter parent flows that are present in the flowObjects list
const filteredParentFlows = parentFlows.filter((parentFlow) => {
const key = `${parentFlow.id}|${parentFlow.versionID}`;
return flowObjectsSet.has(key);
});

// 8. Build a Set of filtered parent flow IDs for quick membership checking
const filteredParentFlowIds = new Set(
filteredParentFlows.map((flow) => flow.id)
);

// 9. Extract child flow IDs from flowLinks where the parent is in the filtered set
const childFlowsIDsSet = new Set<FlowId>();
for (const flowLink of flowLinks) {
if (
filteredParentFlows.some(
(parentFlow) => parentFlow.id === flowLink.parentID
)
) {
childFlowsIDs.push(flowLink.childID);
if (filteredParentFlowIds.has(flowLink.parentID)) {
childFlowsIDsSet.add(flowLink.childID);
}
}

// 10. Retrieve child flows
const childFlows = await models.flow.find({
where: {
deletedAt: null,
activeStatus: true,
id: {
[Op.IN]: childFlowsIDs,
},
id: { [Op.IN]: childFlowsIDsSet },
},
distinct: ['id', 'versionID'],
});

// Once we have the child flows, we need to filter them
// using the flowObjectFilters
// This search needs to be also done by chunks
return childFlows.map((ref) => ({
id: createBrandedValue(ref.id),
// 11. Map child flows to UniqueFlowEntity and return the result
const result = childFlows.map((ref) => ({
id: ref.id,
versionID: ref.versionID,
}));

return result;
}

/**
Expand Down