Node.js developer tools: job queues

    When implementing the back end of web and mobile applications, even the simplest ones, it has become customary to rely on tools such as databases, a mail (SMTP) server, and a redis server. The set of tools in use is constantly expanding. For example, message queues, judging by the number of installations of the amqplib package (650 thousand per week), are used on a par with relational databases (the mysql package has 460 thousand installations per week, pg has 800 thousand per week).

    Today I want to talk about job queues, which are still used an order of magnitude less often, although the need for them arises in almost every real project.

    A job queue lets you perform a task asynchronously: in essence, call a function with given input parameters at a given time.

    Depending on the parameters, a job can be executed:

    • immediately after being added to the queue;
    • once, at a set time;
    • repeatedly, on a schedule.
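    In bull's terms (the option names `delay` and `repeat.cron` are from its documentation), these three modes map onto the job options passed to add(). A minimal sketch:

```javascript
// Sketch of the three scheduling modes as bull job options.
const immediate = {};                                 // run as soon as a worker is free
const delayed = { delay: 60 * 1000 };                 // run once, 60 seconds from now
const scheduled = { repeat: { cron: '0 3 * * *' } };  // run every day at 03:00

// With a real queue these would be passed as the third argument of add(), e.g.:
// queue.add('some:job', { payload: 1 }, scheduled);
module.exports = { immediate, delayed, scheduled };
```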

    Job queues let you pass parameters to the job being run, track and re-run jobs that have failed, and limit the number of jobs executing simultaneously.
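    In bull, the retry behaviour lives in per-job options (the `attempts` and `backoff` names below are from bull's documentation), while the limit on simultaneously running jobs is the concurrency argument of process():

```javascript
// Per-job retry settings (bull option names `attempts` and `backoff`).
const retryOptions = {
  attempts: 5,                                    // re-run a failed job up to 5 times
  backoff: { type: 'exponential', delay: 1000 },  // wait 1s, 2s, 4s, ... between retries
};

// Usage with a real queue:
// queue.add('some:job', data, retryOptions);
// queue.process('some:job', 4, handler); // at most 4 jobs of this worker at once
module.exports = { retryOptions };
```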

    The vast majority of Node.js applications are REST APIs for web and mobile clients. Keeping REST API response times low is important for a comfortable user experience. At the same time, a REST API call can trigger lengthy and/or resource-intensive operations. For example, after a purchase is made, you must send the user a push message to the mobile application, or send a purchase request to a CRM's REST API. Such operations can be performed asynchronously. How do you do this properly without a job-queue tool? You could, for example, publish a message to a message queue and start a worker that reads those messages and performs the required work based on them.

    In fact, this is exactly what job queues do. On closer inspection, however, job queues differ from message queues in several fundamental ways. First, a message queue holds static messages, whereas a job queue implies some kind of work (a function call). Second, a job queue implies the presence of a processor (worker) that will perform that work, and this requires extra functionality: the number of worker processes should scale transparently under increased load, while the number of jobs running simultaneously on a single worker must be limited to smooth out peak loads and prevent denial of service. All of this points to the need for a tool that can run asynchronous jobs under such control.

    Using message queues, it is relatively simple to implement a job queue that runs jobs immediately after they are enqueued. But often a job needs to run once at a set time, or on a schedule. For such tasks a number of packages that reimplement the logic of cron in Linux are widely used. To back this up with numbers: the node-cron package has 480 thousand installations per week, node-schedule has 170 thousand per week.

    Using node-cron is, of course, more convenient than the ascetic setInterval(), but I personally ran into a number of problems with it. The general drawbacks are: no control over the number of simultaneously executed tasks (which amplifies peak loads: rising load slows the tasks down, slow tasks increase the number running simultaneously, which in turn loads the system even more); no way to run node-cron on several cores to increase throughput (every core would independently execute every task); and no means of tracking and restarting tasks that finished with an error.
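    To make the first drawback concrete, here is a minimal, bull-independent sketch of the guard node-cron does not provide: a counter that refuses to start a new run while too many are already in flight (all names here are my own illustration, not part of any library):

```javascript
// Naive in-process concurrency guard: skip a tick instead of piling runs up.
const MAX_CONCURRENT = 2;
let running = 0;

function tryRun(task) {
  if (running >= MAX_CONCURRENT) return false; // refuse: too many runs in flight
  running += 1;
  Promise.resolve(task()).finally(() => { running -= 1; });
  return true;
}

// Two slow tasks fit under the limit; the third tick is skipped:
const slow = () => new Promise(resolve => setTimeout(resolve, 50));
const started = [tryRun(slow), tryRun(slow), tryRun(slow)];
```

Without such a guard, every tick of a slow task starts a new run, which is exactly the feedback loop described above.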

    I hope I have shown that the need for a job queue is on a par with the need for tools such as databases. Such tools have appeared, although they are not yet widely used. I will list the most popular of them:

    Package      Installations per week   Likes
    kue          29,190                   8,753
    bee-queue    no information           1,431
    agenda       25,459                   5,488
    bull         56,232                   5,909


    Today I will look at the bull package, which I use myself. Why did I choose this particular package (though I do not impose my choice on anyone)? When I started looking for a convenient job-queue implementation, development of the bee-queue project had already stalled. The kue implementation, according to the benchmarks published in the bee-queue repository, lagged far behind the others and, in addition, had no means of running periodically executed jobs. The agenda project implements queues backed by a mongodb database. For some cases that is a big plus, if you need extra reliability when enqueuing jobs, but it is not the only deciding factor. Naturally, I stress-tested all the candidates by generating a large number of jobs in the queue, and still could not get uninterrupted operation from agenda.

    So I settled on bull, which offers a convenient API with sufficient speed and scalability, since it uses a redis server as its backend; in particular, you can use a cluster of redis servers.

    When creating a queue, it is very important to choose optimal parameters. There are many of them, and the meaning of some did not become clear to me right away. After numerous experiments I settled on the following:

    const Bull = require('bull');
    const redis = {
      host: 'localhost',
      port: 6379,
      maxRetriesPerRequest: null,
      connectTimeout: 180000
    };
    const defaultJobOptions = {
      removeOnComplete: true,
      removeOnFail: false,
    };
    const limiter = {
      max: 10000,
      duration: 1000,
      bounceBack: false,
    };
    const settings = {
      lockDuration: 600000, // Key expiration time for job locks.
      stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking).
      maxStalledCount: 2, // Max amount of times a stalled job will be re-processed.
      guardInterval: 5000, // Poll interval for delayed jobs and added jobs.
      retryProcessDelay: 30000, // delay before processing next job in case of internal error.
      drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs).
    };
    const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter });
    module.exports = { bull };
    

    In simple cases there is no need to create many queues: within a single queue you can give jobs different names and attach a worker to each name:

    const { bull } = require('../bull');
    bull.process('push:news', 1, `${__dirname}/push-news.js`);
    bull.process('push:status', 2, `${__dirname}/push-status.js`);
    ...
    bull.process('some:job', function(...args) { ... });
    

    Here I use a feature bull provides out of the box: parallelizing workers across several processes, which lets them use several cores. To do this, the second parameter sets the concurrency with which the worker is launched, and the third is the name of the file defining the job-processing function; bull runs such processors in separate sandboxed processes. If this feature is not needed, you can simply pass a callback function as the second parameter.

    A job is enqueued by calling the add() method, which is passed the job name and an object that will later be handed to the job's processor. For example, in an ORM hook, after a record with a news item is created, I can asynchronously send a push message to all clients:

      // `_` is lodash; `options` holds per-job settings (e.g. attempts, priority)
      afterCreate(instance) {
          bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options);
      }
    

    The processor receives the job object, whose data property contains the parameters passed to add(), and a done() callback, which must be called to confirm that the job has completed or to report that it ended with an error (returning a promise works as well):

    const { firebase: { admin } } = require('../firebase');
    const { makePayload } = require('./makePayload');
    module.exports = (job, done) => {
      const { id, title, message } = job.data;
      const data = {
        id: String(id),
        type: 'news',
      };
      const payloadRu = makePayload(title.ru, message.ru, data);
      const payloadEn = makePayload(title.en, message.en, data);
      return Promise.all([
        admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }),
        admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }),
      ])
        .then(response => done(null, response))
        .catch(done);
    };
    

    To view the status of the job queue, you can use the bull-arena tool:

    const Arena = require('bull-arena');
    const redis = {
        host: 'localhost',
        port: 6379,
        maxRetriesPerRequest: null,
        connectTimeout: 180000
      };
    const arena = Arena({
      queues: [
        {
          name: 'my_queue',
          hostId: 'My Queue',
          redis,
        },
      ],
    },
    {
      basePath: '/',
      disableListen: true,
    });
    module.exports = { arena };
    

    And finally, a little life hack. As I said, bull uses a redis server as its backend. When the redis server is restarted, the likelihood of losing jobs is very small. But knowing that system administrators can sometimes simply "clear the redis cache", deleting all jobs along the way, I was mainly worried about repeatable jobs, which in that case stop forever. So I found a way to resurrect such repeatable jobs:

    const _ = require('lodash');
    const { bull } = require('./app/services/bull');
    const cron = '*/10 * * * * *';
    // remove every registered repeatable job, then register the schedule again
    bull.getRepeatableJobs()
      .then(jobs => Promise.all(_.map(jobs, (job) => {
        const [name, jobCron] = job.key.split(/:{2,}/);
        return bull.removeRepeatable(name, { cron: jobCron });
      })))
      .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }));
    setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);
    

    That is, each repeatable job is first removed from the queue and then added again, and all of this (alas) on a setInterval(). Honestly, without this life hack I would probably not have dared to use repeatable jobs with bull.

    apapacy@gmail.com
    July 3, 2019
