Skip to content

Commit de153e5

Browse files
committed
Add a blog post about data processing in Node
1 parent 2ee1e95 commit de153e5

1 file changed

Lines changed: 376 additions & 0 deletions

File tree

Lines changed: 376 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,376 @@
1+
---
2+
layout: post
3+
title: Speed Up Data Processing in Node with p-limit and Worker Threads
4+
author: jarzka
5+
excerpt: >
6+
We'll explore practical strategies to accelerate a real-world Node application that reads several large JSON files from disk, processes their contents, and outputs the results as large number of smaller JSON files.
7+
tags:
8+
- Node
9+
- JavaScript
10+
- TypeScript
11+
- plimit
12+
- worker_threads
13+
---
14+
15+
When I started writing this blog post, I realized that Node.js is now a teenager — at that age I was setting up my first website with PHP and only _thinking_ about becoming a software developer. Back then, Node.js was created to let web servers use JavaScript, the same language powering websites. While Node.js is well-suited for I/O driven web servers, it faces challenges with CPU-intensive computations due to its single-threaded event loop. When heavy computations run on the main thread, they can block other events and degrade overall performance.
16+
17+
In this blog post, we'll explore practical strategies to accelerate a real-world Node application that reads several large JSON files from disk, processes their contents, and outputs the results as a large number of smaller JSON files. We'll introduce two powerful Node.js tools: `p-limit` library for managing concurrent I/O operations, and `worker_threads` for parallelizing CPU-bound tasks. By combining these techniques, we can significantly reduce processing time and make Node applications more scalable.
18+
19+
## Limiting Factors in Node
20+
21+
To make sense of how to optimize performance in Node.js, it is essential to first understand the underlying limitations. At its core, Node.js is built on the V8 JavaScript engine, which provides high-speed execution of JavaScript code, but even highly optimized code cannot match the speed of native code. This means that Node.js is hardly the fastest tool available for raw data processing, so why use it for that in the first place? Well, it is a natural choice for handling JSON-based data due to its native support for JavaScript and rich node_modules ecosystem. It's also a widely used tool and can be easily picked up by virtually any web developer.
22+
23+
Node's architecture is fundamentally event-driven and single-threaded. This works fine for most web interfaces and servers, but can be seen as a limitation for pure data processing. In simple terms, single-threaded means that only a single JavaScript event or function can be on execution at a time. Even if you put your code in a Promise-returning function, its non-blocking I/O parts are still run sequentially and block the event loop from executing other tasks. Luckily, Node.js is still efficient for handling I/O operations such as reading/writing files and handling network traffic. These operations are delegated to run by the underlying system (**libuv thread pool** to be precise) and thus do not overload Node's main event loop.
24+
25+
To optimize our Node process, we are going to maximize the performance of parallel I/O operations and also parallelize CPU-bound operations. How much of an increase can we expect? It's difficult to give a precise prediction, as it largely depends on the use case. If numbers must be given, we were able to decrease the processing time of our JSON files from 40 seconds to just 14 seconds by using these techniques.
26+
27+
## Processing Data Overview
28+
29+
Let's begin with a simple example of processing data and writing the result to the disk:
30+
31+
```js
32+
for (const file of files) {
33+
const result = await processData(file);
34+
await fs.promises.writeFile(result.path, result.data); // I/O operation, handled by libuv
35+
}
36+
```
37+
38+
Each file is processed and written to the disk one-by-one. This works and it's simple, but does not have much parallel processing going on as we are only either processing data **or** waiting for the write operation to complete. Let's improve it by running write operations in the background:
39+
40+
```js
41+
for (const file of files) {
42+
const result = await processData(file);
43+
fs.promises.writeFile(result.path, result.data); // no await here, let it run in background
44+
}
45+
```
46+
47+
Here we do not wait for file writing to complete, but schedule it and immediately continue processing the next file. This looks better from performance perspective, but error checking is still missing and we do not wait for those background operations to complete. Let's fix it:
48+
49+
```js
50+
const writePromises = [];
51+
52+
for (const file of files) {
53+
const result = await processData(file);
54+
const p = fs.promises.writeFile(result.path, result.data);
55+
writePromises.push(p);
56+
}
57+
58+
// Data processing done, wait for all write operations to successfully complete. This throws an exception if even one write failed.
59+
await Promise.all(writePromises);
60+
```
61+
62+
This could work fine, assuming `result` is small and the number of files is limited. However, we have a large number of files to process, so running this would eventually cause many files to be sent to **libuv** to be handled at once. While libuv has some limits of how many write operations it does in parallel by default, the remaining operations are still put into queue — potentially risking high memory peak. We need to act to prevent that.
63+
64+
## p-limit
65+
66+
[p-limit](https://github.com/sindresorhus/p-limit) is a lightweight library for controlling concurrency in asynchronous operations. In other words, it can be used to easily limit the number of I/O-bound promises running simultaneously. The emphasis is on the word **I/O-bound**; p-limit does not help with CPU-bound tasks since it does not magically make JavaScript multi-threaded.
67+
68+
Going back to our previous example, what we actually want to do is to run multiple file write operations in parallel and in the background, but avoid running _too many_ to cause high memory peaks. **p-limit** solves this for us by allowing to specify the maximum number of concurrently run promises.
69+
70+
The right amount concurrent operations depends on the use case and the capability of the underlying hardware. I would suggest beginning with a value of 2-6 and finding the sweet spot manually by testing with real-world data.
71+
72+
Let's take a look at how to use **p-limit**, first doing it **incorrectly**.
73+
74+
```js
75+
import plimit from "p-limit";
76+
const limit = pLimit(4); // Allow up to 4 concurrent operations
77+
const tasks = [];
78+
79+
// Naive example: assuming that files come from somewhere and `processFile` is implemented somewhere
80+
for (const file of files) {
81+
const result = await processData(file);
82+
// BAD! Memory peak can still occur here.
83+
const task = limit(() => fs.promises.writeFile(result.path, result.data));
84+
tasks.push(task);
85+
}
86+
87+
// Wait for all writes to complete
88+
await Promise.all(tasks);
89+
```
90+
91+
In this example, we are limiting the total number of simultatenous file operations, but we are NOT limiting calls to `processData`. This means that eventually all finished `result` objects would be queued to `p-limit`, potentially causing high memory peaks if the `result` object is large.
92+
93+
The can be solved by limiting both data processing and file writing:
94+
95+
```js
96+
import plimit from "p-limit";
97+
const limit = pLimit(4);
98+
const tasks = [];
99+
100+
for (const file of files) {
101+
const task = limit(async () => {
102+
const result = await processData(file);
103+
// Start writing the file, but don't await it here
104+
return fs.promises.writeFile(result.path, result.data);
105+
});
106+
107+
tasks.push(task);
108+
}
109+
110+
// Wait for all processing + writes to complete
111+
await Promise.all(tasks);
112+
```
113+
114+
This is starting to look good. Now we are allowing parallel data processing and file writing while still setting a limit of how many `result` objects can created at once. We could stop here, but I want to introduce one more improvement.
115+
116+
## p-limit with Pending Queue
117+
118+
Let's assume there are so many files to be processed that we need to divide them into multiple different processing functions. This would mean that we needed to use p-limit on multiple different places, possibly losing a track of overall concurrency and memory usage. Each `pLimit` instance only controls the tasks submitted to it, not the total number of tasks running across all instances. This can easily lead to more tasks executing simultaneously than intended.
119+
120+
To manage this safely, we could use a single global concurrency limiter that all processing functions use, ensuring the total number of active heavy tasks never exceeds your safe threshold.
121+
122+
Here is an example of introducing a centralized write queue using a single instance of **pLimit**. When data has been processed, it can be queued for writing via `enqueueWriteToFile` function. This queue is allowed to hold no more than **eight** pending write operations, setting a clear limit for memory usage in the whole application. If the pending queue is full, the caller is forced to wait until there is free capacity available. Finally, up to **four** write operations are allowed to run in parallel.
123+
124+
```ts
125+
import { promises as fs } from "fs";
126+
127+
import pLimit from "p-limit";
128+
129+
const MAX_ITEMS_IN_PENDING_QUEUE = 8; // Allow up to 8 operations to wait in the queue
130+
const writePool = pLimit(4); // Allow up to 4 concurrent operations
131+
const pendingWrites = new Set();
132+
133+
interface DataFile {
134+
path: string,
135+
data: string
136+
}
137+
138+
function waitForFreeCapacityInPendingQueue() {
139+
return new Promise((resolve) => {
140+
const check = () => {
141+
if (writePool.pendingCount < MAX_ITEMS_IN_PENDING_QUEUE) {
142+
resolve(undefined);
143+
} else {
144+
// Wait until pending queue is free
145+
setTimeout(check, 0);
146+
}
147+
};
148+
149+
check();
150+
});
151+
}
152+
153+
async function enqueueWriteToFile(file: DataFile) {
154+
// Main thread waits here if pending queue is full
155+
await waitForFreeCapacityInPendingQueue();
156+
// Schedule the write operation and continue processing
157+
const task = writePool(() => fs.writeFile(file.path, file.data));
158+
pendingWrites.add(task);
159+
task.finally(() => pendingWrites.delete(task));
160+
}
161+
162+
async function waitForFileWritesToComplete() {
163+
await Promise.all([...pendingWrites]);
164+
}
165+
166+
async function main() {
167+
for (file of importantFiles) {
168+
const processed = await processFile(file);
169+
await enqueueWriteToFile(processed)
170+
}
171+
172+
for (file of moreImportantFiles) {
173+
const processed = await processFile(file);
174+
await enqueueWriteToFile(processed)
175+
}
176+
177+
await waitForFileWritesToComplete();
178+
}
179+
```
180+
181+
We have now solved the problem of not waiting for each write operation to complete and also avoiding high memory peaks by limiting the number of pending write operations. However, we are still processing data sequentially, one file at a time. To make this more efficient, we are going to look at `worker_threads` module next.
182+
183+
## Worker Threads
184+
185+
Node.js has `worker_threads` module, which enables true parallelism by running code inside Workers. This practically frees us from the single-threaded limitation of Node. Hooray, all performance problems solved! But there is a catch: running code in a worker is _kind of_ like running another Node inside Node. Each worker has its own execution context, call stack, heap memory and event loop. This makes worker threads completely separated and also requires some effort to setup.
186+
187+
Since running a task in a Worker requires its own "Node", initialising a Worker _practically_ requires initialising it with its own script. Here is a simplified example:
188+
189+
First, we create the worker file and import `processData` from the main application:
190+
191+
```ts
192+
// worker_types.ts
193+
interface DataFile {
194+
path: string,
195+
data: string
196+
}
197+
```
198+
199+
```ts
200+
// worker.ts
201+
import { workerData, parentPort } from "node:worker_threads";
202+
import { processData } from "data/process";
203+
import { DataFile } from "./worker_types";
204+
205+
async function runWorker() {
206+
// Params from main thread
207+
const params = workerData as DataFile;
208+
const result: DataFile = await processData(params);
209+
// Send result back to the main thread
210+
parentPort?.postMessage(result);
211+
}
212+
213+
// Run the worker
214+
runWorker().catch(err => {
215+
parentPort?.postMessage({ error: err.message });
216+
});
217+
```
218+
219+
We can compile this worker using `esbuild`, with a command like this: `npx esbuild src/worker.ts --bundle --outdir=dist --format=esm --platform=node"`.
220+
221+
Finally, we call the worker from the main thread:
222+
223+
```ts
224+
// main.ts
225+
import { fileURLToPath } from "node:url";
226+
import { Worker } from "node:worker_threads";
227+
import * as path from "path";
228+
229+
import { DataFile } from "worker_types";
230+
import { enqueueWriteToFile } from "io/write_queue"; // From previous p-limit example
231+
232+
const filename = fileURLToPath(import.meta.url);
233+
const dirname = path.dirname(filename);
234+
235+
function runWorker(params: DataFile) {
236+
return new Promise((resolve, reject) => {
237+
const worker = new Worker(path.join(dirname, "worker.js"), {
238+
workerData: params,
239+
});
240+
241+
worker.on("message", async (result) => {
242+
console.log("Result from worker:", result);
243+
await enqueueWriteToFile(result);
244+
resolve();
245+
});
246+
247+
worker.on("error", reject);
248+
worker.on("exit", (code) => {
249+
if (code !== 0)
250+
reject(new Error(`Worker stopped with exit code ${code}`));
251+
});
252+
});
253+
}
254+
255+
async function main() {
256+
// Assuming file comes from somewhere
257+
const workerPromise = runWorker(file);
258+
259+
console.log("Worker started, main thread is still responsive.");
260+
261+
await workerPromise;
262+
console.log("Worker finished!");
263+
}
264+
```
265+
266+
This example shows how to offload a single computation from main thread to a dedicated worker while allowing main thread to be responsive and continue handling other tasks without being blocked. While this method works, it requires us to manually initialise a new Worker every time we want to process something in parallel. Also, the worker must be tuned to handle different data processing tasks. This process can be simplified by using a library called [workerpool](https://github.com/josdejong/workerpool).
267+
268+
## Worker Pool
269+
270+
Here is a simplified example of what **workerpool** looks like. First, we create a worker file which exposes the functions it can execute:
271+
272+
```ts
273+
// worker.ts
274+
import { processData } from "data/process"
275+
import { DataFile } from "worker_types";
276+
277+
const workerpool = require('workerpool');
278+
279+
async function processDataFile(file: DataFile) {
280+
return await processData(file);
281+
}
282+
283+
// Expose what functions this worker can execute
284+
workerpool.worker({
285+
processDataFile,
286+
});
287+
```
288+
289+
```ts
290+
// main.ts
291+
const workerpool = require('workerpool');
292+
import { DataFile } from "worker_types";
293+
import { enqueueWriteToFile } from "io/write_queue";
294+
295+
const filename = fileURLToPath(import.meta.url);
296+
const dirname = path.dirname(filename);
297+
298+
// Create a worker pool with as many workers as there are logical CPU cores
299+
const pool = workerpool.pool(dirname + '/myWorker.js', {
300+
maxWorkers: require("os").cpus().length,
301+
});
302+
303+
async function main() {
304+
try {
305+
// Assuming file comes from somewhere
306+
const result = await pool.exec("processDataFile", [file]);
307+
await enqueueWriteToFile(result);
308+
} catch (err) {
309+
console.error(err);
310+
}
311+
312+
// Terminate all workers when everything is done
313+
pool.terminate();
314+
}
315+
```
316+
317+
**workerpool** takes the implementation of creating and calling workers to a higher abstraction level, simplifying code and also making it more efficient since **workerpool** keeps a pool of workers "warm", ready to be used for offloading multiple long-running computations in the background. No need to manually create them every time.
318+
319+
In the above examples, we have offloaded only a single data processing task to the worker pool, but naturally we want to process all data in parallel. However, in heavily parallelized applications, we have to be careful of not offloading and queueing too many operations to workers at once and thus (again) introducing potential memory peaks - just like we need to be careful with queuing too many I/O operations. Thus, I would not call `pool.exec` for every file at once since it _could_ cause worker pool queue to overload of file parameters.
320+
321+
### Worker Pool with Pending Queue
322+
323+
If more tasks are offloaded to workers than there are free workers available, we would need to use some kind of pending queue to force the main thread to wait for a free worker to be available. A simplified example would look something like this:
324+
325+
```ts
326+
const pendingTasks = new Set();
327+
let queuedTasksCount = 0;
328+
329+
function waitForFreeWorker() {
330+
return new Promise((resolve) => {
331+
const check = () => {
332+
if (pendingTasks.size < NUMBER_OF_WORKERS) {
333+
resolve(undefined);
334+
} else {
335+
setTimeout(check, 0);
336+
}
337+
};
338+
check();
339+
});
340+
}
341+
342+
async function enqueueWorkerTask(config: WorkerParams) {
343+
// Wait until there is a free worker available in the pool.
344+
await waitForFreeWorker();
345+
const task = pool.exec(config.task, [config.params]);
346+
pendingTasks.add(task);
347+
348+
task.finally(() => {
349+
console.log("Worker task completed:", config.task);
350+
pendingTasks.delete(task);
351+
});
352+
}
353+
354+
async function waitForWorkerTasksToComplete() {
355+
await Promise.all([...pendingTasks]);
356+
await pool.terminate();
357+
}
358+
```
359+
360+
## Tips for Working with Workers
361+
362+
Whether you decide to use **workerpool** or implement your own solutions, here are a couple of tips I wish I had known when I started working with Workers:
363+
364+
- You are free to import stuff from your main application code to your worker, but everything you import gets compiled in the final `worker.js`. This can result in large bundles and potentially unwanted code if not done carefully. For example, if you introduce a worker pool in your main application code, and you accidentally import some code from there in your worker, **every worker gets its own worker pool**. Probably not what you want. This can be avoided by doing `import { isMainThread } from "worker_threads"` and checking if `isMainThread` is `false` when ever something should never be imported by a worker.
365+
- Worker's JS code must be built separately every time you modify it and always before running tests (I'm not happy remembering spending a day figuring out why some changes in my worker code did not do anything). If you use Vitest, you can use its `setup.ts` file to run code like `execSync("npm run build:worker", { stdio: "inherit" });` to get a fresh worker every time before running tests.
366+
- Worker threads have their own isolated memory and cannot reference objects from the main thread (okay, there is `SharedArrayBuffer`, but it's a bit advanced concept). When you instantiate a worker and pass in some parameters, these parameters are **passed by value** (i.e. structured clone), not by reference. Thus, one needs to be careful about high memory peaks when using workers.
367+
368+
## Conclusion
369+
370+
Here is a quick summary of the tools we covered:
371+
372+
**p-limit** is ideal for managing concurrency in I/O-bound tasks, such as reading or writing files and making HTTP requests. By limiting the number of simultaneous operations, you prevent resource exhaustion and avoid hitting system limits. The code is only slightly more complex than standard async code, making `p-limit` a practical choice for many I/O-heavy workloads.
373+
374+
**worker_threads** is designed for CPU-bound tasks that would otherwise block the event loop, virtually any heavy data processing. Worker threads run in parallel, leveraging multiple CPU cores for true concurrency. However, using worker threads increases code complexity and memory usage, as each worker has its own execution context. Using it also requires some planning of _how_ to actually divide complex computation into workers effectively and may not be sensible if tasks depend on each other. For heavy parallel data processing, I recommend taking a look at [workerpool](https://github.com/josdejong/workerpool) library.
375+
376+
Both `p-limit` and `worker_threads` are powerful tools for improving Node.js performance, but they serve different purposes and come with trade-offs. Using either or both always increases code complexity and memory usage, potentially crashing your application if not managed carefully. Thus, it's important to evaluate whether the performance gains justify these costs for your specific use case.

0 commit comments

Comments
 (0)