Reinventing the Bicycle: Promises in Node.js

    Good afternoon, Habrahabr.

    Foreword


    The task was fairly simple: fetch a set of documents from the database, convert each document, and send all converted documents to the user. The order must not change, and an asynchronous function is used to process each document. If an error occurs on any document, we stop processing and send only the error, not the documents.
    The Q library was chosen to solve the problem, since I like the Promise approach. But there was one snag: a seemingly elementary task took more than a second, about 1300 ms, instead of the expected 50-80 ms. To understand how everything works and to get a feel for asynchrony, I decided to write a specialized “bicycle” for this task.



    How the work with Q was arranged



    First of all, I would like to tell you how it was implemented initially.

    1. A procedure that walks through the array sequentially and returns a promise.
    forEachSerial
    function forEachSerial(array, func) {
        var tickFunction = function (el, index, callback) {
            if (func.length == 2)
                func(el, callback);
            else if (func.length == 3)
                func(el, index, callback);
        }
        var functions = [];
        array.forEach(function (el, index) {
            functions.push(Q.nfcall(tickFunction, el, index));        
        });
        return Q.all(functions);
    }
    



    2. A procedure that processes a document and returns a promise.
    documentToJSON
    function documentToJSON(el, options) {
        var obj = el.toObject(options);
        var deferred = Q.defer();
        var columns = ['...','...','...'];
        forEachSerial(columns,function (el, next) {
            el.somyAsyncFunction(options, function (err, result) {
                if (err)
                    next(err);
                else result.somyAsyncFunction2(options, function (err, result) {
                    if (!err)
                        obj[el] = result;
                    next(err);
                });
            });
        }).then(function () {
                deferred.resolve(obj);
            }, function (err) {
                deferred.reject(err);
            });
        return deferred.promise;
    }
    



    3. And the main procedure, which sends the result to the user.
    sendResponse
    exports.get = function (req, res, next) {
        dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) {
            if (err)
                next(new errorsHandlers.internalError());
            else {
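                // Collect converted documents under a separate name: redeclaring
                // `result` here would overwrite the query result with [].
                var converted = [];
                forEachSerial(result,function (el, index, next) {
                    documentToJSON(el,options).then(function (obj) {
                        converted[index] = obj;
                        next()
                    }, function (err) {
                        next(new errorsHandlers.internalError())
                    });
                }).then(function () {
                        res.send({responce: converted});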
                    }, function (err) {
                        next(new errorsHandlers.internalError())
                    });
            }
        });
    };
    



    Someone may notice that there are a lot of “promises” here, even where they are not really needed, and that this long chain is what caused the slowdown. But the procedure returns only 20 simple documents and the transformations are primitive, so spending that much time on it is unacceptable.
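
    For comparison, here is a minimal sketch of the same requirement (keep the order, stop at the first error) written as a single promise chain. It assumes a runtime with a built-in Promise, which the original code did not rely on; mapSerial and fakeTransform are hypothetical names used only for illustration. As far as I understand Q.nfcall, it starts the wrapped function without waiting for the previous element, so the chain below differs from forEachSerial in that each item really begins only after the previous one has finished.

```javascript
// Hypothetical helper: processes items strictly one after another.
// `worker(item, index)` may return a promise or a plain value.
function mapSerial(items, worker) {
    var results = [];
    // Start from an already resolved promise and extend the chain per item.
    return items.reduce(function (chain, item, index) {
        return chain.then(function () {
            return worker(item, index);
        }).then(function (value) {
            results[index] = value; // the input order is preserved
        });
    }, Promise.resolve()).then(function () {
        return results;
    });
}

// Stand-in for an asynchronous document transformation (an assumption,
// not the real conversion from the article).
function fakeTransform(doc) {
    return new Promise(function (resolve, reject) {
        setImmediate(function () {
            if (doc.broken) reject(new Error('cannot convert ' + doc.id));
            else resolve({ id: doc.id, converted: true });
        });
    });
}

mapSerial([{ id: 1 }, { id: 2 }], fakeTransform).then(function (docs) {
    console.log(docs.length); // 2
}, function (err) {
    console.log('failed:', err.message);
});
```

    Once one step rejects, the remaining then handlers in the chain are skipped, so later items are never started and only the error reaches the caller.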

    Writing our own promise library


    How does it work?

    There are plenty of descriptions online of what promises are and how they work, so I will be brief. A promise is exactly that: a function promises us a result, which we can obtain using then(success, error). If processing succeeds, the handler can in turn return a new promise, which is processed the same way. In a typical case it looks like this:
    Promise.then(step1).then(step2).then(step3).then(function () {
        //All OK
    }, function (err) {
        //Error in any step
    });
    

    The result of each stage is passed as a parameter to the next one, and so on in sequence. As a result, we handle all errors in one block and get rid of callback "noodles".
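
    A small runnable sketch of such a chain, assuming a built-in Promise; the steps step1, step2, step3 and the helper runChain are made up for illustration. It shows a value passing from step to step, a step that returns a promise, and an exception that lands in the single error handler at the end.

```javascript
// Each step receives the value returned by the previous one.
function step1(value) {
    return value + 1;
}
function step2(value) {
    // A step may also return a promise; the chain waits for it.
    return new Promise(function (resolve) {
        setImmediate(function () { resolve(value * 10); });
    });
}
function step3(value) {
    if (value > 100) throw new Error('value too large: ' + value);
    return 'result=' + value;
}

function runChain(start) {
    return Promise.resolve(start).then(step1).then(step2).then(step3);
}

runChain(1).then(function (text) {
    console.log(text); // result=20
}, function (err) {
    console.log('failed:', err.message);
});

runChain(50).then(function (text) {
    console.log(text);
}, function (err) {
    console.log('failed:', err.message); // failed: value too large: 510
});
```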
    Inside, it looks something like this: events are emitted on success or on error:
    var promise = fs.stat("foo");
    promise.addListener("success", function (value) {
        // ok
    })
    promise.addListener("error", function (error) {
        // error
    });
    

    All this can be read here.
    The theory is over, let's get down to practice.

    Let's start with something simple: Deferred

    The task of this object is to create the events we need and to hand out a Promise:
    var EventEmitter = require('events').EventEmitter;
    function deferred() {
        this.events = new EventEmitter(); // Event object
        this.promise = new promise(this); // The Promise returned to the caller
        this.thenDeferred = []; // Subsequent handlers, needed to pass an error further down the chain
        var self = this;
        // Called on success
        this.resolve = function () {
            self.events.emit('completed', arguments);
        }
        // Called on error
        this.reject = function (error) {
            self.events.emit('error', error);
            // Pass the error further down the chain
            self.thenDeferred.forEach(function (el) {
                el.reject(error);
            });
        }
    }
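
    One detail of EventEmitter matters for this design: the 'error' event is special in Node.js, and emitting it with no listener attached throws the error instead of ignoring it. In the code of this article the promise constructor subscribes to 'error' as soon as the deferred is created, so reject does not throw. The sketch below only demonstrates that behaviour of the events module, together with the fact that resolve passes the whole arguments object as a single event parameter; the log array is there just to make the result visible.

```javascript
var EventEmitter = require('events').EventEmitter;

var events = new EventEmitter();
var log = [];

// With no 'error' listener attached, emit('error', ...) throws the error.
try {
    events.emit('error', new Error('boom'));
} catch (e) {
    log.push('thrown: ' + e.message);
}

// Once a listener is attached, the same emit is delivered to it instead.
events.on('error', function (err) {
    log.push('handled: ' + err.message);
});
events.emit('error', new Error('boom'));

// emit() calls listeners synchronously and passes its extra arguments through.
events.on('completed', function (args) {
    log.push('completed with ' + args.length + ' value(s)');
});
(function () {
    // As in deferred.resolve: the arguments object travels as one parameter.
    events.emit('completed', arguments);
})('a', 'b');

console.log(log);
```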
    

    Object - Promise

    Its task is to listen for the "completed" and "error" events, call the handlers assigned through "then", and check what the handler returned. If it returned another promise, we attach to it so that the next then fires when that promise completes; if it returned plain data, we run the subsequent then right away. This is how chains of then calls are built.
    function promise(def) {
        this.def = def;
        this.completed = false;
        this.events = def.events;
        var self = this;
        var thenDeferred;
        self._successListener = null;
        self._errorListener = null;
        //The result of then is always a new promise
        this.then = function (success, error) {
            if (success)
                self._successListener = success;
            if (error)
                self._errorListener = error;
            thenDeferred = new deferred();
            self.def.thenDeferred.push(thenDeferred);
            return thenDeferred.promise;
        }
        //Handle successful completion of the task
        this.events.on('completed', function (result) {
            // Convert objects, arguments and arrays to an array so they can later be passed to a function as its arguments
            var args = inputOfFunctionToArray(result);
            //If the task has already been completed, go no further
            if (self.completed) return;
            self.completed = true;
            if (self._successListener) {
                var result;
                try {
                    result = self._successListener.apply(self, args);
                } catch (e) {
                    self.def.reject(e);
                    return; // stop here: the error has already been passed down the chain
                }
                //If the handler returned a Promise and there are subsequent then calls, attach to it
                var promise;
                if (isPromise(result))
                    promise = result;
                else if (result instanceof deferred)
                    promise = result.promise;
                if (promise && thenDeferred) {
                    promise.then(function () {
                        var args = arguments;
                        process.nextTick(function () {
                            thenDeferred.resolve.apply(self, args);
                        });
                    }, function (error) {
                        process.nextTick(function () {
                            thenDeferred.reject(error);
                        });
                    });
                } else if (thenDeferred)
                    process.nextTick(function () {
                        //For plain values, simply run the next then
                        thenDeferred.resolve.apply(self, [result]);
                    });
            } else if (thenDeferred)
                process.nextTick(function () {
                    thenDeferred.resolve.apply(self, []);
                });
        });
        //Handle errors
        this.events.on('error', function (error) {
            if (self.completed) return;
            self.completed = true;
            if (self._errorListener)
                process.nextTick(function () {
                    self._errorListener.apply(self, [error]);
                });
        });
    }
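
    The constructor above calls two helpers, isPromise and inputOfFunctionToArray, whose source is not shown in this article. The following is only a guess at what they might look like, based on how they are used and on the comment next to the call; the real implementations in the repository may differ.

```javascript
// Assumed helper: treats anything with a then() method as a promise.
function isPromise(value) {
    return value !== null &&
        (typeof value === 'object' || typeof value === 'function') &&
        typeof value.then === 'function';
}

// Assumed helper: normalizes a value into a real array that can be
// handed to Function.prototype.apply.
function inputOfFunctionToArray(input) {
    if (input === undefined || input === null) return [];
    if (Array.isArray(input)) return input;
    // `arguments` objects are array-like: copy their indexed elements.
    if (Object.prototype.toString.call(input) === '[object Arguments]')
        return Array.prototype.slice.call(input);
    return [input]; // a single plain value becomes a one-element array
}

function collect() {
    return inputOfFunctionToArray(arguments);
}
console.log(collect(1, 2)); // [ 1, 2 ]
```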
    


    So, the basic model is ready. It remains to write a wrapper for functions that take a callback.

    PromiseFn

    Its task is to wrap a function that takes a callback, with the ability to specify this and the call arguments:
    var promisefn = function (bind, fn) {
        var def = new deferred();
        //bind is an optional parameter
        if (typeof bind === 'function' && !fn) {
            fn = bind;
            bind = def;
        }
        //Define our callback for the wrapped function
        var callback = function (err) {
            if (err)
                def.reject(err);
            else {
                var args = [];
                for (var key in arguments)
                    args.push(arguments[key]);
                args.splice(0, 1);
                def.resolve.apply(bind, args);
            }
        };
        var result = function () {
            var args = [];
            for (var key in arguments)
                args.push(arguments[key]);
            args.push(callback);
            process.nextTick(function () {
                fn.apply(bind, args);
            });
            return def.promise;
        }
        return result;
    }
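
    The same idea can be sketched on top of a built-in Promise, which is an assumption here since the article builds its own. wrapCallback and divide are hypothetical names. One difference worth noting: promisefn above creates a single deferred per wrapper, while this sketch creates a fresh promise on every call, so the wrapped function can be invoked repeatedly.

```javascript
// Hypothetical helper: wraps a Node-style function (its last argument is
// callback(err, value)) so that it returns a promise instead.
function wrapCallback(fn, thisArg) {
    return function () {
        var args = Array.prototype.slice.call(arguments);
        return new Promise(function (resolve, reject) {
            // Append our own callback as the last argument.
            args.push(function (err, value) {
                if (err) reject(err);
                else resolve(value);
            });
            fn.apply(thisArg, args);
        });
    };
}

// Stand-in for an asynchronous API with a callback.
function divide(a, b, callback) {
    setImmediate(function () {
        if (b === 0) callback(new Error('division by zero'));
        else callback(null, a / b);
    });
}

var divideAsync = wrapCallback(divide);
divideAsync(6, 3).then(function (value) {
    console.log(value); // 2
});
divideAsync(1, 0).then(null, function (err) {
    console.log(err.message); // division by zero
});
```

    Recent Node.js versions ship util.promisify, which covers the common case of this wrapper.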
    


    And finally, all: sequential execution of functions with a callback

    Everything is simple here: we are given an array of functions, wrap each of them with promisefn, and call resolve once they have all completed.
    var all = function (functions) {
        var def = new deferred();
        process.nextTick(function () {
            var index = -1;
            var result = [];
            // The second parameter is named `args`: calling it `arguments` would
            // shadow the built-in object and is a syntax error in strict mode.
            var next = function (err, args) {
                if (err) {
                    def.reject(err);
                    return;
                }
                if (args) result.push(inputOfFunctionToArray(args));
                index++;
                if (index >= functions.length) {
                    def.resolve(result);
                } else process.nextTick(function () {
                    promisefn(functions[index])().then(function () {
                        var stepArgs = arguments;
                        process.nextTick(function () {
                            next(null, stepArgs);
                        });
                    }, function (err) {
                        process.nextTick(function () {
                            next(err);
                        });
                    });
                });
            }
            process.nextTick(next);
        });
        return def.promise;
    }
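
    The control flow of all is easier to see in isolation, without deferreds. Below is a minimal callback-only sketch of the same "index plus next" pattern; runSeries is a hypothetical name, and setImmediate is used in place of process.nextTick purely as a choice for this example.

```javascript
// Hypothetical helper: runs Node-style tasks (each takes one callback)
// strictly in order and collects their results.
function runSeries(tasks, done) {
    var results = [];
    var index = 0;
    function next(err, value) {
        if (err) return done(err);            // the first error stops the series
        if (index > 0) results.push(value);   // skip the initial kick-off call
        if (index >= tasks.length) return done(null, results);
        var task = tasks[index++];
        // Deferring the call keeps the stack flat even if a task
        // invokes its callback synchronously.
        setImmediate(function () { task(next); });
    }
    next();
}

runSeries([
    function (cb) { setImmediate(function () { cb(null, 'first'); }); },
    function (cb) { cb(null, 'second'); }
], function (err, results) {
    if (err) console.log('failed:', err.message);
    else console.log(results); // [ 'first', 'second' ]
});
```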
    


    Finally


    After testing, the old implementation (based on the Q library) was switched over by replacing just a couple of declarations and run under the same conditions. The result is positive: 50-100 ms (instead of the previous 1300 ms).
    All sources are available on GitHub, where you can also find examples. Reinventing “bicycles” is useful at the very least because it improves understanding.
    Thanks for your attention!
