-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_pool.js
More file actions
287 lines (243 loc) · 7.38 KB
/
Copy pathprocess_pool.js
File metadata and controls
287 lines (243 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/**
* Process Pool Manager
* Maintains a pool of reusable Python processes to avoid memory leaks
*
* Problem: Spawning a new Python process per request leads to OOM at 50+ concurrent users
* Solution: Keep N persistent processes, queue requests to them
* Benefit: 100x memory efficiency, stable under load
*/
const { spawn } = require('child_process');
const path = require('path');
/**
 * Fixed-size pool of long-lived Python worker processes.
 *
 * Each worker is started as `python3 <scriptPath> --process-pool <index>` and
 * receives one request at a time: the request's `args`, JSON-encoded and
 * terminated by a newline, written to the worker's stdin. Requests that arrive
 * while every worker is busy wait in a FIFO queue.
 *
 * NOTE(review): a response is considered complete once the text accumulated
 * from the worker's stdout ends with a newline (mirroring the request framing).
 * The worker script is not visible from this file - confirm that it answers
 * every request with newline-terminated output and flushes stdout.
 */
class ProcessPool {
  /**
   * @param {number} [poolSize=4] Number of worker processes to keep.
   * @param {string} [scriptPath='M3U_Matrix_Pro.py'] Script handed to python3.
   */
  constructor(poolSize = 4, scriptPath = 'M3U_Matrix_Pro.py') {
    this.poolSize = poolSize;
    this.scriptPath = scriptPath;
    this.processes = [];
    this.requestQueue = [];
    // Not read anywhere in this class; kept so the instance shape is unchanged.
    this.processingRequest = new Array(poolSize).fill(false);
    this.initialized = false;
    // Promise of the initialization currently in flight, shared by concurrent callers.
    this._initPromise = null;
  }

  /**
   * Initialize the process pool by spawning `poolSize` workers.
   *
   * Safe to call repeatedly and concurrently: callers that arrive while an
   * initialization is running share it instead of spawning a second set of
   * workers.
   *
   * @returns {Promise<void>}
   */
  async init() {
    if (this.initialized) return;
    if (this._initPromise === null) {
      this._initPromise = this._startWorkers();
    }
    try {
      await this._initPromise;
    } finally {
      // Success makes later calls return early; failure allows a clean retry.
      this._initPromise = null;
    }
  }

  /**
   * Spawn every worker, then wait for all of them to report ready (or for
   * their readiness timeouts) in parallel.
   *
   * @returns {Promise<void>}
   */
  async _startWorkers() {
    console.log(`[ProcessPool] Initializing ${this.poolSize} Python processes...`);
    try {
      const readiness = [];
      for (let i = 0; i < this.poolSize; i++) {
        const proc = this._createProcess(i);
        const worker = {
          id: i,
          process: proc,
          busy: false,
          requestCount: 0,
          alive: true,
          abortCurrent: null
        };
        // A worker that has exited must never be handed another request.
        const onGone = () => this._handleWorkerGone(worker);
        proc.once('exit', onGone);
        proc.once('close', onGone);
        this.processes.push(worker);
        readiness.push(this._waitForProcessReady(proc, i));
      }
      await Promise.all(readiness);
    } catch (error) {
      // Do not leave a half-built pool behind for the next init() attempt.
      for (const worker of this.processes) {
        worker.alive = false;
        worker.process.kill();
      }
      this.processes = [];
      throw error;
    }
    this.initialized = true;
    console.log(`[ProcessPool] ✓ Pool initialized with ${this.poolSize} processes`);
  }

  /**
   * Create a single Python process.
   * The process is started with `--process-pool <index>` and piped stdio.
   *
   * @param {number} index Position of the worker in the pool (used in logs).
   * @returns {import('child_process').ChildProcess}
   */
  _createProcess(index) {
    const proc = spawn('python3', [
      this.scriptPath,
      '--process-pool',
      String(index)
    ], {
      stdio: ['pipe', 'pipe', 'pipe'],
      detached: false
    });
    proc.on('error', (error) => {
      console.error(`[ProcessPool] Process ${index} error:`, error.message);
    });
    proc.on('exit', (code) => {
      console.warn(`[ProcessPool] Process ${index} exited with code ${code}`);
    });
    // Writing to a worker that already exited emits 'error' on stdin; without
    // a listener that event would crash the host process.
    proc.stdin.on('error', (error) => {
      console.warn(`[ProcessPool] Process ${index} stdin error:`, error.message);
    });
    // Monitor stderr for errors
    proc.stderr.on('data', (data) => {
      const message = data.toString().trim();
      if (message && !message.includes('debug')) {
        console.warn(`[ProcessPool] Process ${index} stderr:`, message);
      }
    });
    return proc;
  }

  /**
   * Wait for a process to signal readiness: its first stdout chunk counts as
   * the ready signal. After 2 seconds without output the process is assumed
   * to be ready anyway.
   *
   * @param {import('child_process').ChildProcess} proc
   * @param {number} index Worker index (used in logs).
   * @returns {Promise<void>} Always resolves, never rejects.
   */
  _waitForProcessReady(proc, index) {
    return new Promise((resolve) => {
      const onReady = () => {
        clearTimeout(timer);
        console.log(`[ProcessPool] Process ${index} ready`);
        resolve();
      };
      const timer = setTimeout(() => {
        // Drop the stale listener so later output is not reported as "ready".
        proc.stdout.removeListener('data', onReady);
        console.warn(`[ProcessPool] Process ${index} initialization timeout`);
        resolve(); // Assume ready after timeout
      }, 2000);
      proc.stdout.once('data', onReady);
    });
  }

  /**
   * Execute a command using the process pool.
   *
   * @param {*} args JSON-serializable value sent to the worker.
   * @returns {Promise<string>} Trimmed stdout text of the worker's response.
   *   Rejects when the worker wrote to stderr, produced no output, timed out
   *   (30 s), when no live worker is left, or when the pool is shut down.
   */
  async execute(args) {
    if (!this.initialized) {
      await this.init();
    }
    return new Promise((resolve, reject) => {
      const request = { args, resolve, reject, timestamp: Date.now() };
      if (!this.processes.some((worker) => worker.alive !== false)) {
        // Queueing would leave the caller waiting forever.
        reject(new Error('No live processes available in pool'));
        return;
      }
      const processIndex = this._findAvailableProcess();
      if (processIndex !== -1) {
        this._executeOnProcess(processIndex, request);
      } else {
        // Picked up by the next worker that finishes its current request.
        this.requestQueue.push(request);
      }
    });
  }

  /**
   * Find an available process.
   *
   * @returns {number} Index of the first live, idle worker, or -1 if none.
   */
  _findAvailableProcess() {
    return this.processes.findIndex((worker) => worker.alive !== false && !worker.busy);
  }

  /**
   * Run one request on a specific worker and settle its promise.
   *
   * The request finishes on whichever happens first: a newline-terminated
   * response on stdout, stdout closing, a stdin write error, the 30 second
   * timeout, or an abort from shutdown(). Every path goes through `finish`,
   * which detaches all listeners, clears the timer, frees the worker and
   * dispatches the next queued request exactly once.
   *
   * @param {number} processIndex Index into `this.processes`.
   * @param {{args: *, resolve: Function, reject: Function}} request
   */
  _executeOnProcess(processIndex, request) {
    const worker = this.processes[processIndex];

    let payload;
    try {
      payload = `${JSON.stringify(request.args)}\n`;
    } catch (error) {
      // Unserializable args (e.g. cycles) fail this request only.
      request.reject(error);
      this._dispatchNext(processIndex);
      return;
    }

    const { stdout, stderr, stdin } = worker.process;
    worker.busy = true;
    worker.requestCount++;

    let output = '';
    let errorOutput = '';
    let settled = false;
    let timer = null;

    // Any stderr text fails the request; otherwise stdout is the result.
    const settleFromOutput = () => {
      if (errorOutput.trim()) {
        request.reject(new Error(errorOutput));
      } else if (output.trim()) {
        request.resolve(output.trim());
      } else {
        request.reject(new Error('No output from process'));
      }
    };

    const finish = (settle) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      stdout.removeListener('data', onData);
      stdout.removeListener('close', onClose);
      stderr.removeListener('data', onError);
      worker.busy = false;
      worker.abortCurrent = null;
      settle();
      // Log pool status every 100 requests
      if (worker.requestCount % 100 === 0) {
        console.log(`[ProcessPool] Process ${processIndex}: ${worker.requestCount} requests handled`);
      }
      this._dispatchNext(processIndex);
    };

    const onData = (chunk) => {
      output += chunk.toString();
      // A trailing newline marks the end of the response (see class note).
      if (output.endsWith('\n') && output.trim() !== '') {
        finish(settleFromOutput);
      }
    };
    const onError = (chunk) => {
      errorOutput += chunk.toString();
    };
    const onClose = () => {
      // stdout is gone, so this worker can no longer answer anything.
      this._handleWorkerGone(worker);
      finish(settleFromOutput);
    };

    stdout.on('data', onData);
    stderr.on('data', onError);
    stdout.once('close', onClose);
    worker.abortCurrent = (reason) => finish(() => request.reject(reason));

    // Timeout after 30 seconds.
    // NOTE(review): the worker is freed but not restarted, so output it emits
    // later may be attributed to the next request - consider recycling it.
    timer = setTimeout(() => {
      finish(() => request.reject(new Error('Process execution timeout')));
    }, 30000);

    // Send request to process (assuming Python script reads from stdin)
    stdin.write(payload, (error) => {
      if (error) finish(() => request.reject(error));
    });
  }

  /**
   * Hand the oldest queued request to the given worker if it is still part
   * of the pool, alive and idle.
   *
   * @param {number} processIndex Index into `this.processes`.
   */
  _dispatchNext(processIndex) {
    const worker = this.processes[processIndex];
    if (!worker || worker.busy || worker.alive === false) return;
    const nextRequest = this.requestQueue.shift();
    if (nextRequest) {
      this._executeOnProcess(processIndex, nextRequest);
    }
  }

  /**
   * Mark a worker as dead. When it was the last live worker, fail everything
   * still queued, because nothing is left to pick those requests up.
   *
   * @param {{alive: boolean}} worker Pool entry of the worker that ended.
   */
  _handleWorkerGone(worker) {
    if (worker.alive === false) return;
    worker.alive = false;
    if (this.processes.some((other) => other.alive !== false)) return;
    this._rejectQueued(new Error('No live processes available in pool'));
  }

  /**
   * Reject and remove every queued request.
   *
   * @param {Error} error Rejection reason handed to each waiting caller.
   */
  _rejectQueued(error) {
    const waiting = this.requestQueue.splice(0);
    for (const request of waiting) {
      request.reject(error);
    }
  }

  /**
   * Get pool statistics.
   *
   * @returns {{poolSize: number, busyProcesses: number, totalRequests: number,
   *   queuedRequests: number, avgRequests: number}}
   */
  getStats() {
    const totalRequests = this.processes.reduce((sum, worker) => sum + worker.requestCount, 0);
    return {
      poolSize: this.poolSize,
      busyProcesses: this.processes.filter((worker) => worker.busy).length,
      totalRequests,
      queuedRequests: this.requestQueue.length,
      // Guard the division so an empty pool reports 0 instead of NaN.
      avgRequests: this.poolSize > 0 ? Math.round(totalRequests / this.poolSize) : 0
    };
  }

  /**
   * Shut the pool down: reject queued and in-flight requests, kill every
   * worker and reset the pool so that init() can be called again.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    console.log('[ProcessPool] Shutting down...');
    const reason = new Error('Process pool shut down');
    const workers = this.processes;
    this.processes = [];
    this.initialized = false;
    // Empty the queue first so aborting in-flight work cannot re-dispatch it.
    this._rejectQueued(reason);
    for (const worker of workers) {
      worker.alive = false;
      if (worker.abortCurrent) {
        worker.abortCurrent(reason);
      }
      worker.process.kill();
    }
    console.log('[ProcessPool] ✓ Shutdown complete');
  }
}
/**
 * One-shot fallback that bypasses the pool (meant for testing).
 * Spawns a fresh `python3` process for the given arguments, which is slower
 * than pooling but has no shared state.
 *
 * @param {string[]} args Command-line arguments passed to python3.
 * @returns {Promise<string>} Trimmed stdout when the process exits with code 0.
 *   Rejects with the collected stderr text (or an exit-code message) on any
 *   other exit code, with a timeout error after 30 seconds, or with the spawn
 *   error itself.
 */
async function executeProcessSimple(args) {
  const child = spawn('python3', args);
  const stdoutParts = [];
  const stderrParts = [];

  child.stdout.on('data', (chunk) => {
    stdoutParts.push(chunk.toString());
  });
  child.stderr.on('data', (chunk) => {
    stderrParts.push(chunk.toString());
  });

  return new Promise((resolve, reject) => {
    // Give up on (and kill) a process that runs longer than 30 seconds.
    const killTimer = setTimeout(() => {
      child.kill();
      reject(new Error('Process execution timeout'));
    }, 30000);

    child.on('close', (code) => {
      clearTimeout(killTimer);
      if (code !== 0) {
        const stderrText = stderrParts.join('');
        reject(new Error(stderrText || `Process exited with code ${code}`));
        return;
      }
      resolve(stdoutParts.join('').trim());
    });
    child.on('error', reject);
  });
}
// Export both pool and simple executor
module.exports = {
ProcessPool,
executeProcessSimple
};