Asynchronous loops and Stream API in Node.js 10


    This month Node.js 10 is being released, and with it comes a change in the behavior of readable streams brought about by asynchronous for-await-of loops. Let's look at what this is and what we need to prepare for.


    For-await-of construct


    First, let's look at how asynchronous loops work with a simple example. For clarity, we will use an array of already resolved promises.


    const promises = [
        Promise.resolve(1),
        Promise.resolve(2),
        Promise.resolve(3),
    ];

    A regular loop walks over the promises array and yields the promise objects themselves:


    for (const value of promises) {
        console.log(value);
    }
    // > Promise { 1 }
    // > Promise { 2 }
    // > Promise { 3 }

    An asynchronous loop waits for each promise to resolve and yields the value it resolves with:


    for await (const value of promises) {
        console.log(value);
    }
    // > 1
    // > 2
    // > 3
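
    Note that for await can only be used inside an async function, so the snippets in this article are assumed to be wrapped in one. Conceptually the construct behaves roughly like awaiting each item yourself; a minimal sketch (the function name here is just for illustration):


    async function logResolved(promises) {
        // for await implicitly awaits each item before handing it to the loop body
        for await (const value of promises) {
            console.log(value);
        }
        // roughly equivalent to:
        // for (const promise of promises) {
        //     console.log(await promise);
        // }
    }

    logResolved(promises);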

    To use asynchronous loops in earlier versions of Node.js, run node with the --harmony_async_iteration flag.

    Readable streams and for-await-of


    Readable streams have a Symbol.asyncIterator property, which also allows them to be consumed with a for-await-of loop. Take fs.createReadStream as an example:


    const readStream = fs.createReadStream(file);
    const chunks = [];
    for await (const chunk of readStream) {
        chunks.push(chunk);
    }
    console.log(Buffer.concat(chunks));

    As you can see from the example, we got rid of the on('data', ...) and on('end', ...) handlers, and the code itself became more readable and predictable.
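
    To get a feeling for what Symbol.asyncIterator means, here is a minimal sketch of a hand-written async iterable (the countdown object is made up for illustration). for-await-of works with any object shaped like this, which is exactly why readable streams can now be consumed by it:


    const countdown = {
        [Symbol.asyncIterator]() {
            let n = 3;
            return {
                // each next() returns a promise of { value, done }
                next() {
                    if (n === 0) {
                        return Promise.resolve({ value: undefined, done: true });
                    }
                    return Promise.resolve({ value: n--, done: false });
                },
            };
        },
    };

    for await (const n of countdown) {
        console.log(n); // 3, 2, 1
    }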


    Asynchronous Generators


    In some cases the received data needs additional processing; this is what asynchronous generators are for. For example, we can implement a regular-expression search over a file:


    async function * search(needle, chunks) {
        let pos = 0;
        for await (const chunk of chunks) {
            let string = chunk.toString();
            while (string.length) {
                const match = string.match(needle);
                if (!match) {
                    // no more matches in this chunk
                    pos += string.length;
                    break;
                }
                yield {
                    index: pos + match.index,
                    value: match[0],
                };
                // drop everything up to and including the match
                string = string.slice(match.index + match[0].length);
                pos += match.index + match[0].length;
            }
        }
        // note: a match that spans a chunk boundary will not be found
    }

    Let's see what we get:


    const stream = fs.createReadStream(file);
    for await (const {index, value} of search(/(a|b)c/, stream)) {
        console.log('found "%s" at %s', value, index);
    }

    Agree, this is quite convenient: we turned strings into objects on the fly, and we did not need to use a TransformStream or think about how to catch errors that may occur in two different streams, and so on.
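
    Error handling also becomes straightforward: if the underlying stream emits an error, the for-await-of loop rejects, so a single try/catch around the loop covers both the stream and the generator. A minimal sketch (findAll is just an illustrative name):


    async function findAll(file, needle) {
        const stream = fs.createReadStream(file);
        try {
            for await (const {index, value} of search(needle, stream)) {
                console.log('found "%s" at %s', value, index);
            }
        } catch (err) {
            // errors from the stream and from search() both end up here
            console.error('search failed:', err);
        }
    }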


    A Unix-style pipeline example


    Reading a file is a common task, but far from the only one. Let's look at a case where we need stream processing of output, as in Unix pipelines. To do this, we will use asynchronous generators to pass the output of the ls command through a chain of processing steps.


    First we create a child process with const subproc = spawn('ls'), and then we read its standard output:


    for await (const chunk of subproc.stdout) {
        // ...
    }

    Since stdout produces its output as Buffer objects, the first thing to do is add a generator that converts each chunk from Buffer to String:


    async function *toString(chunks) {
        for await (const chunk of chunks) {
            yield chunk.toString();
        }
    }
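
    As an aside, the same effect should be achievable without a dedicated generator by asking the stream itself to decode the bytes via setEncoding (a standard Readable method), though keeping the conversion as a generator makes the whole pipeline uniform:


    // ask the stream to decode bytes itself
    subproc.stdout.setEncoding('utf8');
    for await (const chunk of subproc.stdout) {
        console.log(typeof chunk); // 'string'
    }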

    Next, we write a simple generator that splits the output into lines. It is important to keep in mind that a chunk coming from the stream has a limited maximum size, which means a chunk may contain a whole line, a fragment of a very long line, or several lines at once:


    async function *chunksToLines(chunks) {
        let previous = '';
        for await (const chunk of chunks) {
            previous += chunk;
            while (true) {
                const i = previous.indexOf('\n');
                if (i < 0) {
                    break; // no complete line buffered yet
                }
                // emit the line including its trailing '\n'
                yield previous.slice(0, i + 1);
                previous = previous.slice(i + 1);
            }
        }
        if (previous.length > 0) {
            // the output did not end with a newline
            yield previous;
        }
    }
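
    To see the buffering behavior in isolation, here is a quick sketch that feeds a few hand-crafted chunks through chunksToLines (fakeChunks and its values are made up for illustration):


    async function *fakeChunks() {
        yield 'first li';
        yield 'ne\nsecond line\nthi';
        yield 'rd line';
    }

    for await (const line of chunksToLines(fakeChunks())) {
        console.log(JSON.stringify(line));
    }
    // > "first line\n"
    // > "second line\n"
    // > "third line"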

    Since each line we get still contains the trailing line break, let's add a generator that trims the surrounding whitespace from each value:


    async function *trim(values) {
        for await (const value of values) {
            yield value.trim();
        }
    }

    The final step is printing the values to the console line by line:


    async function print(values) {
        for await (const value of values) {
            console.log(value);
        }
    }

    Now let's combine the resulting code:


    async function main() {
        const subproc = spawn('ls');
        await print(trim(chunksToLines(toString(subproc.stdout))));
        console.log('DONE');
    }
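
    For this to run as a standalone script, the example assumes the usual module header and a top-level invocation; a minimal sketch (the error handling here is our own addition):


    const { spawn } = require('child_process');

    // ... generator definitions from above ...

    main().catch((err) => {
        console.error(err);
        process.exit(1);
    });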

    As you can see, the nested calls are somewhat hard to read. If we add a few more steps or parameters, the result will turn into a mess. To avoid this and keep the code linear, let's add a pipe function:


    function pipe(value, ...fns) {
        let result = value;
        for (const fn of fns) {
            result = fn(result);
        }
        return result;
    }
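
    pipe itself has nothing asynchronous about it: it simply threads a value through a list of functions from left to right. A tiny synchronous illustration (values chosen arbitrarily):


    console.log(pipe(2, x => x + 1, x => x * 3)); // ((2 + 1) * 3) === 9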

    Now the call can be reduced to the following form:


    async function main() {
        const subproc = spawn('ls');
        await pipe(
            subproc.stdout,
            toString,
            chunksToLines,
            trim,
            print,
        );
        console.log('DONE');
    }

    The |> operator


    Keep in mind that a new pipeline operator |> has been proposed for the JS standard (it is still a TC39 proposal, so the syntax may change); it does the same thing that pipe does now:


    async function main() {
        const subproc = spawn('ls');
        await subproc.stdout
        |> toString
        |> chunksToLines
        |> trim
        |> print;
        console.log('DONE');
    }

    Conclusion


    As you can see, asynchronous loops and iterators have made the language even more expressive, concise, and understandable. Callback hell recedes further into the past and will soon be just a scary story to frighten our grandchildren with. And generators finally seem to have found their place in JS and will be used as intended.


    This article is based on Axel Rauschmayer's material "Using async iteration natively in Node.js".
