Coro and another rouse-callback implementation

    CPAN has such a wonderful family of modules - Coro . These modules allow you to program in pearl using corutin.

    Small introduction

    Imagine that at any time, anywhere in the program (for example, inside the function body or at the next iteration of the cycle), you can completely save the current state and temporarily “switch” to another point in the program. Having done some useful work at this “other” point, you go back, restore the saved state, and then all the work happens as if this “switch” was not at all. Well, of course, not counting those changes in the general data that occurred at a new point. Having several "heavy" functions, each of which does not depend on the results of the rest, similar "switches" can simulate their parallel execution. That is, from the outside it will look as if the functions are performed in parallel, but, in fact, at each moment in time, a “piece” of only one of them is executed, and the size of this "piece" is determined by you. In other words, it turns out that each function executes in its own thread, all threads use only one processor core (regardless of their number in the system), and in order for each thread to get its own processor time, all of them must share this time . Due to the lack of true parallelism, all changes in shared data that have occurred in any thread become immediately available in all other threads, and since you specify the moments of switching between threads, the need for synchronization is sharply reduced (basically, it is needed to work with external resources ) and in order for each thread to get its processor time, all of them must share this time among themselves. Due to the lack of true parallelism, all changes in shared data that have occurred in any thread become immediately available in all other threads, and since you specify the moments of switching between threads, the need for synchronization is sharply reduced (basically, it is needed to work with external resources ) and in order for each thread to get its processor time, all of them must share this time among themselves. Due to the lack of true parallelism, all changes in shared data that have occurred in any thread become immediately available in all other threads, and since you specify the moments of switching between threads, the need for synchronization is sharply reduced (basically, it is needed to work with external resources )
    All this and much more can be implemented in the pearl using a family of modules called Coro. The main module of this family allows you to perform functions, or blocks of code in separate threads (I will call these threads coro threads below), and auxiliary modules add synchronization tools, message queues, integration with event loops, etc.

    Creating coro threads

    In order to create a coro stream, you must use one of the constructions:

    Coro::async { ... } @args;
    

    note the absence of a comma between the block and its arguments, or

    my $coro = new Coro(\&code_ref, @args);
    $coro->ready();
    

    In the first case Coro::async, the block passed to the function becomes the stream , and the arguments specified immediately after the block become available inside the block as arguments to the function (through @_). In the second case, you create a stream using a link to a function and arguments for this function. The second construct returns a reference to the created thread, for which the method is then called ready(). This is precisely the main difference between the second construction and the first - the created thread will be inactive until it is placed in the ready-queue (more on this below).
    In both cases, the thread "lives" as long as the corresponding function or block of code is executed. By the way, the program itself also runs in a separate coro thread - the main one.

    Switching between coro streams

    Unlike system threads, switching between which is carried out somewhere in the bowels of the operating system, it is necessary to switch manually between coro-threads. The most obvious switching points (you can come up with even more or less obvious ones):
    • Each n-th iteration of the "long-playing" cycle
    • Each blocking operation (working with a network, disk, etc.)

    In the second case, the processor is still not used until the data arrives over the network, or is not read from the disk (as well as transferred over the network, or written to disk).
    How to transfer control with Coro? In order to save the current state and interrupt the execution of the current coro-stream, it is necessary to use the static method schedule(); in addition, this method extracts the next coro-stream from the ready-queue and starts it to execute. Accordingly, in order for the coro thread calling to schedule()be able to obtain processor time again in the future, it must first put itself at the end of the ready queue using the methodready()(or any other thread should do it for him). The interrupted thread remains blocked (does not receive processor time) until it is placed at the end of the ready-queue; if this does not happen by the time other active threads complete their work, Coro will detect this and crash the program. Since the calls ready()and schedule()are used together often, Coro module provides for the convenience of the call cede(), which is the analogue of the next couple of lines:

    $Coro::current->ready();
    Coro::schedule;
    

    Let's look at an example.
    #!/usr/bin/perl
    $| = 1;
    use strict;
    use warnings;
    use Coro;
    # Создание coro-потока с помощью Coro::async
    Coro::async {
        my $thread_id = shift;
        # Устанавливаем описание для coro-потока
        $Coro::current->desc("Thread #$thread_id");
        for (my $i = 0; $i < 1_000_000; $i++) {
            if ($i % 1000 == 0) {
                print "$Coro::current->{desc} - Processed: $i items\n";
                # Помещаем текущий coro-поток в конец ready-очереди
                $Coro::current->ready();
                # Передаём управление следующему потоку из ready-очереди
                Coro::schedule();
            }
        }
    } 0;
    # Эта функция будет выполняться в отдельном coro-потоке
    sub my_thread {
        my $thread_id = shift;
        $Coro::current->desc("Thread #$thread_id");
        for (my $i = 0; $i < 1_000_000; $i++) {
            if ($i % 1000 == 0) {
                print "$Coro::current->{desc} - Processed: $i items\n";
                # Временно передаём управление следующему coro-потоку
                Coro::cede();
            }
        }
    }
    my @threads = ();
    for (my $thread_id = 1; $thread_id < 5; $thread_id++) {
        # Создаём неактивный coro-поток с помощью Coro::new()
        my $thread = new Coro(\&my_thread, $thread_id);
        # Помещаем созданный coro-поток в конец ready-очереди
        $thread->ready();
        push @threads, $thread;
    }
    while (my $thread = shift @threads) {
        # Блокируем главный coro-поток до тех пор, пока очередной coro-поток не завершится
        $thread->join();
    }
    

    Result:
    Thread # 0 - Processed: 0 items
    Thread # 1 - Processed: 0 items
    Thread # 2 - Processed: 0 items
    Thread # 3 - Processed: 0 items
    Thread # 4 - Processed: 0 items
    Thread # 0 - Processed: 1000 items
    Thread # 1 - Processed: 1000 items
    Thread # 2 - Processed: 1000 items
    Thread # 3 - Processed: 1000 items
    Thread # 4 - Processed: 1000 items
    ...
    Thread # 0 - Processed: 999000 items
    Thread # 1 - Processed: 999000 items
    Thread # 2 - Processed: 999000 items
    Thread # 3 - Processed: 999000 items
    Thread # 4 - Processed: 999000 items
    


    In the example, coro threads are created in different ways and transmit processor time to each other in different ways. All coro-streams do the same job - every 1000 iterations, report on the progress and interrupt their execution, giving the opportunity to work the rest of the coro-streams, first placing themselves at the end of the ready-queue (either explicitly or using cede()). The program continues to work until the main coro stream is completed, and the main coro stream is waiting for 4 of the 5 created coro streams to finish (the method call join()blocks the coro stream from which the call is made until coro the thread for which this method was called will not end).

    Event Loop Integration

    The above example demonstrates how coro threads share processor time, taking a break from time-consuming work. As noted above, a good reason to share processor time is also to perform blocking operations (usually I / O).
    When we are faced with the problem of effective program operation with many blocking operations, we usually solve this problem with the help of event loops. For example, we put the sockets in non-blocking mode and “hang” “watchers” on them, monitoring the readiness of the socket for writing or reading, and create a timer to interrupt timeout operations. As the events of interest to us occur, callbacks associated with the corresponding "watchers" are called from the bowels of the event cycle. As the project becomes more complex, it’s becoming more and more difficult to understand which callback, when and why it is being called. Using Coro, the situation improves markedly and the program code becomes more linear and understandable (purely my opinion).
    First of all, it should be noted that in the Coro module family there are three modules for integrating coro streams into event loops - these are Coro :: AnyEvent , Coro :: Event and Coro :: EV (the pieces of code below will be for Coro :: EV). In order to integrate the event loop into your program, you just need to start the loop itself in any coro-stream (for example, in the main one):

    Coro::async { EV::run() };
    

    For the convenience of event processing, the Coro module provides two useful functions - rouse_cb()and rouse_wait():
    • rouse_cb() generates and returns a callback, which, when called, saves the arguments passed to it and notifies Coro internals of the fact of the call
    • rouse_wait()blocks the current coro-thread until the last one created by the rouse_cb()callback function is called (you can also specify the call of which callback to wait as an argument); the function returns what was passed to the callback as arguments

    So the pieces of code below are equivalent:

    # 1. Без использования rouse_cb() и rouse_wait()
    my $timer = EV::timer(5, 5, sub {
        my ($watcher, $revents) = @_;
        print "Timer $wathcer: timeout\n";
    });
    #2. С использованием rouse_cb() и rouse_wait()
    my $timer = EV::timer(5, 5, rouse_cb());
    my ($watcher, $revents) = rouse_wait();
    print "Timer $wathcer: timeout\n";
    


    Another implementation of rouse callbacks

    The above piece of code does not convey all the power rouse_cb()and rouse_wait(), however, its understanding comes as you work on real projects. However, for myself, I found the main minus of the built-in rouse callbacks - if you save the callback returned by the function rouse_cb()and try to use it again (which is logical for cyclic operations, because why at each iteration create a new object that performs the same thing every time work?), nothing will come of it. Being called at least once, the callback retains its state and all subsequent calls rouse_wait()for this callback immediately return the previously saved arguments.
    Therefore, I decided to write my implementation of rouse-callback. In this implementation, the callback is an object, and instead of the function rouse_wait(), the method is usedwait() callback:

    my $cb = new My::RouseCallback;
    my $timer = EV::timer(5, 5, $cb);
    my ($watcher, $revents) = $cb->wait();
    print "Timer $wathcer: timeout\n";
    

    Implementation of My :: RouseCallback
    package My::RouseCallback;
    use strict;
    use warnings;
    use Coro;
    # "Хранилище" данных для всех созданных объектов My::RouseCallback
    my %STORAGE = ();
    # Создание коллбэка: my $cb = new My::RouseCallback;
    sub new {
        my ($class) = @_;
        my $context = {args => [], done => 0, coro => undef};
        my $self = bless sub {
            # Сохраняем переданные коллбэку аргументы
            $context->{args} = \@_;
            # Устанавливаем признак того, что коллбэк был вызван
            $context->{done} = 1;
            if ($context->{coro}) {
                # Активизируем ожидающий coro-поток
                $context->{coro}->ready();
            }
        }, $class;
        $STORAGE{"$self"} = $context;
        return $self;
    };
    # Ожидаем вызов коллбэка: $cb->wait();
    sub wait {
        my $self = shift;
        my $context = $STORAGE{"$self"};
        # Чтобы коллбэк знал, какой coro-поток ожидает его
        $context->{coro} = $Coro::current;
        # Блокируем текущий coro-поток до тех пор, пока не будет вызван коллбэк
        while ($context->{done} == 0) {
            Coro::schedule();
        }
        # Возвращаем переданные коллбэку аргументы и очищаем состояние
        my @args = @{ $context->{args} };
        $context->{args} = [];
        $context->{done} = 0;
        return @args;
    }
    sub DESTROY {
        my $self = shift;
        $self->();
        delete $STORAGE{"$self"};
    };
    1;
    __END__
    



    If you see the possibility of using Coro in your task, be sure to try, maybe you will like it. Study the documentation, share the knowledge gained in practice.

    PS. If you use modules from the EV and Coro families together, be careful. Both the first and second export the async () function by default. Therefore, when creating coro threads, it is always better to explicitly specify Coro :: async.

    Also popular now: