Skip to content

Commit c0912f6

Browse files
committed
[POC] Introduce Task Workers
Implementing SAP/ui5-tooling#897 Based on SAP/ui5-builder#955
1 parent 23ded2e commit c0912f6

File tree

6 files changed

+520
-3
lines changed

6 files changed

+520
-3
lines changed

lib/build/TaskRunner.js

+17-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TaskRunner {
2121
* @param {@ui5/project/build/ProjectBuilder~BuildConfiguration} parameters.buildConfig
2222
* Build configuration
2323
*/
24-
constructor({graph, project, log, taskUtil, taskRepository, buildConfig}) {
24+
constructor({graph, project, log, taskUtil, taskRepository, buildConfig, workDispatcher}) {
2525
if (!graph || !project || !log || !taskUtil || !taskRepository || !buildConfig) {
2626
throw new Error("TaskRunner: One or more mandatory parameters not provided");
2727
}
@@ -31,6 +31,7 @@ class TaskRunner {
3131
this._taskRepository = taskRepository;
3232
this._buildConfig = buildConfig;
3333
this._log = log;
34+
this._workDispatcher = workDispatcher;
3435

3536
this._directDependencies = new Set(this._taskUtil.getDependencies());
3637
}
@@ -198,7 +199,21 @@ class TaskRunner {
198199
}
199200

200201
if (!taskFunction) {
201-
taskFunction = (await this._taskRepository.getTask(taskName)).task;
202+
const taskInfo = await this._taskRepository.getTask(taskName);
203+
taskFunction = taskInfo.task;
204+
if (taskInfo.processors) {
205+
const workDispatcher = this._workDispatcher;
206+
params.processors = {
207+
execute: async function(processorName, params) {
208+
const processorInfo = taskInfo.processors[processorName];
209+
if (!processorInfo) {
210+
throw new Error(`Unknown processor ${processorName} requested by task ${taskName}`);
211+
}
212+
const processor = workDispatcher.getProcessor(processorInfo.path);
213+
return await processor.execute(params);
214+
}
215+
}
216+
}
202217
}
203218
return taskFunction(params);
204219
};

lib/build/helpers/BuildContext.js

+10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import ProjectBuildContext from "./ProjectBuildContext.js";
2+
import WorkDispatcher from "./WorkDispatcher.js";
23

34
/**
45
* Context of a build process
@@ -7,6 +8,8 @@ import ProjectBuildContext from "./ProjectBuildContext.js";
78
* @memberof @ui5/project/build/helpers
89
*/
910
class BuildContext {
11+
#workDispatcher = null;
12+
1013
constructor(graph, taskRepository, { // buildConfig
1114
selfContained = false,
1215
cssVariables = false,
@@ -49,6 +52,8 @@ class BuildContext {
4952
cssVariables: cssVariables
5053
};
5154
this._projectBuildContexts = [];
55+
56+
this.#workDispatcher = WorkDispatcher.getInstance(this);
5257
}
5358

5459
getRootProject() {
@@ -71,6 +76,10 @@ class BuildContext {
7176
return this._graph;
7277
}
7378

79+
getWorkDispatcher() {
80+
return this.#workDispatcher;
81+
}
82+
7483
createProjectContext({project}) {
7584
const projectBuildContext = new ProjectBuildContext({
7685
buildContext: this,
@@ -84,6 +93,7 @@ class BuildContext {
8493
await Promise.all(this._projectBuildContexts.map((ctx) => {
8594
return ctx.executeCleanupTasks(force);
8695
}));
96+
await this.#workDispatcher.cleanup(this, force);
8797
}
8898
}
8999

lib/build/helpers/ProjectBuildContext.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ class ProjectBuildContext {
114114
taskUtil: this.getTaskUtil(),
115115
graph: this._buildContext.getGraph(),
116116
taskRepository: this._buildContext.getTaskRepository(),
117-
buildConfig: this._buildContext.getBuildConfig()
117+
buildConfig: this._buildContext.getBuildConfig(),
118+
workDispatcher: this._buildContext.getWorkDispatcher()
118119
});
119120
}
120121
return this._taskRunner;

lib/build/helpers/WorkDispatcher.js

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import workerpool from "workerpool";
2+
import os from "node:os";
3+
import {fileURLToPath} from "node:url";
4+
import {getLogger} from "@ui5/logger";
5+
import {serializeData, deserializeData, serializeResources, FsMainThreadInterface} from "./threadUtils.js";
6+
import {setTimeout as setTimeoutPromise} from "node:timers/promises";
7+
8+
const MIN_WORKERS = 2;
9+
const MAX_WORKERS = 4;
10+
const osCpus = os.cpus().length || 1;
11+
const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS);
12+
13+
export default class WorkDispatcher {
14+
#log = getLogger("build:helpers:WorkDispatcher");
15+
#activeBuilds = new Set();
16+
#pool;
17+
static #ensureSingleton = false;
18+
static #instance;
19+
20+
#getPool() {
21+
if (!this.#pool) {
22+
this.#log.verbose(
23+
`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`
24+
);
25+
const workerPath = fileURLToPath(
26+
new URL("./threadRunner.js", import.meta.url)
27+
);
28+
this.#pool = workerpool.pool(workerPath, {
29+
workerType: "auto",
30+
maxWorkers,
31+
});
32+
}
33+
return this.#pool;
34+
}
35+
36+
constructor() {
37+
if (!WorkDispatcher.#ensureSingleton) {
38+
throw new Error(
39+
"WorkDispatcher is a singleton class. Use WorkDispatcher.getInstance()"
40+
);
41+
}
42+
}
43+
44+
static getInstance(buildRef) {
45+
if (!buildRef) {
46+
throw new Error(`A reference to the calling instance must be provided`);
47+
}
48+
if (!WorkDispatcher.#instance) {
49+
WorkDispatcher.#ensureSingleton = true;
50+
WorkDispatcher.#instance = new WorkDispatcher();
51+
WorkDispatcher.#ensureSingleton = false;
52+
}
53+
54+
WorkDispatcher.#instance.#registerActiveBuild(buildRef);
55+
56+
return WorkDispatcher.#instance;
57+
}
58+
59+
getProcessor(modulePath) {
60+
return {
61+
execute: async ({resources, options, reader}) => {
62+
const buildUpArgs = {modulePath, args: {options: await serializeData(options)}};
63+
let toTransfer;
64+
let threadMessageHandler;
65+
let fsInterfaceMainPort;
66+
67+
if (reader) {
68+
const {port1, port2} = new MessageChannel();
69+
fsInterfaceMainPort = port1;
70+
buildUpArgs.args.fsInterfaceComPort = port2;
71+
toTransfer = {transfer: [port2]};
72+
73+
threadMessageHandler = new FsMainThreadInterface(reader);
74+
threadMessageHandler.startCommunication(fsInterfaceMainPort);
75+
}
76+
77+
if (resources) {
78+
buildUpArgs.args.resources = await serializeResources(resources);
79+
}
80+
81+
const result = await this.#getPool().exec("execProcessor", [buildUpArgs], toTransfer);
82+
83+
if (reader) {
84+
threadMessageHandler.endCommunication(fsInterfaceMainPort);
85+
}
86+
87+
return deserializeData(result);
88+
}
89+
};
90+
}
91+
92+
async cleanup(buildRef, force) {
93+
const attemptPoolTermination = async () => {
94+
if (this.#activeBuilds.size && !force) {
95+
this.#log.verbose(
96+
`Pool termination canceled. There are still ${this.#activeBuilds.size} active builds`
97+
);
98+
return;
99+
}
100+
101+
this.#log.verbose(`Attempting to terminate the workerpool...`);
102+
103+
if (!this.#pool) {
104+
this.#log.verbose(
105+
"Pool termination requested, but a pool has not been initialized or has already been terminated."
106+
);
107+
return;
108+
}
109+
110+
// There are many stats that could be used, but these ones seem the most
111+
// convenient. When all the (available) workers are idle, then it's safe to terminate.
112+
// There are many stats that could be used, but these ones seem the most
113+
// convenient. When all the (available) workers are idle, then it's safe to terminate.
114+
let {idleWorkers, totalWorkers} = this.#pool.stats();
115+
while (idleWorkers !== totalWorkers && !force) {
116+
await setTimeoutPromise(100); // Wait a bit workers to finish and try again
117+
({idleWorkers, totalWorkers} = this.#pool.stats());
118+
}
119+
120+
return await this.terminateTasks(force);
121+
};
122+
123+
if (!buildRef) {
124+
throw new Error(`A reference to the calling instance must be provided`);
125+
}
126+
if (!this.#activeBuilds.has(buildRef)) {
127+
throw new Error(`The provided build reference is unknown`);
128+
}
129+
this.#activeBuilds.delete(buildRef);
130+
131+
return await attemptPoolTermination();
132+
}
133+
134+
async terminateTasks(force) {
135+
if (!this.#pool) {
136+
this.#log.verbose(
137+
"Pool termination requested, but a pool has not been initialized or has already been terminated");
138+
return;
139+
}
140+
141+
this.#activeBuilds = [];
142+
const pool = this.#pool;
143+
this.#pool = null;
144+
return await pool.terminate(force);
145+
}
146+
147+
#registerActiveBuild(instanceRef) {
148+
if (this.#activeBuilds.has(instanceRef)) {
149+
throw new Error(`Build already registered in Work Dispatcher. This should never happen`);
150+
}
151+
this.#activeBuilds.add(instanceRef);
152+
}
153+
}

lib/build/helpers/threadRunner.js

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import workerpool from "workerpool";
2+
import {FsWorkerThreadInterface, deserializeResources, serializeData, deserializeData} from "./threadUtils.js";
3+
import {getLogger} from "@ui5/logger";
4+
import {createResource} from "@ui5/fs/resourceFactory";
5+
6+
export default async function execProcessor({modulePath, args}) {
7+
const {default: moduleToExecute} = await import(modulePath);
8+
if (!moduleToExecute) {
9+
throw new Error(`No default export for module ${modulePath}`);
10+
}
11+
const methodCall = moduleToExecute;
12+
const {options, resources, fsInterfaceComPort} = args;
13+
14+
const buildUpArgs = {
15+
options: await deserializeData(options),
16+
resourceFactory: {createResource},
17+
log: getLogger(`builder:processor:${modulePath}`)
18+
};
19+
20+
if (resources) {
21+
buildUpArgs.resources = await deserializeResources(resources);
22+
}
23+
if (fsInterfaceComPort) {
24+
buildUpArgs.fs = new FsWorkerThreadInterface(fsInterfaceComPort);
25+
}
26+
27+
const result = await methodCall(buildUpArgs);
28+
29+
return serializeData(result);
30+
}
31+
32+
// Test execution via ava is never done on the main thread
33+
/* istanbul ignore else */
34+
if (!workerpool.isMainThread) {
35+
// Script got loaded through workerpool
36+
// => Create a worker and register public functions
37+
workerpool.worker({
38+
execProcessor,
39+
});
40+
}

0 commit comments

Comments
 (0)