A Python implementation of the event-driven paradigm with coroutines

    This article shows how to use Python generators to build your own implementation of coroutines that switch on receiving events. The resulting module is pleasantly simple, and it sheds light on the new and still little-used language features that such generators provide. The article should also help you understand how serious implementations such as asyncio and tornado are put together.

    Theory and disclaimer


    The concept of a coroutine is interpreted very broadly, so we should first decide which properties coroutines will have in our implementation:
    • They run cooperatively in a single thread;
    • Execution may be suspended to wait for a specific event;
    • Execution may resume once the expected event arrives;
    • A coroutine may return a result upon completion.

    As a result we get event-driven programming without callback functions, and cooperative multitasking. This programming paradigm pays off only for tasks that react to events arriving irregularly. First of all these are I/O tasks: network servers, user interfaces, and so on. Another possible application is computing the state of characters in a game world. It is categorically unsuitable for tasks that perform long computations.
    It should be clearly understood that until the running coroutine suspends itself to wait for an event, all the others remain stopped, even if the events they were waiting for have already occurred.
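
    The scheduling rule described above can be seen in a minimal sketch with plain generators. The names worker and round_robin are hypothetical and are not part of the module built in this article; the sketch only assumes that each generator gives up control at every yield.

```python
def worker(name, steps, log):
    """Hypothetical task: records each step, then hands control back."""
    for i in range(steps):
        log.append((name, i))
        yield  # the only place where another task gets a chance to run


def round_robin(tasks):
    """Hypothetical scheduler: resumes every task in turn until all finish."""
    tasks = list(tasks)
    while tasks:
        for task in list(tasks):
            try:
                next(task)
            except StopIteration:
                tasks.remove(task)


log = []
round_robin([worker("a", 2, log), worker("b", 3, log)])
print(log)
```

    Control changes hands only at yield: a worker that never reached a yield would keep every other task waiting.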

    The basis of everything


    In Python, generators are a good basis for all this, provided they are properly prepared, in both the literal and the figurative sense. More precisely, we need enhanced generators, whose API took its final form in Python 3.3. Earlier versions could not return a value (result) when a generator finished and had no convenient mechanism for calling one generator from another. Coroutine implementations existed before that, but because of the limitations of ordinary generators they were not as “beautiful” as what we will get. A very good article on this subject is “A Curious Course on Coroutines and Concurrency”; its only drawback is that there is no updated version in which the Python coroutine implementation uses the latest additions to the language, in particular the enhanced generator API. Below are the features of enhanced generators that we need.
    Passing messages to a coroutine will be based on the ability to set the generator's state. Paste the code below into a running Python interpreter, version 3.3 or higher.
    def gen_factory():
        state = None
        while True:
            print("state:", state)
            state = yield state
    gen = gen_factory()
    

    The generator has been created; now it must be started.
    >>> next(gen)
    state: None
    

    The initial state has been received. Now change the state:
    >>> gen.send("OK")
    state: OK
    'OK'
    

    We see that the state has changed and has been returned as the result. Each subsequent send call likewise returns the state that it sends.
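
    The two other features mentioned earlier, returning a value from a generator and delegating to another generator with yield from, can be tried out with a small sketch. The generator names inner and outer are made up for illustration.

```python
def inner():
    # Suspends here; the value passed to send() becomes `received`.
    received = yield "inner is waiting"
    # A generator may return a value; it travels inside StopIteration.
    return "inner got {!r}".format(received)


def outer():
    # yield from starts inner(), forwards send() calls to it,
    # and evaluates to the value that inner() returns.
    result = yield from inner()
    return result.upper()


gen = outer()
first = next(gen)  # runs up to the yield inside inner()
try:
    gen.send("ping")
except StopIteration as stop:
    final = stop.value  # the value returned by outer()

print(first)
print(final)
```

    Note that the caller talks only to outer; the message still reaches inner because yield from passes it down the chain.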

    Why do we need all this?


    Imagine a task: greet Petrov every two seconds, Ivanov every three seconds, and the whole world every five seconds. In Python code it might look something like this:
    from time import sleep
    def hello(name, timeout):
        while True:
            sleep(timeout)
            print("Hello, {}!".format(name))
    hello("Petrov", 2.0)
    hello("Ivanov", 3.0)
    hello("World", 5.0)
    

    It looks good, but only Petrov will ever be greeted, because the first call never returns. However, a small modification that does not hurt the clarity of the code (if anything, it makes the idea clearer) is enough to make it work as expected.
    @coroutine
    def hello(name, timeout):
        while True:
            yield from sleep(timeout)
            print("Hello, {}!".format(name))
    hello("Petrov", 2.0)
    hello("Ivanov", 3.0)
    hello("World", 5.0)
    run()
    

    The code turned out pythonic: it illustrates the task clearly, it is linear with no callbacks and no unnecessary fuss with objects, and any comments in it would be superfluous. All that remains is to implement the coroutine decorator, our own version of the sleep function, and the run function. The implementation, of course, will not do without bells and whistles. But hiding all the magic behind the facade of library modules is also the pythonic way.

    The most interesting


    We give the implementation module the unpretentious name concurrency; the name is meaningful and reflects the fact that the module is, in essence, an implementation of cooperative multitasking. Clearly, the decorator has to create a generator from the decorated function and start it (make the first call to next). The yield from construct delegates to the next generator, so the sleep function should create a generator inside which all the magic can be hidden. Only the code of the received event is returned to the generator that called it. In our example the returned result is not processed, because the code can receive only one result, meaning that the timeout has expired. Waiting for I/O, by contrast, can return different kinds of events (for example read, write, or timeout). Moreover, generators produced by sleep-like functions can return any data type through yield from, so their functionality need not be limited to waiting for events. The run function starts the event dispatcher, whose job is to receive an event from outside and/or generate it internally, determine its recipient, and deliver it.
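
    How a coroutine might react to different event codes can be sketched without any dispatcher at all. Here wait_io and handler are hypothetical names, and the events are sent by hand rather than by a real I/O loop; the event codes "read", "write" and "timeout" are assumptions taken from the example in the text.

```python
def wait_io():
    """Hypothetical analogue of sleep(): returns whatever event is sent in."""
    event = yield
    return event


def handler(log):
    """Hypothetical coroutine that branches on the received event code."""
    while True:
        event = yield from wait_io()
        if event == "read":
            log.append("reading")
        elif event == "write":
            log.append("writing")
        elif event == "timeout":
            log.append("giving up")
            return


log = []
gen = handler(log)
next(gen)  # run up to the first yield inside wait_io()
for event in ("read", "write", "timeout"):
    try:
        gen.send(event)  # plays the role of the event dispatcher
    except StopIteration:
        break

print(log)
```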
    Let's start with the decorator:
    class coroutine(object):
        """Turns a function into a coroutine based on an enhanced generator."""
        _current = None  # the top-level generator that is running right now
        def __init__(self, callable):
            self._callable = callable
        def __call__(self, *args, **kwargs):
            corogen = self._callable(*args, **kwargs)
            cls = self.__class__
            # Start the generator only when no other coroutine is running.
            if cls._current is None:
                try:
                    cls._current = corogen
                    next(corogen)
                finally:
                    cls._current = None
            return corogen
    

    It is written as a class, a typical trick, and, as promised, it creates and starts the generator. The _current machinery is there so that the generator is not started when the decorated function that creates it is called inside the body of another generator; in that case the first call to next is made by yield from. It also helps to work out which generator an event must be sent to so that it travels down the chain to the generator created by the sleep function.
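
    The effect of _current on nested calls can be checked with a small sketch. The decorator is repeated from above so that the example is self-contained; wait_event, child and parent are hypothetical names, and the event is sent by hand instead of by a dispatcher.

```python
class coroutine(object):
    """The decorator from the article, repeated for a self-contained example."""
    _current = None
    def __init__(self, callable):
        self._callable = callable
    def __call__(self, *args, **kwargs):
        corogen = self._callable(*args, **kwargs)
        cls = self.__class__
        if cls._current is None:
            try:
                cls._current = corogen
                next(corogen)
            finally:
                cls._current = None
        return corogen


seen = []


def wait_event():
    # Plays the role of sleep(): records which generator would get the event.
    seen.append(coroutine._current)
    event = yield
    return event


@coroutine
def child():
    event = yield from wait_event()
    return "child saw " + event


@coroutine
def parent():
    # child() is called while parent is running, so the decorator does not
    # start it; yield from makes the first next() call instead.
    result = yield from child()
    seen.append(result)


top = parent()
try:
    top.send("timeout")  # travels parent -> child -> wait_event
except StopIteration:
    pass

print(seen[0] is top, seen[1])
```

    The generator recorded inside wait_event is the outermost one, which is exactly the object an event has to be sent to.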
    def sleep(timeout):
        """Suspends execution until the "timeout expired" event is received."""
        corogen = coroutine._current
        dispatcher.setup_timeout(corogen, timeout)
        revent = yield
        return revent
    

    Here we see the dispatcher.setup_timeout call, which tells the event dispatcher that the generator is waiting for a timeout event that should arrive once the number of seconds given by the timeout parameter has elapsed.
    from collections import deque
    from time import time, sleep as sys_sleep
    class Dispatcher(object):
        """Event dispatcher object."""
        def __init__(self):
            # Queue of [generator, deadline] pairs, kept sorted by deadline.
            self._pending = deque()
        def setup_timeout(self, corogen, timeout):
            deadline = time() + timeout
            self._pending.append([corogen, deadline])
            self._pending = deque(sorted(self._pending, key=lambda a: a[1]))
        def run(self):
            """Runs the event-processing loop."""
            while len(self._pending) > 0:
                # Sleep until the earliest pending deadline, so that no
                # coroutine is woken later than it asked for.
                timeout = self._pending[0][1] - time()
                if timeout > 0:
                    sys_sleep(timeout)
                while len(self._pending) > 0:
                    if self._pending[0][1] <= time():
                        corogen, _ = self._pending.popleft()
                        try:
                            # Mark the recipient so that sleep() can register it again.
                            coroutine._current = corogen
                            corogen.send("timeout")
                        except StopIteration:
                            pass
                        finally:
                            coroutine._current = None
                    else:
                        break
    dispatcher = Dispatcher()
    run = lambda: dispatcher.run()
    

    The event dispatcher code is nothing unusual either. Where to send an event is determined with the help of the class variable coroutine._current. An instance of the class is created when the module is loaded; in a production implementation this should, of course, be a singleton. collections.deque is used instead of a list because removing items from the front with its popleft method is faster. And that is really all there is to it; there is no special magic. All the magic is in fact hidden even deeper, in the implementation of Python's enhanced generators. They just need to be prepared correctly.
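
    As a side note, the pending queue could also be kept in a binary heap from the standard heapq module instead of a deque that is re-sorted on every insert. This is a different technique from the one used in the article, shown only as a sketch; the TimerQueue class and its method names are hypothetical.

```python
import heapq
from itertools import count


class TimerQueue(object):
    """Hypothetical helper: stores (deadline, order, item) entries in a heap."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so that items with equal deadlines are never compared.
        self._order = count()

    def push(self, deadline, item):
        heapq.heappush(self._heap, (deadline, next(self._order), item))

    def next_deadline(self):
        """Earliest deadline, or None when the queue is empty."""
        return self._heap[0][0] if self._heap else None

    def pop_due(self, now):
        """Removes and returns all items whose deadline has passed."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _, _, item = heapq.heappop(self._heap)
            due.append(item)
        return due

    def __len__(self):
        return len(self._heap)


queue = TimerQueue()
queue.push(5.0, "world")
queue.push(2.0, "petrov")
queue.push(3.0, "ivanov")

earliest = queue.next_deadline()
due = queue.pop_due(now=3.0)
print(earliest, due, len(queue))
```

    In a dispatcher built this way the items would be generators rather than strings, and next_deadline would tell the loop how long to sleep.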

    File: concurrency.py
    # concurrency.py
    from collections import deque
    from time import time, sleep as sys_sleep
    class coroutine(object):
        """Turns a function into a coroutine based on an enhanced generator."""
        _current = None  # the top-level generator that is running right now
        def __init__(self, callable):
            self._callable = callable
        def __call__(self, *args, **kwargs):
            corogen = self._callable(*args, **kwargs)
            cls = self.__class__
            # Start the generator only when no other coroutine is running.
            if cls._current is None:
                try:
                    cls._current = corogen
                    next(corogen)
                finally:
                    cls._current = None
            return corogen
    def sleep(timeout):
        """Suspends execution until the "timeout expired" event is received."""
        corogen = coroutine._current
        dispatcher.setup_timeout(corogen, timeout)
        revent = yield
        return revent
    class Dispatcher(object):
        """Event dispatcher object."""
        def __init__(self):
            # Queue of [generator, deadline] pairs, kept sorted by deadline.
            self._pending = deque()
        def setup_timeout(self, corogen, timeout):
            deadline = time() + timeout
            self._pending.append([corogen, deadline])
            self._pending = deque(sorted(self._pending, key=lambda a: a[1]))
        def run(self):
            """Runs the event-processing loop."""
            while len(self._pending) > 0:
                # Sleep until the earliest pending deadline, so that no
                # coroutine is woken later than it asked for.
                timeout = self._pending[0][1] - time()
                if timeout > 0:
                    sys_sleep(timeout)
                while len(self._pending) > 0:
                    if self._pending[0][1] <= time():
                        corogen, _ = self._pending.popleft()
                        try:
                            # Mark the recipient so that sleep() can register it again.
                            coroutine._current = corogen
                            corogen.send("timeout")
                        except StopIteration:
                            pass
                        finally:
                            coroutine._current = None
                    else:
                        break
    dispatcher = Dispatcher()
    run = lambda: dispatcher.run()
    


    File: sample.py
    # sample.py
    from concurrency import coroutine, sleep, run
    @coroutine
    def hello(name, timeout):
        while True:
            yield from sleep(timeout)
            print("Hello, {}!".format(name))
    hello("Petrov", 2.0)
    hello("Ivanov", 3.0)
    hello("World", 5.0)
    run()
    



    Outro


    If the topic is of interest, it can be continued toward waiting for I/O events, with an asynchronous TCP echo server as an example and a real event dispatcher implemented as a dynamic library written in another language that is faster than Python.
