Playing with threads in Node.JS 10.5.0

Good day



At my work, a dispute arose between me and my colleagues about threads in the new version of Node.JS and whether they need synchronization. To settle it, we decided to start with the task of writing lines to a file from several threads in parallel. The worker_threads topic is hot, so the details are under the cut.

A little about the threads themselves. They are an experimental feature in Node.JS 10.5.0, and to get access to the "worker_threads" module you need to run the Node.JS application with the "--experimental-worker" flag. I put this flag into the start script in package.json:
{
  "name": "worker-test",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "start": "node --max-old-space-size=4096 --experimental-worker app.js "
  },
  "author": "",
  "license": "ISC"
}

Now about the logic itself. The main thread spawns N worker threads, and they all write to the same file in a loop. Unlike most examples, where the main and child threads start from the same file, I moved the worker code into a separate file; that looks cleaner and more elegant to me.

Actually, the code.

The main app.js file is the entry point.

const { Worker } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Hello from main!');
// spawn the worker threads, passing each one its own id via workerData
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}

Here we simply create child threads with the Worker class, pointing it at the entry file for the thread, './writer-worker-app/app.js'. When creating each thread we pass it our own identifier via workerData.

The entry file for the worker thread, ./writer-worker-app/app.js:

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initialized.`);
// write non-stop, without any interval (see the update at the end of the article)
while (true) {
  sendMessage();
}
function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}


And the simplest possible logger module, ./writer-worker-app/logger.js:
const fs = require('fs');
function log(message) {
  return fs.appendFileSync('./my-file.txt', message);
}
module.exports = {
  log
};

When we launched this application, we all expected to end up with a mess in the file, and the skeptics would get to shout about how badly we need locks, semaphores and the other joys of parallel execution. But no! All lines in the file are intact; they just appear in random order. A wonderful experiment, another small win for Node :-) My assumption is that the synchronization happens at the level of Node's I/O, but I will be glad to hear the correct explanation in the comments. Just in case, we also checked the behaviour using fs.createWriteStream and stream.write instead of fs.appendFileSync. The result came out the same. Here is a fragment of the resulting file:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
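For reference, the stream-based check looked roughly like this (a sketch from memory, not the exact code from the repository): the logger opens a single append-mode write stream per worker thread and reuses it for every call.

// ./writer-worker-app/logger.js, fs.createWriteStream variant (sketch, may differ from the repository)
// One append-mode stream is opened per worker thread and reused for every log call.
const fs = require('fs');
const stream = fs.createWriteStream('./my-file.txt', { flags: 'a' });
function log(message) {
  return stream.write(message);
}
module.exports = {
  log
};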





But we did not stop there.


A colleague suggested a thread-synchronization task. For our specific example, let it be writing to the file strictly in ascending order of the identifiers: first the first thread writes, then the second, then the third, and so on.

To do this, I introduced another thread, the manager. I could have gotten by with the main thread, but I enjoy creating these isolated workers and building communication through messages too much. Before writing the manager thread itself, we need a communication channel between it and the writer workers. The MessageChannel class is used for this. An instance of this class has two fields, port1 and port2, and each of them can listen for and send messages to the other via the .on('message') and .postMessage() methods. This class was added to the "worker_threads" module specifically for communication between threads: normally a transferred object is simply cloned, which is useless across isolated thread execution environments.
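To make the mechanics clearer, here is a tiny standalone sketch (my own illustration, not code from the project) of how the two ports of one MessageChannel talk to each other, even within a single thread:

// Minimal MessageChannel sketch (no workers involved):
// whatever is posted on port1 arrives on port2 and vice versa.
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port2.on('message', (msg) => {
  console.log('port2 received:', msg); // -> port2 received: ping
  port2.postMessage('pong');
});
port1.on('message', (msg) => {
  console.log('port1 received:', msg); // -> port1 received: pong
  port1.close(); // closing either port closes the whole channel
});
port1.postMessage('ping');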

For two threads to communicate, each of them gets one of the two ports.

An interesting fact: in 10.5.0 it is impossible to pass a port through the Worker constructor; you can only do it through worker.postMessage(), and you must list the port in the transferList parameter!

The manager thread sends write commands to the writer threads in ascending order of their identifiers, and it sends the next command only after it receives a reply from the current writer confirming a successful write.

A rough, pseudo-UML diagram of the application:


Our updated main ./app.js file:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Main app initialized and started.');
const workersMeta = [];
// create the workers and a dedicated MessageChannel for each of them
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  const channel = new MessageChannel();
  const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
  workersMeta.push({ id: i, worker, channel });
}
// hand port1 of each channel over to its worker (note the transferList)
workersMeta.forEach(({ worker, channel }) => {
  worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
});
// give the workers some time to initialize, then start the orchestrator
// and transfer port2 of every channel to it
setTimeout(() => {
  const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
  const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
  orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));
  console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);

Here we first create the workers, and then send each worker a port for communicating with the manager (and only this way; it cannot be done through the constructor).

Then we create the manager thread and send it the list of ports for communicating with the writer threads.
Update: empirically I found out that when working with threads, it is better to let them settle first (initialize as needed). Properly, I should have listened for some kind of "I'm ready!" reply from each thread, but I decided to take the easier path.
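For completeness, such an "I'm ready!" handshake could look roughly like this (a hypothetical sketch, not what is in the repository; it reuses workersMeta and WORKERS_NUMBER from the main file above, and startOrchestrator is an assumed helper wrapping the orchestrator-creation code from the setTimeout):

// Hypothetical readiness handshake instead of the fixed timeout (not in the repository).
// Each writer would call parentPort.postMessage({ status: 'ready' }) right after it initializes,
// and the main thread would start the orchestrator only once every worker has reported in.
let readyCount = 0;
workersMeta.forEach(({ worker }) => {
  worker.once('message', (msg) => {
    if (msg && msg.status === 'ready') {
      readyCount += 1;
      if (readyCount === WORKERS_NUMBER) {
        startOrchestrator(); // assumed helper: creates the orchestrator and transfers port2 of every channel to it
      }
    }
  });
});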

We also change the behavior of the writer thread so that it writes only when told to, and reports the result once the write operation is completed:
./writer-worker-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initialized.`);
parentPort.on('message', value => {
  const orchestratorPort = value.orchestratorPort;
  orchestratorPort.on('message', data => {
    if (data.command == 'write') {
      console.log(`Worker ${id} received write command`);
      sendMessage();
      sendResult(orchestratorPort);
    }
  });
  console.log(`Worker ${id} started.`);
});
function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}
function sendResult(port) {
  port.postMessage({ id, status: 'completed' });
}

So: we initialize from the parent thread's message, start listening on the channel to the manager thread, and when the write command arrives we first write to the file and then send the result. Note that the file is written synchronously, so sendResult() is called only after sendMessage() has actually finished the write.
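If the write were asynchronous instead, the 'completed' message would have to wait until the write actually finishes, roughly like this (a hypothetical variant, not the code used in the article; it reuses id from the writer file, and fs.promises is still experimental in Node 10):

// Hypothetical asynchronous variant of handling the write command (sketch).
// The result is posted only after fs.promises.appendFile resolves, otherwise the
// orchestrator could command the next worker before this line reaches the file.
const fs = require('fs');
async function handleWriteCommand(orchestratorPort) {
  await fs.promises.appendFile('./my-file.txt', `Hello from worker number ${id}\r\n`);
  orchestratorPort.postMessage({ id, status: 'completed' });
}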

All that remains is to write the implementation of our smart manager
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');
console.log('Orchestrator initialized.')
let workerPorts;
parentPort.on('message', (value) => {
  workerPorts = value.workerPorts;
  workerPorts.forEach(wp => wp.port.on('message', handleResponse));
  console.log('Orchestrator started.');
  sendCommand(workerPorts[0]);
});
// response looks like { id, status: 'completed' } (see sendResult in the writer)
function handleResponse(response) {
  const responseWorkerId = response.id;
  // find the writer with the next id; wrap around to the first one at the end
  let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
  if (!nextWorker) {
    nextWorker = workerPorts[0];
  }
  sendCommand(nextWorker);
}
function sendCommand(worker) {
  worker.port.postMessage({ command: 'write' });
}

We receive the list of ports, set a response callback on each of them, and send the command to the first writer. In the callback we look for the next writer and send the command to it. (In the first version there was also an interval between commands so as not to strain the system too much, but it was removed later; see the update below.)

That's it, our multithreaded thread-management application is ready. We have learned not only how to spawn worker threads in Node.JS, but also how to build effective communication between them. In my personal opinion, the architecture of isolated threads in Node.JS that wait for and exchange messages is more than convenient and promising. Thank you all for your attention.

All source code can be found here.

UPDATE


In order not to mislead readers, and not to give anyone extra reasons to say that I am cheating with timeouts, I have updated the article and the repository.
Changes:
1) the intervals in the original writers were removed; now a hardcore while (true) loop is used;
2) the --max-old-space-size=4096 flag was added just in case, since the current implementation of worker threads is not very stable and I hope it helps;
3) the intervals between the manager's commands were removed; writing is now non-stop;
4) a timeout was added when initializing the manager; the reason is described above.

TO DO:
1) add messages of variable length or count the logger calls (thanks, FANAT1242);
2) add a benchmark and compare the first and second versions (how many lines are written in 10 seconds, for example); a rough sketch is below.
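A rough sketch of how the benchmark from item 2 could be done (hypothetical; it assumes my-file.txt is empty at startup and that the snippet is added to the main app.js):

// Hypothetical benchmark sketch: stop after 10 seconds and count the lines in the file.
const fs = require('fs');
const DURATION_MS = 10000;
setTimeout(() => {
  const content = fs.readFileSync('./my-file.txt', 'utf8');
  // every log call ends with "\r\n", so counting non-empty chunks counts messages
  const lines = content.split('\r\n').filter(Boolean).length;
  console.log(`Lines written in ${DURATION_MS / 1000} seconds: ${lines}`);
  process.exit(0);
}, DURATION_MS);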

UPDATE 2


1) The logging code has been changed: each message now has a different length.
2) writer-worker-app/app.old.js has been changed: each thread writes 1000 times and then terminates.

This was done to test the ideas of user FANAT1242. The messages still do not overwrite each other, and the file contains exactly 1000 * N lines, where N is the number of threads.
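For context, the variable-length logging could be done along these lines (my sketch; the actual code is in the repository and may differ):

// Sketch of a logger producing messages of varying length (the idea from this update).
// A random-length tail makes any interleaving of concurrent writes easy to spot.
const fs = require('fs');
function log(message) {
  const padding = 'x'.repeat(Math.floor(Math.random() * 100));
  return fs.appendFileSync('./my-file.txt', `${message.trim()} ${padding}\r\n`);
}
module.exports = {
  log
};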
