How to work with worker threads in NodeJS

Demystifying Threads in Node.js

Often, we talk about how Node’s single-threaded nature makes Node applications easily scalable and resource-efficient. But it also makes Node an unsuitable choice for implementing CPU-intensive tasks.

But as a workaround to this problem, in its v10.5.0 release, Node introduced the concept of worker threads. Worker threads provide a mechanism to spawn new threads to handle CPU-intensive tasks in a Node program.

So, in this tutorial, we are going to give you a simplified introduction to worker threads and how to use them. To understand why worker threads are important, however, let’s first discuss why exactly Node performs poorly when handling CPU-intensive tasks.


Why Can’t Node Handle CPU Intensive Tasks?

We call the single thread that runs JavaScript in a Node program the event loop. Whenever an event is triggered, Node uses this thread to run the code that handles it.

But if handling the event involves an expensive built-in operation, whether I/O-bound (such as file system access) or CPU-bound (such as cryptographic hashing or compression), Node can’t afford to run it on the event loop without holding up the only available thread for a long time.

So instead of waiting for the expensive operation to complete, the event loop registers the callback function attached to the event and moves on to handle the next event in the loop.

The expensive operation is offloaded to a set of auxiliary threads called the worker pool (the libuv thread pool, not to be confused with the worker threads discussed later). A thread in the worker pool executes the task asynchronously and notifies the event loop once the operation is completed.

The event loop then executes the callback function registered for the operation on its thread.
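The flow described above can be observed with a built-in operation that Node offloads to the worker pool. The following is a minimal sketch that assumes crypto.pbkdf2 as the expensive operation; the password, salt, and the deriveKey helper are hypothetical and only serve the illustration.

```javascript
const crypto = require("crypto");

// Wrap the callback-style API in a promise so the result is easy to consume.
// deriveKey is a hypothetical helper, not part of Node.
function deriveKey(password, salt) {
  return new Promise((resolve, reject) => {
    // The hashing runs on a worker pool thread.
    // The callback runs later, back on the event loop.
    crypto.pbkdf2(password, salt, 100000, 32, "sha256", (error, key) => {
      if (error) reject(error);
      else resolve(key.toString("hex"));
    });
  });
}

const order = [];

const done = deriveKey("hypothetical-password", "hypothetical-salt").then(key => {
  order.push("callback ran");
  console.log(`Derived a ${key.length / 2}-byte key`);
});

// This line runs before the callback because the event loop did not wait.
order.push("main thread continued");
console.log("Hashing has been offloaded, the event loop is free");
```

The order array records that the main thread moved on first and the callback ran only after the hashing finished elsewhere.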

Because callbacks are run on the event loop, if any of them contains CPU-intensive operations such as complex mathematical calculations used in machine learning or big data, it blocks the event loop for a considerable time. During this period, the application won’t execute any of the other tasks in the event loop including responding to requests from clients.
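To make the blocking effect concrete, here is a small sketch that simulates a CPU-intensive callback with a busy-wait loop and measures how late a zero-delay timer fires. The blockFor and measureTimerDelay helpers are hypothetical names introduced for this illustration.

```javascript
// Busy-wait to simulate a CPU-intensive callback (hypothetical helper).
function blockFor(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // Spin: nothing else can run on this thread in the meantime.
  }
}

// Schedule a 0 ms timer, then block the event loop.
// Resolves with the number of milliseconds the timer actually had to wait.
function measureTimerDelay(blockMs) {
  return new Promise(resolve => {
    const scheduled = Date.now();
    setTimeout(() => resolve(Date.now() - scheduled), 0);
    blockFor(blockMs);
  });
}

measureTimerDelay(200).then(delay => {
  console.log(`Timer requested after 0 ms fired after ${delay} ms`);
});
```

The timer cannot fire until the busy loop returns, which is the same delay an incoming client request would experience.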

A situation like that drastically reduces the performance of a Node application. So, for the longest time, Node was considered unfit for handling CPU-intensive operations.

But the introduction of worker threads provided a workaround to this problem.


How Do Worker Threads Work?

In v10.5.0, Node introduced worker threads as an experimental feature. They became stable in v12. Worker threads do not behave exactly like a traditional multithreading system because they are not a feature built into JavaScript itself.

But they allow delegating expensive tasks to separate threads instead of blocking the event loop of the application. So, how do worker threads actually work under the hood?

The responsibility of a worker thread is to run a piece of code specified by the parent or main thread. Each worker runs in isolation from other workers. However, a worker and its parent can pass messages back and forth through a message channel.

To run workers isolated from each other when JavaScript doesn’t support multithreading, worker threads use a special mechanism.

We all know Node runs on top of Chrome’s V8 engine. V8 supports the creation of isolated V8 runtimes. These isolated instances, known as V8 Isolates, have their own JavaScript heaps and micro-task queues.

Worker threads are run on these isolated V8 engines, each worker having its own V8 engine and event queue. In other words, when workers are active, a Node application has multiple Node instances running in the same process.

Even though JavaScript doesn’t inherently support multithreading, worker threads provide a workaround to run multiple threads in a single process.
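One visible consequence of this isolation is that a worker does not see the parent’s global variables. The sketch below is only an illustration: the appName global and the whatWorkerSees helper are hypothetical, and the worker’s code is passed as an inline string through the eval option of the Worker constructor so that the example fits in one file.

```javascript
const { Worker } = require("worker_threads");

// A global that exists only in the main thread's isolate.
globalThis.appName = "main-thread-value";

// Inline worker source, run in its own isolate.
const workerSource = `
  const { parentPort } = require("worker_threads");
  // Report what the worker sees, then set a global of its own.
  const seen = typeof globalThis.appName;
  globalThis.appName = "worker-value";
  parentPort.postMessage(seen);
`;

// Resolves with the type the worker observed for globalThis.appName.
function whatWorkerSees() {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once("message", resolve);
    worker.once("error", reject);
  });
}

whatWorkerSees().then(seen => {
  console.log(`Worker saw appName as: ${seen}`);
  console.log(`Main thread still has: ${globalThis.appName}`);
});
```

The worker reports the global as undefined, and the value it sets on its own global object never reaches the parent.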


Creating and Running New Workers

In this example, we are going to execute a task for calculating the nth term of the Fibonacci sequence. It’s a CPU-intensive task that would block the single thread of our Node application if executed without worker threads, especially as the nth term increases.

We are splitting our implementation into two files. The first one, app.js, contains the code executed by the main thread, including the creation of a new worker. The second file, worker.js, contains the code run by the worker we create. It’s where the CPU-intensive Fibonacci calculation belongs.

Let’s see how we can handle the creation of a new worker in the parent thread.

const {Worker} = require("worker_threads");

let num = 40;

//Create new worker
const worker = new Worker("./worker.js", {workerData: {num: num}});

//Listen for a message from worker
worker.once("message", result => {
  console.log(`${num}th Fibonacci Number: ${result}`);
});

worker.on("error", error => {
  console.log(error);
});

worker.on("exit", exitCode => {
  console.log(exitCode);
})

console.log("Executed in the parent thread");

We use the Worker class to create a new worker thread. It accepts the following arguments when creating a new Worker instance.

new Worker(filename[, options])

Here, the filename argument refers to the path to the file which contains the code that should be executed by the worker thread. Therefore, we need to pass the file path of the worker.js file.

The Worker constructor also accepts a number of options which you can refer to in the official documentation of the Worker class. We are, however, opting to use only the workerData option. Data passed through the workerData option will be available to the worker when it starts running. We can use this option to easily pass the value of “n” to the Fibonacci sequence calculator.

To receive the results when the execution is finished, the parent thread attaches a few event listeners to the worker.

In this implementation, we have chosen to listen for three events. They are:

  • message: This event triggers when the worker posts a message to the parent.
  • error: This is triggered if an error occurs when running the worker.
  • exit: This is triggered when the worker exits from execution. The exit code is 0 if the worker’s code runs to completion. If the worker calls process.exit(), the exit code is the code passed to that call. If the execution was terminated with worker.terminate(), the code will be 1.
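The exit codes can be checked with a short experiment. The following sketch uses three inline workers created with the eval option; the exitCodeOf and collectExitCodes helpers and the three source strings are hypothetical names introduced for this illustration. The third worker is terminated only after it reports that it is running.

```javascript
const { Worker } = require("worker_threads");

// Resolves with the exit code of a worker built from inline source.
function exitCodeOf(source, { terminateWhenReady = false } = {}) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(source, { eval: true });
    worker.once("error", reject);
    worker.once("exit", resolve);
    if (terminateWhenReady) {
      // Terminate from the parent once the worker says it is running.
      worker.once("message", () => worker.terminate());
    }
  });
}

const finishesNormally = `const sum = 1 + 1;`;
const callsProcessExit = `process.exit(3);`;
const runsUntilTerminated = `
  const { parentPort } = require("worker_threads");
  setInterval(() => {}, 1000); // keeps the worker alive
  parentPort.postMessage("ready");
`;

function collectExitCodes() {
  return Promise.all([
    exitCodeOf(finishesNormally),
    exitCodeOf(callsProcessExit),
    exitCodeOf(runsUntilTerminated, { terminateWhenReady: true }),
  ]);
}

collectExitCodes().then(codes => console.log(codes)); // expected: [ 0, 3, 1 ]
```

Note that process.exit() inside a worker stops only that worker’s thread, not the whole process.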

In the message event, the worker uses the message channel connecting the worker and parent to send a message. We’ll see how the messaging really works in a later section.

Once the worker is created, the parent thread can continue its execution without waiting for the results. When we run the above code, the string “Executed in the parent thread” is logged to the console before the nth Fibonacci number is returned.

Therefore, we see an output like this. The final 0 is the exit code logged by the exit listener once the worker finishes.

Executed in the parent thread
40th Fibonacci Number: 102334155
0

Now, let’s write the code executed by the worker inside the worker.js file.

const {parentPort, workerData} = require("worker_threads");

parentPort.postMessage(getFib(workerData.num))

function getFib(num) {
    if (num === 0) {
      return 0;
    }
    else if (num === 1) {
      return 1;
    }
    else {
      return getFib(num - 1) + getFib(num - 2);
    }
}

Here, we use the recursive getFib function to calculate the nth term of the Fibonacci sequence.

But what’s more interesting is the way we receive data from the parent through the workerData option. Through this object, the worker can access the value of “num” passed when it was created.

Then, we use the parentPort object to post a message to the parent thread with the result of the Fibonacci calculation.


Communication Between Parent and Worker Threads

As you saw in the previous example, the parentPort object allows the worker to communicate with the parent thread.

This parentPort is an instance of the MessagePort class. When either the parent or the worker sends a message using a MessagePort instance, the message is written to the message channel and triggers a “message” event to notify the receiver.

The parent can send a message to the worker using the same postMessage method.

worker.postMessage("Message from parent");

We can use the message channel to send multiple messages back and forth between the parent and worker whenever needed.
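A complete round trip might look like the following sketch. It is a single-file illustration rather than the two-file layout used elsewhere in this tutorial: the worker’s code is an inline string passed with the eval option, and roundTrip is a hypothetical helper.

```javascript
const { Worker } = require("worker_threads");

const workerSource = `
  const { parentPort } = require("worker_threads");
  // Reply to the first message, then let the worker exit.
  parentPort.once("message", text => {
    parentPort.postMessage("Worker received: " + text);
  });
`;

// Sends one message to a fresh worker and resolves with its reply.
function roundTrip(text) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once("message", resolve);
    worker.once("error", reject);
    worker.postMessage(text);
  });
}

roundTrip("Message from parent").then(reply => console.log(reply));
```

Because the worker listens with once, it has no remaining listeners after replying and exits on its own.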


The Better Way of Handling Workers

In the previous example, the worker we created exited at the end of one Fibonacci calculation. With that implementation, if we wanted to calculate another Fibonacci number, we would have had to spawn a new worker thread.

But spawning worker threads, each with an independent V8 engine and event loop, is a resource-intensive operation. Following that approach in production code would be especially inefficient.

So, instead of creating new worker threads for each Fibonacci calculation, we can reuse the same worker to carry out every calculation. Here is how this behavior is implemented.

//worker.js
const {parentPort} = require("worker_threads");

parentPort.on("message", data => {
  parentPort.postMessage({num: data.num, fib: getFib(data.num)});
});

function getFib(num) {
  if (num === 0) {
    return 0;
  }
  else if (num === 1) {
    return 1;
  }
  else {
    return getFib(num - 1) + getFib(num - 2);
  }
}

In worker.js, we set the worker to listen to messages from its parent.

The parent, instead of passing data through the workerData option, chooses to pass data with the messages.

//app.js

const {Worker} = require("worker_threads");

//Create new worker
const worker = new Worker("./worker.js");

//Listen for a message from worker
worker.on("message", result => {
  console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

worker.on("error", error => {
  console.log(error);
});

worker.postMessage({num: 40});
worker.postMessage({num: 12});

Now, we can use the same worker to run multiple CPU-intensive operations.

It’s important to note that data passed through the postMessage method, like data passed as workerData, is cloned using the structured clone algorithm before being sent to the worker. This prevents the race conditions that occur when two threads modify the same object.
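The effect of cloning can be seen by letting the worker mutate what it receives. In this sketch, the inline worker source and the sendAndMutate helper are hypothetical and exist only for the illustration.

```javascript
const { Worker } = require("worker_threads");

const workerSource = `
  const { parentPort } = require("worker_threads");
  parentPort.once("message", data => {
    data.items.push(99); // mutates the worker's copy only
    parentPort.postMessage(data);
  });
`;

// Sends an object to a worker and resolves with what the worker sends back.
function sendAndMutate(original) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once("message", resolve);
    worker.once("error", reject);
    worker.postMessage(original);
  });
}

const original = { items: [1, 2] };

sendAndMutate(original).then(copy => {
  console.log("Returned by worker:", copy.items);
  console.log("Parent's original:", original.items);
});
```

The object that comes back contains the extra element, while the parent’s original array is unchanged, since each side only ever works on its own copy.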


Sharing Memory Between Parent and Worker

Node Worker Threads allow applications to share memory between parent and worker threads similar to a traditional multithreaded system.

Sharing memory is more efficient than cloning data and passing them back and forth between the parent and worker threads.

To share memory between the threads, we use a SharedArrayBuffer.

Sharing memory between two threads like this could result in race conditions in the program. Race conditions occur when two threads try to read and write to the same memory location at the same time.

We can use Atomics to solve this problem. Each Atomics operation is indivisible: a read or write on a shared location finishes before the next operation on that location starts. This prevents race conditions on individual reads and writes because no thread can observe or interrupt a half-finished operation.
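As a small illustration of why indivisible operations matter, the sketch below lets several workers increment one shared counter with Atomics.add. The countWithWorkers helper, the worker count, and the iteration count are hypothetical choices for this example.

```javascript
const { Worker } = require("worker_threads");

const workerSource = `
  const { workerData } = require("worker_threads");
  const counter = new Int32Array(workerData.buffer);
  for (let i = 0; i < workerData.iterations; i++) {
    // Read, add, and write back as one indivisible step.
    Atomics.add(counter, 0, 1);
  }
`;

// Starts workerCount workers that each increment the same counter
// "iterations" times, and resolves with the final counter value.
function countWithWorkers(workerCount, iterations) {
  const buffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(buffer);

  const runs = Array.from({ length: workerCount }, () =>
    new Promise((resolve, reject) => {
      const worker = new Worker(workerSource, {
        eval: true,
        workerData: { buffer, iterations },
      });
      worker.once("error", reject);
      worker.once("exit", resolve);
    })
  );

  return Promise.all(runs).then(() => Atomics.load(counter, 0));
}

countWithWorkers(4, 10000).then(total => {
  console.log(`Final counter value: ${total}`);
});
```

With a plain counter[0]++ in place of Atomics.add, two workers could read the same old value and one of the increments could be lost.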

Let’s see how to implement memory sharing with Atomics and SharedArrayBuffer.

const {Worker} = require("worker_threads");

let nums = [21, 33, 15, 40];

//Size of the buffer in bytes: one 32-bit integer (4 bytes) per element of nums
const size = Int32Array.BYTES_PER_ELEMENT*nums.length;

//create the buffer for the shared array
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);

nums.forEach((num, index) => {
  Atomics.store(sharedArray, index, num);
})

//Create new worker
const worker = new Worker("./worker.js");

//Listen for a message from worker
worker.on("message", result => {
  console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

worker.on("error", error => {
  console.log(error);
});

worker.postMessage({nums: sharedArray});

With this implementation, both the parent and worker threads can read data from and write data to the shared array object.

The code executed by the worker slightly changes because now we are passing an array of numbers instead of an individual number.

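const {parentPort} = require("worker_threads");

//data.nums is an Int32Array view over the shared buffer
parentPort.on("message", data => {
  data.nums.forEach(num => {
    parentPort.postMessage({num: num, fib: getFib(num)});
  });
});

//getFib is the same function as in the previous worker.js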

The worker immediately sends a message to the parent for every Fibonacci number it calculates without waiting for the entire process to complete.

In this implementation, even though we use the postMessage method to send data to the worker, the contents of the array are not copied. Because the typed array is backed by a SharedArrayBuffer, the worker receives a view over the same memory as the parent.
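To confirm that the memory really is shared, a worker can write into the array and the parent can read the change afterwards. The following sketch assumes an inline worker and a hypothetical doubleInWorker helper.

```javascript
const { Worker } = require("worker_threads");

const workerSource = `
  const { parentPort } = require("worker_threads");
  parentPort.once("message", data => {
    // Overwrite each element with its double, in place.
    data.nums.forEach((num, index) => {
      Atomics.store(data.nums, index, num * 2);
    });
    parentPort.postMessage("done");
  });
`;

// Copies values into shared memory, lets a worker double them in place,
// and resolves with what the parent then reads from the same memory.
function doubleInWorker(values) {
  const size = Int32Array.BYTES_PER_ELEMENT * values.length;
  const shared = new Int32Array(new SharedArrayBuffer(size));
  values.forEach((value, index) => Atomics.store(shared, index, value));

  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once("message", () => resolve(Array.from(shared)));
    worker.once("error", reject);
    worker.postMessage({ nums: shared });
  });
}

doubleInWorker([21, 33, 15, 40]).then(result => console.log(result));
```

The worker never sends the numbers back. The parent sees the doubled values simply by reading its own view of the buffer.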

Parallel processing solutions Node provided before Worker Threads, such as Cluster and Child Process, run separate processes and cannot share memory between them. This gives Worker Threads an edge over these solutions.


Worker Thread Pool

As we discussed before, creating a worker is a costly task. That’s why we decided to reuse the same worker without letting it exit after one task execution. We can take this one step further and create a pool of workers waiting to execute a task.

When we have only a single worker, the application has to wait until the worker is freed to accept a new task. It delays the application’s response time to other queued requests. We can avoid this by using a pool of threads.

Then, the pool can hand us an idle worker from those available to carry out the task without delay.

Unfortunately, Worker Threads doesn’t have native support for worker pools yet. So, we have to rely on a third-party library to make the task easier for us. We are going to use the node-worker-threads-pool npm package.

Here’s how we can create a static worker pool (i.e. fixed number of workers) using this package.

const {StaticPool} = require("node-worker-threads-pool");

let num = 40;

//Create a static worker pool with 8 workers
const pool = new StaticPool({
  size: 8,
  task: "./worker.js"
});

//Get a worker from the pool and execute the task
pool.exec({num: num}).then(result => {
  console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});

When we run the pool.exec() function, the pool provides an inactive worker to run our task.

The worker’s code doesn’t have to go through any changes to support the worker pool.

const {parentPort} = require("worker_threads");

parentPort.on("message", data => {
  parentPort.postMessage({num: data.num, fib: getFib(data.num)});
})

function getFib(num) {
  if (num === 0) {
    return 0;
  }
  else if (num === 1) {
    return 1;
  }
  else {
    return getFib(num - 1) + getFib(num - 2);
  }
}
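For intuition about what a worker pool does internally, here is a minimal hand-rolled sketch built only on the worker_threads module. It is not how node-worker-threads-pool is implemented; the TinyPool class and its methods are hypothetical, the worker’s code is inlined with the eval option to keep the example in one file, and a worker that throws is simply not returned to the pool.

```javascript
const { Worker } = require("worker_threads");

const workerSource = `
  const { parentPort } = require("worker_threads");
  function getFib(num) {
    return num < 2 ? num : getFib(num - 1) + getFib(num - 2);
  }
  parentPort.on("message", data => {
    parentPort.postMessage({ num: data.num, fib: getFib(data.num) });
  });
`;

class TinyPool {
  constructor(size) {
    this.workers = Array.from({ length: size }, () =>
      new Worker(workerSource, { eval: true })
    );
    this.idle = [...this.workers]; // workers waiting for a task
    this.queue = [];               // tasks waiting for a worker
  }

  // Queue a task and resolve with the worker's reply.
  exec(data) {
    return new Promise((resolve, reject) => {
      this.queue.push({ data, resolve, reject });
      this.dispatch();
    });
  }

  // Pair idle workers with queued tasks until one of the two runs out.
  dispatch() {
    while (this.idle.length > 0 && this.queue.length > 0) {
      const worker = this.idle.pop();
      const { data, resolve, reject } = this.queue.shift();
      const onMessage = result => {
        worker.off("error", onError);
        this.idle.push(worker); // the worker is free again
        resolve(result);
        this.dispatch();
      };
      const onError = error => {
        worker.off("message", onMessage);
        reject(error);
      };
      worker.once("message", onMessage);
      worker.once("error", onError);
      worker.postMessage(data);
    }
  }

  // Terminate every worker so the process can exit.
  destroy() {
    return Promise.all(this.workers.map(worker => worker.terminate()));
  }
}

async function main() {
  const pool = new TinyPool(2);
  const results = await Promise.all(
    [10, 20, 25].map(num => pool.exec({ num }))
  );
  results.forEach(r => console.log(`${r.num}th Fibonacci Number: ${r.fib}`));
  await pool.destroy();
}

main();
```

With two workers and three tasks, the third task waits in the queue until one of the workers reports its result and becomes idle again.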

Summary

In this tutorial, we discussed how to work with Node.js Worker Threads to create new threads to run CPU-intensive tasks. Offloading these tasks to a separate thread helps to run our Node application without a significant performance drop even when handling CPU-intensive tasks.

Even though Node doesn’t support multithreading in the traditional way, Worker Threads provides a good workaround to this problem. So, if you were under the impression that Node applications can’t have more than one thread, it’s time to bin that notion and give Worker Threads a try.

Source: livecodestream

Author

oDesk Software