Event machine on guard of the life cycle

    Disclaimer: This article describes an unobvious solution to an unobvious problem. Before rushingeggs put it into practice, I recommend reading the article to the end and think twice.

    but_why


    Hello! When working with code, we often have to deal with state . One of these cases is the life cycle of objects. Managing an object with several possible states can be a very nontrivial task. Add asynchronous execution to this and the task becomes more complicated. There is an effective and natural solution. In this article I will talk about the event machine and how to implement it in Go.


    Why manage the condition?


    For a start, we will define the concept itself. The simplest example of state is files and various connections. You can not just take and read the file. It must first be opened, and at the enddesirablebe sure to close. So, the current action depends on the result of the previous action: the reading depends on the discovery. The saved result is the state.


    The main problem with the state is complexity. Any state automatically complicates the code. You have to store the results of actions in memory and add various checks to the logic. That is why stateless architects are so attracted to programmers - no one wantstroublesdifficulties. If the results of your actions do not affect the execution logic, you do not need the state.


    However, there is one property that makes one reckon with difficulties. The state requires you to follow a specific order of action. In general, such situations should be avoided, but this is not always possible. An example is the life cycle of program objects. Thanks to good state management, it is possible to obtain predictable behavior of objects with a complex life cycle.


    Now let's figure out how to do it cool .


    Automatic as a way to solve problems


    AK74


    When people talk about states, finite automata immediately come to mind. It is logical, because an automaton is the most natural way to control a state.


    I will not delve into the theory of automata , information on the Internet is more than enough.

    If you look for examples of finite automata for Go, you will definitely meet Rob Pike’s lexer (Rob Pike). A great example of an automaton in which the input alphabet is processed data. This means that the state transitions are caused by the text that the lexer processes. Elegant solution to a specific problem.


    The main thing to understand is that an automaton is the solution of a strictly specific problem. Therefore, before considering it as a remedy for all problems, you must fully understand the task. Specifically, the entity you want to manage:


    • states - life cycle;
    • events - what exactly causes the transition to each state;
    • work result - output;
    • execution mode (synchronous / asynchronous);
    • main usage scenarios.

    Lexer is beautiful, but he changes his state only because of the data that he himself processes. And how to be in a situation when the user calls the transitions? This is where the event automat can help out.


    Real example


    To make it clearer, I will analyze an example from the library phono.


    For full immersion in context, you can read the introductory article . This is not necessary for this topic, but will help to better understand what we manage.

    And what is manageable?


    It is based phonoon the DSP pipeline. It consists of three stages of processing. Each stage can include from one to several components:


    pipe_diagram


    1. pipe.Pump(Eng. Pump) - mandatory stage of obtaining sound, always only one component.
    2. pipe.Processor(eng. handler) - optional stage of sound processing , from 0 to N components.
    3. pipe.Sink(English sink) - a mandatory stage of sound transmission , from 1 to N components.

    Actually we will manage the life cycle of the pipeline.


    Life cycle


    Here is the state diagram pipe.Pipe.


    pipe_lifecycle


    Transitions caused by the internal execution logic are in italics . In boldface - transitions caused by events. The diagram shows that the states are divided into 2 types:


    • states of rest - readyand paused, from them it is possible to pass only on an event
    • active states - runningand pausing, transitions in the event and due to the execution logic

    Before a detailed analysis of the code, a vivid example of the use of all states:


    // PlayWav воспроизводит .wav файл с помощью portaudio устройства по-умолчанию.funcPlayWav(wavFile string)error {
        bufferSize := phono.BufferSize(512) // размер буфера для передачи данных
        w, err := wav.NewPump(wavFile, bufferSize) // создаем wav pumpif err != nil {
            return err
        }
        pa := portaudio.NewSink( // создаем portaudio sink
            bufferSize,
            w.WavSampleRate(),
            w.WavNumChannels(),
        )
        p := pipe.New(          // создаем pipe.Pipe с исходным состоянием ready
            w.WavSampleRate(),
            pipe.WithPump(w),
            pipe.WithSinks(pa),
        )
        p.Run()                 // переход в состояние running с помощью p.Run()
        errc := p.Pause()       // переход в состояние pausing с помощью p.Pause()
        err = pipe.Wait(errc)   // ожидание перехода в состояние pausedif err != nil {
            return err
        }
        errc = p.Resume()       // переход в состояние running с помощью p.Resume()
        err = pipe.Wait(errc)   // ожидание перехода в состояние readyif err != nil {
            return err
        }
        return pipe.Wait(p.Close()) // освобождение ресурсов по окончанию работы
    }

    Now about everything in order.


    All source code is available in the repository .

    States and events


    Let's start with the most important thing.


    // state определяет одно из возможных состояний конвейера.type state interface {
        listen(*Pipe, target) (state, target)           // ожидание нового состояния
        transition(*Pipe, eventMessage) (state, error)  // функция перехода 
    }
    // idleState состояние покоя. Из него можно выйти только с помощью события.type idleState interface {
        state
    }
    // activeState активное состояние. Из него можно выйти с помощью события и // внутренней логики исполнения.type activeState interface {
        state
        sendMessage(*Pipe) state    // отправка нового сообщения
    }
    // типы состояний.type (
        idleReady     struct{}
        activeRunning struct{}
        activePausing struct{}
        idlePaused    struct{}
    )
    // переменные состояний.var (
        ready   idleReady     
        running activeRunning 
        paused  idlePaused    
        pausing activePausing 
    )

    Thanks to the individual types, the transitions are also declared separately for each state. This avoids the hugesausagestransition functions with nested operators switch. The states themselves do not contain any data or logic. For them, you can declare variables at the package level, so as not to do this every time. The interface is stateneeded for polymorphism. Pro activeStateand idleStatetalk a little later.


    The second most important part of our machine is events.


    // event тип события.type event int// переменные событий.const (
        run event = iota
        pause
        resume
        push
        measure
        cancel
    )
    // target обозначает конечное состояние для вызванного события.type target struct {
        state idleState  // целевое состояние
        errc  chan error // канал с ошибками, закрывается когда достигнуто состояние 
    }
    // eventMessage передается в автомат, когда пользователь вызывает событие.type eventMessage struct {
        event               // тип события
        params    params    // новые параметры
        components []string// id компонентов
        target              // конечное состояние для этого события
    }

    To understand why you need a type target, consider a simple example. We have created a new conveyor, he is able ready. Now we start it by function p.Run(). An event is sent to the machine run, the pipeline goes into a state running. How to know when the pipeline is finished? This is where the type will help us target. It indicates the state of rest to wait after the event. In our example, after the end of the work, the conveyor will again become state ready. The same on the diagram:



    Now more about the types of states. More precisely, about interfaces idleStateand activeState. Let's look at the functions listen(*Pipe, target) (state, target)for different types of stages:


    // listen ждёт перехода из стадии ready.func(s idleReady)listen(p *Pipe, t target)(state, target) {
        return p.idle(s, t)
    }
    // listen ждёт перехода из стадии running.func(s activeRunning)listen(p *Pipe, t target)(state, target) {
        return p.active(s, t)
    }

    Do pipe.Pipehave different functions to wait for the transition! What is there?


    // idle ждёт переход из стадии покоя. Слушает только канал событий.func(p *Pipe)idle(s idleState, t target)(state, target) {
        if s == t.state || s == ready {
            t = t.dismiss()         // цель достигнута, освобождаем target
        }
        for {
            var newState state
            var err error
            select {
            case e := <-p.events:                   // ждём событие
                newState, err = s.transition(p, e)  // вызываем функцию переходаif err != nil {
                    e.target.handle(err)
                } elseif e.hasTarget() {
                    t.dismiss()
                    t = e.target
                }
            }
            if s != newState {  
                return newState, t  // выходим, если произошёл переход
            }
        }
    }
    // active ждёт перехода из активной стадии. Слушает канал событий и каналы, // используемые при исполнении.func(p *Pipe)active(s activeState, t target)(state, target) {
        for {
            var newState state
            var err error
            select {
            case e := <-p.events:                   // ждём событие
                newState, err = s.transition(p, e)  // вызываем функцию переходаif err != nil {                     // успешный переход?
                    e.target.handle(err)            // нет, передаем ошибку наружу
                } elseif e.hasTarget() {           // да, замещаем target
                    t.dismiss()                     // отменяем текущий
                    t = e.target                    // замещаем новым 
                }
            case <-p.provide:                       // ждем запроса нового сообщения
                newState = s.sendMessage(p)         // отправляем новое сообщениеcase err, ok := <-p.errc:               // ждем ошибокif ok {                             // если ошибка получена, то
                    interrupt(p.cancel)             // прерываем исполнение
                    t.handle(err)                   // передаем ошибку наружу
                }                                   // если ошибок не получено, тоreturn ready, t                     // переходим в состояние ready
            }
            if s != newState {
                return newState, t  // выходим, если произошёл переход
            }
        }
    }

    Thus, we can listen to different channels in different states. For example, this allows you not to send messages during a pause - we just do not listen to the corresponding channel.


    Designer and start machine



    // New создает новый конвейер и применяет опции.// Новый конвейер находится в состоянии ready.funcNew(sampleRate phono.SampleRate, options ...Option) *Pipe {
        p := &Pipe{
            UID:        phono.NewUID(),
            sampleRate: sampleRate,
            log:        log.GetLogger(),
            processors: make([]*processRunner, 0),
            sinks:      make([]*sinkRunner, 0),
            metrics:    make(map[string]measurable),
            params:     make(map[string][]phono.ParamFunc),
            feedback:   make(map[string][]phono.ParamFunc),
            events:     make(chan eventMessage, 1), // канал для событий
            cancel:     make(chanstruct{}),        // канал для отмены выполнения
            provide:    make(chanstruct{}),
            consume:    make(chan message),
        }
        for _, option := range options {            // применяем опции
            option(p)()
        }
        go p.loop()                                 // запускаем главный циклreturn p
    }

    In addition to initialization and functional options , there is a start of a separate gorutina with a main cycle. Well, let's look at it:


    // loop выполняется, пока не перейдет в nil состояние.func(p *Pipe)loop() {
        var s state = ready         // изначальное состояние
        t := target{}
        for s != nil {
            s, t = s.listen(p, t)   // ждём перехода в новое состояние
            p.log.Debug(fmt.Sprintf("%v is %T", p, s))
        }
        t.dismiss()
        close(p.events)             // закрываем канал событий
    }
    // listen ждёт перехода из стадии ready.func(s idleReady)listen(p *Pipe, t target)(state, target) {
        return p.idle(s, t)
    }
    // transition совершает переход в зависимости от полученного события.func(s idleReady)transition(p *Pipe, e eventMessage)(state, error) {
        switch e.event {
        case cancel:
            interrupt(p.cancel)
            returnnil, nilcase push:
            e.params.applyTo(p.ID())
            p.params = p.params.merge(e.params)
            return s, nilcase measure:
            for _, id := range e.components {
                e.params.applyTo(id)
            }
            return s, nilcase run:
            if err := p.start(); err != nil {
                return s, err
            }
            return running, nil
        }
        return s, ErrInvalidState
    }

    The conveyor was created and stopped waiting for events.


    It's time to work


    Call p.Run()!



    // Run посылает событие run в конвейер.// Выполнение этого метода после pipe.Close вызовет панику.func(p *Pipe)Run()chanerror {
        runEvent := eventMessage{
            event: run,
            target: target{
                state: ready,               // целевое состояние покоя
                errc:  make(chan error, 1), 
            },
        }
        p.events <- runEvent
        return runEvent.target.errc
    }
    // listen ждёт перехода из стадии running.func(s activeRunning)listen(p *Pipe, t target)(state, target) {
        return p.active(s, t)
    }
    // transition совершает переход в зависимости от полученного события.func(s activeRunning)transition(p *Pipe, e eventMessage)(state, error) {
        switch e.event {
        case cancel:
            interrupt(p.cancel)
            err := Wait(p.errc)
            returnnil, err
        case measure:
            e.params.applyTo(p.ID())
            p.feedback = p.feedback.merge(e.params)
            return s, nilcase push:
            e.params.applyTo(p.ID())
            p.params = p.params.merge(e.params)
            return s, nilcase pause:
            return pausing, nil
        }
        return s, ErrInvalidState
    }
    // sendMessage генерирует новое сообщение.func(s activeRunning)sendMessage(p *Pipe)state {
        p.consume <- p.newMessage()
        return s
    }

    running generates messages and runs until the pipeline ends.


    Pause


    During the execution of the pipeline, we can pause it. In this state, the pipeline will not generate new messages. To do this, call the method p.Pause().



    // Pause посылает событие pause в конвейер.// Выполнение этого метода после pipe.Close вызовет панику.func(p *Pipe)Pause()chanerror {
        pauseEvent := eventMessage{
            event: pause,
            target: target{
                state: paused,              // целевое состояние покоя
                errc:  make(chan error, 1),
            },
        }
        p.events <- pauseEvent
        return pauseEvent.target.errc
    }
    // listen ждёт перехода из стадии pausing.func(s activePausing)listen(p *Pipe, t target)(state, target) {
        return p.active(s, t)
    }
    // transition совершает переход в зависимости от полученного события.func(s activePausing)transition(p *Pipe, e eventMessage)(state, error) {
        switch e.event {
        case cancel:
            interrupt(p.cancel)
            err := Wait(p.errc)
            returnnil, err
        case measure:
            e.params.applyTo(p.ID())
            p.feedback = p.feedback.merge(e.params)
            return s, nilcase push:
            e.params.applyTo(p.ID())
            p.params = p.params.merge(e.params)
            return s, nil
        }
        return s, ErrInvalidState
    }
    // sendMessage генерирует новое сообщение. Сообщение содержит функцию-параметр,// которая вызывается при получении сообщения адресатом. Эта функция блокирует// исполнение до тех пор, пока все адресаты не получат сообщение. Таким образом,// гарантируется, что пауза наступает, когда://  1. Генерация новых сообщений остановлена//  2. Все компоненты обработали последнее сообщениеfunc(s activePausing)sendMessage(p *Pipe)state {
        m := p.newMessage()
        iflen(m.feedback) == 0 {
            m.feedback = make(map[string][]phono.ParamFunc)
        }
        var wg sync.WaitGroup   // новая группа для ожидания
        wg.Add(len(p.sinks))    // добавляем все Sinkfor _, sink := range p.sinks {
            param := phono.ReceivedBy(&wg, sink.ID())   // функция-параметр
            m.feedback = m.feedback.add(param)          
        }
        p.consume <- m          // отправляем сообщения
        wg.Wait()               // ждем, когда все получат сообщенияreturn paused
    }

    As soon as all addressees receive a message, the pipeline will change to a state paused. If the message is the last, it will go to the state ready.


    Back to work!


    To exit the state paused, you need to call p.Resume().



    // Resume посылает событие resume в конвейер.// Выполнение этого метода после pipe.Close вызовет панику.func(p *Pipe)Resume()chanerror {
        resumeEvent := eventMessage{
            event: resume,
            target: target{
                state: ready,
                errc:  make(chan error, 1),
            },
        }
        p.events <- resumeEvent
        return resumeEvent.target.errc
    }
    // listen ждёт перехода из стадии paused.func(s idlePaused)listen(p *Pipe, t target)(state, target) {
        return p.idle(s, t)
    }
    // transition совершает переход в зависимости от полученного события.func(s idlePaused)transition(p *Pipe, e eventMessage)(state, error) {
        switch e.event {
        case cancel:
            interrupt(p.cancel)
            err := Wait(p.errc)
            returnnil, err
        case push:
            e.params.applyTo(p.ID())
            p.params = p.params.merge(e.params)
            return s, nilcase measure:
            for _, id := range e.components {
                e.params.applyTo(id)
            }
            return s, nilcase resume:
            return running, nil
        }
        return s, ErrInvalidState
    }

    Everything is trivial, the conveyor again goes into a state running.


    Roll up


    The conveyor can be stopped from any state. For this there is p.Close().



    // Close посылает событие cancel в конвейер.// Повторное выполнение этого метода вызовет панику.// Обязательно вызвать в конце, чтобы освободить ресурсы.func(p *Pipe)Close()chanerror {
        resumeEvent := eventMessage{
            event: cancel,
            target: target{
                state: nil,                 // конечное состояние
                errc:  make(chan error, 1),
            },
        }
        p.events <- resumeEvent
        return resumeEvent.target.errc
    }

    Who needs it?


    Not everyone. To understand how to manage the state, you need to understand your task. There are exactly two circumstances in which an event-based asynchronous automaton can be used:


    1. Difficult life cycle - there are three or more states with nonlinear transitions.
    2. Asynchronous execution is used.

    Even though the event automaton solves the problem, it is a rather complex pattern. Therefore, it should be used with great care and only after a full understanding of all the pros and cons.


    Links



    Also popular now: