Perl Asynchronous Multithreaded Worker Pool


A web service, like many other systems, often needs to run various background tasks. For this, developers write scripts called workers, which take a list of pending tasks and process them at some rate and in some order.

Naturally, it is best when all tasks are completed quickly and without delay.

To speed up task processing, two problems need to be solved:

  • Teach the worker not to wait for each individual stage of a task to complete (asynchrony)
  • Teach the worker to perform several tasks simultaneously (multithreading). A disclaimer: here the term “multithreading” is actually used to mean “multiprocessing”.

In this article, we will look at a worker implementation that is both asynchronous and multithreaded.

AnyEvent Module


For asynchronous programming in Perl there is an excellent module, AnyEvent.

Strictly speaking, AnyEvent is a wrapper over other, lower-level event-loop modules (such as EV or Event). Just as DBI is a wrapper and universal interface to various databases, AnyEvent is a wrapper and universal interface to various implementations of asynchronous engines.
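To see the wrapper idea in action, here is a small sketch (it assumes the AnyEvent distribution is installed from CPAN). It registers two timers with whatever event loop AnyEvent selects and then prints the name of the backend actually in use. The same code runs unchanged on EV, Event, or AnyEvent's pure-Perl fallback; only the reported backend name changes.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;

my @fired;                       # order in which the timer callbacks ran
my $done = AnyEvent->condvar;    # used to stop waiting once we are finished

# Timers stay active only as long as their watcher objects are referenced.
my $slow = AnyEvent->timer(after => 0.2, cb => sub {
    push @fired, 'slow';
    $done->send;                 # wake up the recv() below
});
my $fast = AnyEvent->timer(after => 0.1, cb => sub {
    push @fired, 'fast';
});

$done->recv;                     # run the event loop until send() is called

# AnyEvent::detect returns the backend class in use,
# e.g. AnyEvent::Impl::EV, or AnyEvent::Impl::Perl as the pure-Perl fallback.
print "backend: ", AnyEvent::detect(), "\n";
print "order: @fired\n";
```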

There is a huge number of extensions for AnyEvent, including one for writing multithreaded applications: the AnyEvent::Fork::Pool module.

The AnyEvent::Fork::Pool module provides an easy way to create a pool of workers that process tasks asynchronously and in parallel.

Script


Consider the anyevent_pool.pl script:

#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Fork::Pool;
# Worker module
my $mod = 'Worker';
# Worker function
my $sub = 'work';
# Determine the number of cores in the system (fall back to 1)
my $cpus = AnyEvent::Fork::Pool::ncpu 1;
# Create the worker pool
my $pool = AnyEvent::Fork
  ->new
  ->require ($mod)
  ->AnyEvent::Fork::Pool::run(
      "${mod}::$sub",         # Module::function - the worker's work function
      init => "${mod}::init", # Module::init - the worker's initialization function
      max  => $cpus,          # Maximum number of workers in the pool
      idle => 0,              # Minimum number of idle workers kept alive
      load => 1,              # Maximum number of queued tasks per worker
  );
# Submit tasks to the pool
for my $str (qw{q2 rtr4 ui3 asdg5}) {
  $pool->($str, sub {
      print "result: @_\n";
  });
}
# Run the event loop forever (nothing ever calls send on this condvar)
AnyEvent->condvar->recv;

Despite its small size, this script is a full-fledged asynchronous multithreaded application.

Let's take it apart.

Variables


# Worker module
my $mod = 'Worker';
# Worker function
my $sub = 'work';

These variables connect the pool to the code that performs the actual background tasks. There is only one pool, but tasks can differ, so these variables tell the pool which code (which function from which module) to run for a given task.

For example, you might have a Text module for processing text, with length and trim functions, and an Image module with resize and crop functions. The pool does not care what your functions do or how they are implemented. You only need to tell the pool which module they are in and what they are called, and the pool will execute them.

Important! The worker module does not need to be loaded in the script with “use Worker”. The ->require($mod) call in the pool setup loads it automatically into the process from which the workers are forked; you only need to specify the module name correctly in the variable.
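To illustrate what running a function “by its name” means in plain Perl, here is a minimal sketch using only core Perl. The resolve_function helper is hypothetical and is not how AnyEvent::Fork::Pool works internally; the core module List::Util simply stands in for the Worker module.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper, NOT part of AnyEvent::Fork::Pool: turn a module name
# and a function name (like $mod and $sub in the script) into a code reference.
sub resolve_function {
    my ($mod, $sub) = @_;
    (my $file = "$mod.pm") =~ s{::}{/}g;   # "List::Util" -> "List/Util.pm"
    require $file;                          # load at run time, no "use" needed
    my $code = $mod->can($sub)
        or die "$mod has no function named $sub\n";
    return $code;
}

my $sum = resolve_function('List::Util', 'sum');
print $sum->(1, 2, 3), "\n";   # prints 6
```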

Number of Cores


# Determine the number of cores in the system (fall back to 1)
my $cpus = AnyEvent::Fork::Pool::ncpu 1;

For multithreaded tasks, it helps to know how many cores the system has. For CPU-bound work, the number of workers you start should ideally equal the number of cores. With fewer workers, some cores sit idle; with more, workers compete for cores, and instead of a speedup you get scheduling overhead.

If for some reason the number of cores cannot be determined, the value passed to ncpu is used instead; in this case, 1.
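As an illustration of this fallback logic, here is a Linux-only sketch in core Perl. count_cpus is a hypothetical helper; AnyEvent::Fork::Pool::ncpu has its own detection code and may consult other sources on other systems.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical stand-in for AnyEvent::Fork::Pool::ncpu: count the
# "processor" entries in /proc/cpuinfo, otherwise return the default.
sub count_cpus {
    my ($default) = @_;
    open my $fh, '<', '/proc/cpuinfo' or return $default;   # not Linux?
    my $n = grep { /^processor\s*:/ } <$fh>;                  # one entry per CPU
    close $fh;
    return $n || $default;    # nothing found: use the manually given value
}

my $cpus = count_cpus(1);
print "workers to start: $cpus\n";
```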

Pool


# Create the worker pool
my $pool = AnyEvent::Fork
  ->new
  ->require ($mod)
  ->AnyEvent::Fork::Pool::run(
      "${mod}::$sub",         # Module::function - the worker's work function
      init => "${mod}::init", # Module::init - the worker's initialization function
      max  => $cpus,          # Maximum number of workers in the pool
      idle => 0,              # Minimum number of idle workers kept alive
      load => 1,              # Maximum number of queued tasks per worker
  );

Explanation of the parameters:

  • The worker's work function must always be given as the first parameter. This is the module function that we specified in the two “tuning” variables $mod and $sub. It is the only required parameter.
  • init - If your worker needs initialization, specify the name of the initialization function here. In this example the function is called “init”, the conventional name for such a function, but in principle you can use any other name.
  • max - The maximum number of worker processes the pool will start. This is where you pass the previously determined number of cores (although you can specify any number if you know what you are doing).
  • idle - The number of workers kept waiting “at the starting line”. The larger this number (but no more than max), the faster the pool responds to a new incoming task, but the more idle processes sit around consuming resources.
  • load - How many tasks each worker is given without waiting for the previous ones to finish. The best value depends heavily on the situation: sometimes smaller is better, sometimes larger. All else being equal, a larger value should increase the pool's efficiency (buying wholesale is cheaper).
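To make the interplay between max and load more concrete, here is a toy model in core Perl. It is a simplification under stated assumptions, not the pool's actual scheduler: each of max workers holds at most load tasks, a new task goes to the least busy worker, and anything beyond max × load waits in the pool's own queue. The dispatch function is a hypothetical name.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy model, NOT AnyEvent::Fork::Pool's real scheduler.
# Each of $max workers may hold at most $load tasks at once; a new task goes
# to the least busy worker, and tasks beyond $max * $load wait in the pool.
sub dispatch {
    my ($max, $load, @tasks) = @_;
    my @queues = map { [] } 1 .. $max;    # one task list per worker
    my @waiting;                           # tasks no worker can accept yet
    for my $task (@tasks) {
        # index of the worker with the fewest tasks (lowest index on ties)
        my ($best) = sort { @{ $queues[$a] } <=> @{ $queues[$b] } or $a <=> $b }
                     0 .. $#queues;
        if (@{ $queues[$best] } < $load) {
            push @{ $queues[$best] }, $task;
        }
        else {
            push @waiting, $task;          # every worker is at its limit
        }
    }
    return (\@queues, \@waiting);
}

# Same settings as the script (max => 4, load => 1) plus two extra tasks.
my ($queues, $waiting) = dispatch(4, 1, qw(q2 rtr4 ui3 asdg5 x1 y2));
for my $i (0 .. $#$queues) {
    print "worker $i: @{ $queues->[$i] }\n";
}
print "waiting in pool: @$waiting\n";
```

With max => 4 and load => 1, as in the script, at most four tasks are handed out at once; in this model the two extra tasks stay queued in the pool until a worker becomes free.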

There are other parameters that I do not cover here; they are quite specialized and rarely needed. The full list can be found in the module documentation.

Submitting tasks to the pool


# Submit tasks to the pool
for my $str (qw{q2 rtr4 ui3 asdg5}) {
  $pool->($str, sub {
      print "result: @_\n";
  });
}

You can pass an arbitrary number of parameters to the pool, but the last one must be a callback: an anonymous function that is called after the worker has completed the task. The worker's results are passed to this function.

In other words, this function receives the results of the $sub function: everything $sub returns is passed as arguments to the callback. Roughly, the relationship can be written as “callback($sub(...))”.
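The hand-off can be shown with a synchronous, single-process sketch in core Perl. This is only an analogy: the real pool runs work() in a separate process and calls the callback later, from the event loop.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Same work() as in the Worker module, minus the logging and sleeping.
sub work {
    my ($str) = @_;
    return $str, length($str);
}

my @received;
my $callback = sub {
    @received = @_;          # whatever work() returned arrives here
    print "result: @_\n";
};

# Synchronous equivalent of $pool->('rtr4', $callback).
$callback->(work('rtr4'));   # prints "result: rtr4 4"
```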

In our case, the callback function simply prints everything it receives.

The $str variable is the actual task the worker must perform. Here it is just a string (more precisely, four strings submitted in a loop). The strings have no deeper meaning; I just let the cat walk across the keyboard.

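Depending on the situation, instead of a string there can be almost anything: a file name, a database record ID, a mathematical expression, a reference to a complex data structure, and so on. The pool does not care what it is and does not process this value; it simply passes it to the worker, which must know what to do with it. Keep in mind, though, that by default the arguments are sent to the worker processes as plain strings (the default serialiser of AnyEvent::Fork::RPC, which the pool uses), so passing a reference requires selecting a different serialiser via the serialiser parameter.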
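For example, a structured task can be flattened into a string before submission and rebuilt inside the worker, which works with the pool's default string-only transport. Here is a minimal sketch using the core module JSON::PP; the task fields and this variant of work() are hypothetical. The alternative is to configure a different serialiser in the pool (AnyEvent::Fork::RPC provides, for example, $AnyEvent::Fork::RPC::STORABLE_SERIALISER).

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP ();    # core module since Perl 5.14

my $json = JSON::PP->new->utf8->canonical;   # byte-string output, stable key order

# Parent side: a structured task (hypothetical fields) becomes a plain string,
# which could then be submitted as $pool->($payload, $callback).
my $task    = { file => 'photo.jpg', width => 800, height => 600 };
my $payload = $json->encode($task);

# Worker side: a hypothetical work() rebuilds the structure from the string.
sub work {
    my ($str) = @_;
    my $t = $json->decode($str);
    return "$t->{file}: $t->{width}x$t->{height}";
}

print work($payload), "\n";   # prints "photo.jpg: 800x600"
```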

Starting the event loop


AnyEvent->condvar->recv;

This line creates an AnyEvent condition variable and waits on it. Since nothing ever signals it, recv never returns, so the AnyEvent event loop keeps running and processing tasks indefinitely.

At this point the script loops forever: the example above has no way to stop and leave the endless task-processing loop. Conditionally exiting the AnyEvent loop is a more general question, and here I only want to cover the specific case of using the pool. You can read about conditionally exiting the loop here.
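For the pool specifically, one way to terminate is the on_destroy parameter: drop the last reference to the pool once all tasks are submitted, then wait on a condition variable that the pool signals after it has finished its queue. The sketch below follows that pattern as I understand it from the AnyEvent::Fork::Pool documentation (it assumes AnyEvent::Fork::Pool is installed). To be self-contained, it evaluates a stripped-down worker (the made-up package MyWorker) in the template process with ->eval instead of loading Worker.pm.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Fork::Pool;

# Stripped-down worker, evaluated in the template process (MyWorker is made up).
my $worker_code = q{
    package MyWorker;
    sub work { my ($str) = @_; return $str, length $str }
};

my $cpus   = AnyEvent::Fork::Pool::ncpu(1);   # parentheses: ncpu is a list operator
my $finish = AE::cv;                          # signalled through on_destroy
my @results;

my $pool = AnyEvent::Fork
  ->new
  ->eval ($worker_code)
  ->AnyEvent::Fork::Pool::run(
      "MyWorker::work",
      max        => $cpus,
      idle       => 0,
      load       => 1,
      on_destroy => $finish,   # a condvar can be called like a code reference
  );

for my $str (qw{q2 rtr4 ui3 asdg5}) {
  $pool->($str, sub {
      push @results, [@_];
      print "result: @_\n";
  });
}

undef $pool;       # no more tasks: the pool finishes its queue, then shuts down
$finish->recv;     # returns once all submitted tasks have been processed
print "all tasks done\n";
```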

The worker itself


Now the question arises: where is the worker itself? Where is the code that actually does the work?

This code lives in a separate module, the one we specified in the $mod variable.

Here is the code for the Worker module:

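package Worker;
use strict;
use warnings;
use IO::Handle;   # provides autoflush() on file handles
my $file;
# Called in each worker process before it starts receiving tasks
sub init {
  open $file, '>>', 'file.txt' or die "Cannot open file.txt: $!";
  $file->autoflush(1);   # write every line immediately, so tail shows progress
  return;
}
# Called for every task; the return values are passed to the pool's callback
sub work {
  my ($str) = @_;
  for (1..length($str)) {
      print $file "$$ $str\n";   # $$ is the worker's process ID
      sleep 1;
  }
  return $str, length($str);
}
1;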

As you can see, there are two functions in the module - init and work.

The init function initializes the worker. Here it opens a log file into which the work function writes its progress. As mentioned above, init is optional; I added it here just for illustration.

The work function is the main one. It is the working function specified in the $sub variable, and it does all the work involved in performing a specific task.

In our case, the function does the simplest possible job: it computes the length of the string. To make the worker's activity more visible, I added a loop with a one-second delay that writes the string to the log as many times as there are characters in it.

Note that the function returns two values: the string itself and its length. These two values are passed to the callback given when the tasks were submitted to the pool (and the callback, as mentioned above, simply prints them).

And that, in fact, is all the code.

Running the pool


Now let's run the pool and see what happens:

[Screenshot: output of anyevent_pool.pl]

Here we see the pool's results. Notice that the order of the output differs from the order of the strings in the script's loop. The reason is simple: the strings have different lengths, so the workers process them at different speeds (one second per character), and the shorter tasks finish first.

Now let's look not just at the results but at the workers in action. To do this, run tail on the log file in a second window:

[Screenshot: tail of file.txt]

Note that the lines are interleaved, because the tasks are performed simultaneously. The process IDs are shown on the left, and we can see that four processes are involved. My system has 4 cores, so all 4 tasks run simultaneously.

Finally, let's look at the process table:

[Screenshot: process tree of the pool]

This is what the process tree of our pool looks like.

The script comes first in the list, then the manager from which pools are created (yes, there can be several pools), then the manager of this particular pool, and finally the workers.

If you take the trouble to compare the process IDs, you will see that the workers' IDs match the IDs in the log file.
