Worker Threads: Node Parallelism
Concurrency vs Parallelism
Node.js has long excelled at concurrency. With the recent release of Node 13.0, Node now has a stable answer to parallelism as well.
Concurrency can be thought of as switching between asynchronous tasks that take turns executing and, while idle, return control to the event loop. Parallelism, on the other hand, is the ability of a process to split its work and run it simultaneously on multiple threads. There are other solutions in JavaScript that have tried to address this problem. For an in-depth comparison, I found this article useful.
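To make the distinction concrete, here is a minimal sketch of parallelism, assuming a Node version where worker_threads is available without a flag. The helper name fibInWorker and the inline worker source (passed with the eval: true option so the example fits in one file) are illustrative choices, not part of the scripts discussed in this article.

```javascript
const { Worker } = require('worker_threads');

// A deliberately slow, CPU-bound function.
function fib(n) {
  return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

// Hypothetical helper: compute fib(n) on its own thread.
// The worker source is built as a string and run with eval: true.
function fibInWorker(n) {
  return new Promise((resolve, reject) => {
    const source = `
      const { parentPort, workerData } = require('worker_threads');
      ${fib.toString()}
      parentPort.postMessage(fib(workerData));
    `;
    const worker = new Worker(source, { eval: true, workerData: n });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

// Each call gets its own thread, so the two computations can run
// at the same time while the main event loop stays free.
const results = Promise.all([fibInWorker(25), fibInWorker(26)]);
results.then((values) => console.log(values)); // prints [ 75025, 121393 ]
```

Calling fib(25) and fib(26) directly on the main thread would give the same numbers, but the event loop would be blocked until both had finished.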
The Master Script
The master script must do three things:
- Create the workers by referencing the js file for the worker.
const worker = new WorkerThread.Worker(join(__dirname, './worker.js'))
- Send messages to the workers initiating work. A message is just a JavaScript object, so if you need to customize the behavior of the worker, you can include any categorization, data, etc.
worker.postMessage({foo:"stuff"});
- Register responses to the actions. Any response from the worker will be a message event, returning an object. You could return a string or a more complex object with the results of whatever operations the worker has performed.
worker.on('message', (message) => {
  // do stuff with the response
});
Now, try combining those together into a script:
const { join } = require('path');
const WorkerThread = require('worker_threads');

/**
 * Modify THREAD_COUNT before running.
 */
const THREAD_COUNT = 30;

(async () => {
  // Set up whatever data is needed to run your jobs.
  // getAllUsers() stands in for your own data-loading function; it is not defined here.
  const users = await getAllUsers();
  // Index of the next user to hand to a worker
  let i = 0;

  // Define functions to handle the different messages passed by the workers
  function handleStatusReport(workerNumber, report) {
    console.log(`the worker:${workerNumber} says`, report.body || report);
  }

  function handleWorkerFinished(worker, workerNumber, message) {
    console.log(`done with ${JSON.stringify(message.body)}!`);
    if (i < users.length) {
      worker.postMessage(users[i]);
      i += 1;
    } else {
      console.log(`Worker number ${workerNumber} completed working!`);
      // No work left: stop the worker so the process can exit
      worker.terminate();
    }
  }

  // Spin up our initial batch of workers... we will reuse these workers by sending more work as they finish
  for (let j = 0; j < Math.min(THREAD_COUNT, users.length); j += 1) {
    const worker = new WorkerThread.Worker(join(__dirname, './worker.js'));
    console.log(`Running ${i + 1} of ${users.length}`);
    worker.postMessage(users[i]);
    i += 1;
    // Listen on messages from the worker
    worker.on('message', (messageBody) => {
      // Switch on values in message body, to support different types of message
      if (messageBody.type === 'done') {
        handleWorkerFinished(worker, j, messageBody);
      } else {
        handleStatusReport(j, messageBody);
      }
    });
  }
})();
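The script above only listens for 'message' events, so a worker that throws would go unnoticed. Here is a minimal sketch of how the master could also watch for failures, using the 'error' and 'exit' events that Worker instances emit. The helper name watchWorker, the onFailure callback, and the inline crashing worker are hypothetical and exist only for this demonstration.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: attach failure handlers to a worker.
// onFailure is an illustrative callback, not part of the worker_threads API.
function watchWorker(worker, workerNumber, onFailure) {
  // 'error' fires when the worker throws an uncaught exception.
  worker.on('error', (err) => onFailure(workerNumber, err));
  // 'exit' fires when the thread stops; a non-zero code means it did not end cleanly.
  worker.on('exit', (code) => {
    if (code !== 0) {
      console.log(`worker ${workerNumber} exited with code ${code}`);
    }
  });
}

// Demo: an inline worker (eval: true) that throws as soon as it receives a job.
const crashing = new Worker(
  `require('worker_threads').parentPort.on('message', () => { throw new Error('boom'); });`,
  { eval: true }
);

const failure = new Promise((resolve) => {
  watchWorker(crashing, 0, (workerNumber, err) => {
    resolve({ workerNumber, message: err.message });
  });
});

crashing.postMessage({ foo: 'stuff' });
failure.then((f) => console.log(`worker ${f.workerNumber} failed: ${f.message}`));
```

In the master script, a call like watchWorker(worker, j, ...) could sit next to the existing worker.on('message', ...) registration, with the callback deciding whether to requeue the job or give up.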
The Worker Script
The worker can be as simple as:
const { parentPort } = require('worker_threads');
parentPort.on('message', async (someObject) => {
  // Some function that uses the object and can report back to the master with parentPort.postMessage()
});
The worker is not required to report back to the master; however, doing so is good practice. It lets the master know whether the worker finished or failed and, optionally, returns the results of the calculations.
parentPort.postMessage({ type: 'done', body: {key: 'value'} });
parentPort.postMessage({ type: 'log', body: {message: 'something happened'} });
The examples above have a property named type with the values done and log. Both the name and the values are arbitrary; just remember that you can exchange data between the workers and the master in this fashion. Additionally, I should note that sending a message from the worker to the master will not terminate the worker, so one worker can send many messages during its execution.
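As a small illustration of one worker sending several messages before it finishes, here is a self-contained sketch. The inline worker source (run with eval: true), the steps field on the job, and the received array are assumptions made for the demonstration; the type values follow the done/log convention used above.

```javascript
const { Worker } = require('worker_threads');

// Inline worker: sends one 'log' message per step, then a final 'done'.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (job) => {
    for (let step = 1; step <= job.steps; step += 1) {
      parentPort.postMessage({ type: 'log', body: { message: 'step ' + step } });
    }
    parentPort.postMessage({ type: 'done', body: { steps: job.steps } });
  });
`;

const worker = new Worker(workerSource, { eval: true });
const received = [];

// Collect message types until the worker reports that it is done.
const finished = new Promise((resolve) => {
  worker.on('message', (message) => {
    received.push(message.type);
    if (message.type === 'done') {
      // The worker keeps listening after posting, so stop it explicitly.
      worker.terminate();
      resolve(received);
    }
  });
});

worker.postMessage({ steps: 3 });
finished.then((types) => console.log(types.join(', '))); // prints "log, log, log, done"
```

The worker stays alive after every postMessage call; it only stops here because the master calls terminate() once the done message arrives.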
Here is a more thorough template for a worker:
const { parentPort } = require('worker_threads');

// Named processJob rather than process to avoid shadowing Node's global process object
async function processJob(someObject) {
  // Do lots of processing here...

  // Send a status update to the PRIMARY
  parentPort.postMessage({ type: 'status', body: 'still working' });

  // Do more processing...

  // Send final message with relevant data back to the PRIMARY script
  parentPort.postMessage({ type: 'done', body: { key: 'value' } });
}

// Register method to execute on message
parentPort.on('message', async (someObject) => processJob(someObject));
Additional Notes
Worker threads have only recently become a stable feature in Node 13.0. If any error occurs while running an older version of Node, it is worth making sure worker threads are enabled by running with the --experimental-worker flag:
node --experimental-worker master.js