Translation of the Stream API manual by Benjamin Winterberg

Hi, Habr! Here is my translation of the article "Java 8 Stream Tutorial".

This tutorial, built around code examples, gives a comprehensive overview of streams in Java 8. When I first met the Stream API, the name puzzled me, since it sounds so similar to InputStream and OutputStream from the java.io package. Java 8 streams, however, are something completely different. Streams are monads, and they play an important role in bringing functional programming to Java.
In functional programming, a monad is a structure that represents a computation as a chain of successive steps. The type and structure of a monad determine how operations are chained; in our case, as a sequence of method calls that take functions of a given type.
This tutorial teaches you how to work with streams and shows how to use the various methods of the Stream API. We will analyze the order of operations and trace how the sequence of methods in a chain affects performance. We will get acquainted with the more powerful methods of the Stream API, such as reduce, collect and flatMap. The tutorial ends with a look at parallel streams.

If you are not yet comfortable with lambda expressions, functional interfaces and method references, you may want to read my guide to the new features of Java 8 first (translation on Habr) and then return to streams.

How streams work


A stream represents a sequence of elements and provides various methods for performing computations on those elements:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);
// C1
// C2

Stream methods are either intermediate or terminal. Intermediate methods return a stream, which lets you chain many such calls one after another. Terminal methods either return nothing (void) or return a result that is not a stream. In the example above, filter, map and sorted are intermediate methods, and forEach is terminal. For a complete list of available stream methods, refer to the documentation. Such a chain of stream operations is also known as an operation pipeline.
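
The difference between the two kinds of methods is visible in their return types. The following is a minimal sketch, not part of the original article; the class name IntermediateVsTerminal and the helper countWithPrefix are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class IntermediateVsTerminal {
    // Hypothetical helper: counts the strings that start with the given prefix.
    static long countWithPrefix(List<String> items, String prefix) {
        // filter is intermediate: it returns another Stream and does no work yet.
        Stream<String> filtered = items.stream().filter(s -> s.startsWith(prefix));
        // count is terminal: it runs the pipeline and returns a long, not a stream.
        return filtered.count();
    }

    public static void main(String[] args) {
        List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");
        System.out.println(countWithPrefix(myList, "c")); // 2
    }
}
```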

Most Stream API methods take a lambda expression as a parameter: a functional interface that describes the specific behavior of the method. Most of these operations must be both non-interfering and stateless. What does this mean?

A function is non-interfering if it does not modify the underlying data source of the stream. In the example above, no lambda expression modifies the list myList.

A function is stateless if the execution of the operation is deterministic. In the example above, no lambda expression depends on mutable variables or outer-scope state that could change at run time.
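
As a small illustration of both properties, here is a sketch of a lambda that only reads its own argument and an effectively final parameter. The names NonInterferingExample and withSuffix are assumptions made for this example and do not come from the original article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NonInterferingExample {
    // Builds a new list; the lambda never touches `source` or any mutable
    // outside state, so it is both non-interfering and stateless.
    static List<String> withSuffix(List<String> source, String suffix) {
        return source.stream()
            .map(s -> s + suffix)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> myList = Arrays.asList("a1", "a2", "b1");
        System.out.println(withSuffix(myList, "!")); // [a1!, a2!, b1!]
        System.out.println(myList);                  // [a1, a2, b1]
    }
}
```

The source list is printed unchanged after the pipeline has run, which is exactly what non-interference requires.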

Different kinds of streams


Streams can be created from various data sources, most commonly from collections. Lists and sets support the new methods stream() and parallelStream(), which create a sequential or a parallel stream. Parallel streams can operate on multiple threads and are discussed at the end of the tutorial. For now, consider sequential streams:

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

Here calling stream() on the list returns a regular object stream.
However, you do not have to create a collection in order to work with a stream:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

Just use Stream.of() to create a stream from a number of object references.

In addition to regular object streams, Java 8 has special kinds of streams for working with the primitive types int, long and double. As you might guess, these are IntStream, LongStream and DoubleStream.

IntStream streams can replace regular for (;;) loops using IntStream.range():

IntStream.range(1, 4)
    .forEach(System.out::println);
// 1
// 2
// 3

All of these primitive streams work just like regular object streams, with the following differences:

  • Primitive streams use specialized lambda expressions, for example IntFunction instead of Function, or IntPredicate instead of Predicate.
  • Primitive streams support the additional terminal methods sum() and average():

    Arrays.stream(new int[] {1, 2, 3})
        .map(n -> 2 * n + 1)
        .average()
        .ifPresent(System.out::println);  // 5.0
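
The listing above demonstrates average(). For completeness, here is a minimal sketch of sum(), which returns a plain int instead of an optional; the class name PrimitiveSum and the method sumOfOdds are hypothetical and used only for this illustration.

```java
import java.util.stream.IntStream;

class PrimitiveSum {
    // Sums the odd numbers in the half-open range [fromInclusive, toExclusive).
    static int sumOfOdds(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
            .filter(n -> n % 2 != 0) // takes an IntPredicate, not a Predicate
            .sum();                  // terminal method of primitive streams
    }

    public static void main(String[] args) {
        System.out.println(sumOfOdds(1, 10)); // 25
    }
}
```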


Sometimes it is useful to turn a stream of objects into a stream of primitives or vice versa. For this purpose, object streams support special methods: mapToInt(), mapToLong(), mapToDouble():

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3

Primitive streams can be converted to object streams by calling mapToObj():

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);
// a1
// a2
// a3

In the following example, a stream of floating point numbers is mapped to a stream of integer numbers and then mapped to a stream of objects:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);
// a1
// a2
// a3

Execution order


Now that we have learned how to create different streams and how to work with them, let's dive deeper and look at how stream operations work under the hood.

An important characteristic of intermediate methods is their laziness. In this example, there is no terminal method:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

When this code fragment is executed, nothing is printed to the console. That is because intermediate methods are only executed when a terminal method is present. Let's extend the example by adding the terminal method forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

Executing this code snippet results in the following output to the console:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

The order of the output may be surprising. A naive expectation would be that the methods are executed “horizontally”, one after another on all elements of the stream. Instead, each element moves “vertically” along the chain. The first string “d2” passes through filter and then through forEach, and only after the first element has gone through the whole chain does processing of the next element begin.

Given this behavior, you can reduce the actual number of operations:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });
// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

The method anyMatch returns true as soon as the predicate matches an input element. In this case that is the second element of the sequence, “A2”. Because of the “vertical” execution of the stream chain, map is therefore called only twice. So instead of mapping all elements of the stream, map is called as few times as possible.
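
The short-circuiting can also be checked by counting calls rather than reading console output. This is only a sketch: the class name ShortCircuitCount and the use of an AtomicInteger counter are assumptions added for illustration, and a counter inside a lambda is a side effect that real pipelines should avoid.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitCount {
    // Returns how often map ran before anyMatch found a match.
    static int mapCallsBeforeMatch() {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream.of("d2", "a2", "b1", "b3", "c")
            .map(s -> {
                mapCalls.incrementAndGet(); // illustration only
                return s.toUpperCase();
            })
            .anyMatch(s -> s.startsWith("A"));
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsBeforeMatch()); // 2
    }
}
```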

Why sequence matters


The following example consists of two intermediate methods, map and filter, and the terminal method forEach. Consider how these methods are executed:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));
// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

As you might guess, both map and filter are called five times during execution, once for each element of the underlying collection, while forEach is called only once, for the single element that passes the filter.

You can significantly reduce the number of operations if you change the order of method calls by placing filter in the first place:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

Now map is called only once. With a large number of input elements we would see a tangible performance gain. Keep this in mind when composing complex method chains.
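
To compare the two orderings side by side, the number of map calls can be counted for each variant. The following sketch assumes a hypothetical helper class OperationOrderCount; the counters exist only to make the difference measurable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class OperationOrderCount {
    // Returns {map calls when map runs first, map calls when filter runs first}.
    static int[] mapCalls(List<String> items) {
        AtomicInteger mapFirst = new AtomicInteger();
        items.stream()
            .map(s -> { mapFirst.incrementAndGet(); return s.toUpperCase(); })
            .filter(s -> s.startsWith("A"))
            .forEach(s -> { });

        AtomicInteger filterFirst = new AtomicInteger();
        items.stream()
            .filter(s -> s.startsWith("a"))
            .map(s -> { filterFirst.incrementAndGet(); return s.toUpperCase(); })
            .forEach(s -> { });

        return new int[] { mapFirst.get(), filterFirst.get() };
    }

    public static void main(String[] args) {
        int[] calls = mapCalls(Arrays.asList("d2", "a2", "b1", "b3", "c"));
        System.out.println(calls[0] + " vs " + calls[1]); // 5 vs 1
    }
}
```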

Let's extend the example above with an additional sorting operation, the method sorted:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

Sorting is a special kind of intermediate operation. It is a so-called stateful operation, because sorting a collection of elements requires keeping state for the duration of the operation.

As a result of the execution of this code, we get the following output to the console:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

First, the entire collection is sorted. In other words, sorted runs horizontally. In this case the comparator passed to sorted is called eight times, for various combinations of the input elements.

Once again, we optimize the execution of this code by changing the order of method calls in the chain:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

In this example, the comparator passed to sorted is never called, because filter reduces the input collection to a single element. With large inputs, performance benefits significantly.

Reusing streams


In Java 8, streams cannot be reused. As soon as you call any terminal method, the stream is closed:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling noneMatch after anyMatch on the same stream results in the following exception:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

To overcome this limitation, create a new stream for each terminal method you want to call.

For example, you can create a supplier that constructs a new stream with all intermediate methods already set up:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Each call to get() creates a new stream on which you can safely call the desired terminal method.

Advanced methods


Streams support a large number of different methods. We have already covered the most important ones. To get acquainted with the rest, refer to the documentation. Now let's dive even deeper into the more complex methods: collect, flatMap and reduce.

Most of the code examples in this section use the following list of persons for demonstration:

class Person {
    String name;
    int age;
    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
    @Override
    public String toString() {
        return name;
    }
}
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect


collect is a very useful terminal method that transforms the elements of a stream into a different kind of result, for example a List, Set or Map.

collect accepts a Collector, which consists of four different operations: a supplier, an accumulator, a combiner and a finisher. At first glance this looks very complicated, but Java 8 provides various built-in collectors through the class Collectors, which implements the most commonly used ones.

A very common use case:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());
System.out.println(filtered);    // [Peter, Pamela]

As you can see, constructing a list from the elements of a stream is easy. Need a set instead of a list? Use Collectors.toSet().
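
A minimal sketch of the set variant follows. To keep it self-contained it works on plain Integer ages instead of Person objects; the class name ToSetExample and the method distinctAges are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ToSetExample {
    // Collects the ages into a Set, which drops duplicate values.
    static Set<Integer> distinctAges(List<Integer> ages) {
        return ages.stream().collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<Integer> ages = distinctAges(Arrays.asList(18, 23, 23, 12));
        System.out.println(ages.size()); // 3
    }
}
```

Note that Collectors.toSet() makes no promise about the iteration order of the resulting set, so the example prints only its size.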

In the following example, people are grouped by age:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));
personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors are extremely versatile. You can also aggregate the elements of a stream, for example to determine the average age:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge);     // 19.0

For more comprehensive statistics, use a summarizing collector, which returns a special object holding the minimum, maximum and average values, the sum of the values and the number of elements:

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
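
The individual values can also be read from the statistics object through its getters. The sketch below builds the same statistics from plain int ages via IntStream.summaryStatistics(); the class name AgeStatistics and the method summarize are assumptions for this example.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class AgeStatistics {
    // Builds summary statistics directly from a primitive int stream.
    static IntSummaryStatistics summarize(int... ages) {
        return IntStream.of(ages).summaryStatistics();
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(18, 23, 23, 12);
        System.out.println(stats.getMin());     // 12
        System.out.println(stats.getMax());     // 23
        System.out.println(stats.getAverage()); // 19.0
        System.out.println(stats.getSum());     // 76
    }
}
```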

The following example joins all names into a single string:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

The joining collector accepts a delimiter as well as an optional prefix and suffix.

To transform stream elements into a map, you must specify how both the keys and the values should be mapped. Keep in mind that the mapped keys must be unique; otherwise an IllegalStateException is thrown. You can optionally pass a merge function as an additional parameter to avoid the exception:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));
System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
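
The effect of the merge function is easiest to see next to a call that leaves it out. In this sketch the names are keyed by their length, so "Peter" and "David" collide; the class ToMapDuplicateKeys and both helper methods are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ToMapDuplicateKeys {
    // Two-argument toMap: reports whether a duplicate key caused an exception.
    static boolean throwsOnDuplicateKey(List<String> names) {
        try {
            names.stream().collect(Collectors.toMap(String::length, n -> n));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Three-argument toMap: values with the same key are merged instead.
    static Map<Integer, String> mergedByLength(List<String> names) {
        return names.stream().collect(Collectors.toMap(
            String::length,
            n -> n,
            (name1, name2) -> name1 + ";" + name2));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Max", "Peter", "Pamela", "David");
        System.out.println(throwsOnDuplicateKey(names));  // true
        System.out.println(mergedByLength(names).get(5)); // Peter;David
    }
}
```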

So, we have seen some of the most powerful built-in collectors. Let's try to build our own. We want to transform all persons in the stream into a single string consisting of all names in uppercase, separated by the vertical bar |. To do this, we create a new collector using Collector.of(). We need the four components of a collector: a supplier, an accumulator, a combiner and a finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher
String names = persons
    .stream()
    .collect(personNameCollector);
System.out.println(names);  // MAX | PETER | PAMELA | DAVID

Since strings in Java are immutable, we need a helper class such as StringJoiner that lets the collector build the string for us. First, the supplier constructs a StringJoiner with the appropriate delimiter. The accumulator is used to add each person's uppercase name to the StringJoiner.

The combiner knows how to merge two StringJoiners into one. Finally, the finisher constructs the desired string from the StringJoiner.

FlatMap


So, we have learned how to transform the objects of a stream into another type of object using the method map. map is somewhat limited, because every object can be mapped to exactly one other object. But what if you want to transform one object into multiple others, or into none at all? This is where flatMap comes to the rescue. flatMap transforms each element of the stream into a stream of other objects. The contents of those streams are then placed into the stream returned by flatMap.
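
As a quick illustration of the "zero, one or many" idea, the sketch below splits lines of text into words. It is not taken from the original article; the class name FlatMapWords and the method words are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapWords {
    // Each line becomes a stream of its words; an empty line contributes none.
    static List<String> words(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split(" "))
                .filter(word -> !word.isEmpty()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(words(Arrays.asList("a b", "", "c"))); // [a, b, c]
    }
}
```

Three input lines turn into three words here only by coincidence: the first line yields two elements, the second none and the third one.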

Before we see flatMap in action, we need an appropriate type hierarchy:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();
    Foo(String name) {
        this.name = name;
    }
}
class Bar {
    String name;
    Bar(String name) {
        this.name = name;
    }
}

Create several objects:

List<Foo> foos = new ArrayList<>();
// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

Now we have a list of three foos, each containing three bars.

flatMap accepts a function that has to return a stream of objects. So, to access the bar objects of each foo, we just pass the appropriate function:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

So, we have successfully transformed a stream of three foo objects into a stream of nine bar objects.

Finally, all of the above code can be reduced to a simple pipeline of operations:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
 

flatMap is also available on the Optional class introduced in Java 8. The flatMap method of Optional returns an optional object of another type. This can be used to avoid a clutter of null checks.

Imagine a hierarchical structure like this:

class Outer {
    Nested nested;
}
class Nested {
    Inner inner;
}
class Inner {
    String foo;
}

To resolve the nested string foo of an outer instance, you have to add multiple null checks to avoid a possible NullPointerException:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

The same behavior can be achieved with the flatMap method of the Optional class:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

Each call to flatMap returns an Optional wrapping the desired object if it is present, or an empty Optional if it is absent.
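
A shorter alternative, offered here as a sketch rather than as the author's approach, relies on Optional.map, which already turns a null result into an empty Optional. The wrapper class OptionalMapChain and the method fooOf are hypothetical; the nested classes mirror the hierarchy above.

```java
import java.util.Optional;

class OptionalMapChain {
    static class Inner { String foo; }
    static class Nested { Inner inner; }
    static class Outer { Nested nested; }

    // map wraps a non-null result and yields an empty Optional for null,
    // so no explicit Optional.ofNullable is needed at each step.
    static Optional<String> fooOf(Outer outer) {
        return Optional.ofNullable(outer)
            .map(o -> o.nested)
            .map(n -> n.inner)
            .map(i -> i.foo);
    }

    public static void main(String[] args) {
        System.out.println(fooOf(new Outer()).isPresent()); // false
    }
}
```

flatMap remains the right choice when the function itself already returns an Optional.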

Reduce


The reduce operation combines all elements of a stream into a single result. Java 8 supports three different kinds of reduce methods.

The first one reduces a stream of elements to exactly one element of the stream. We use this method to determine the oldest person:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

This reduce method accepts an accumulator function of type BinaryOperator. A BinaryOperator is in fact a BiFunction whose two arguments share the same type, in our case Person. A BiFunction is like a Function but takes two arguments. In our example, the function compares the ages of two persons and returns the older one.

The second reduce method accepts both an identity value and a BinaryOperator accumulator. This method can be used to construct a new item. Here we build a new Person whose name and age are aggregated from all other persons in the stream:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
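
The listing above mutates the identity object inside the accumulator. A variant that leaves every object untouched is sketched below; it is an alternative added for comparison, not the article's code, and the class ImmutableReduce with its nested Person type is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class ImmutableReduce {
    static final class Person {
        final String name;
        final int age;
        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Each step returns a fresh Person instead of modifying p1.
    static Person combineAll(List<Person> persons) {
        return persons.stream()
            .reduce(new Person("", 0),
                (p1, p2) -> new Person(p1.name + p2.name, p1.age + p2.age));
    }

    public static void main(String[] args) {
        Person result = combineAll(Arrays.asList(
            new Person("Max", 18),
            new Person("Peter", 23),
            new Person("Pamela", 23),
            new Person("David", 12)));
        System.out.format("name=%s; age=%s%n", result.name, result.age);
        // name=MaxPeterPamelaDavid; age=76
    }
}
```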

The third reduce method accepts three parameters: an identity value, a BiFunction accumulator and a combiner function of type BinaryOperator. Since the type of the identity value is not restricted to Person, we can use this reduction to determine the sum of the ages of all persons:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum);  // 76
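
When only a numeric sum is needed, the same result can be reached with less ceremony. The sketch below works on plain Integer ages to stay self-contained and shows two equivalent forms; the class name AgeSum and both method names are assumptions for this example.

```java
import java.util.Arrays;
import java.util.List;

class AgeSum {
    // Two-argument reduce with the identity 0 and Integer::sum as accumulator.
    static int sumWithReduce(List<Integer> ages) {
        return ages.stream().reduce(0, Integer::sum);
    }

    // Converting to an IntStream first gives access to the terminal method sum().
    static int sumWithIntStream(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(18, 23, 23, 12);
        System.out.println(sumWithReduce(ages));    // 76
        System.out.println(sumWithIntStream(ages)); // 76
    }
}
```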

As you can see, we got the result of 76, but what really happens under the hood?

Let's extend the code above with some debug output:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

As you can see, all the work is done by the accumulator function. It is first called with the identity value 0 and the first person, Max. In the next three steps, sum keeps increasing by the age of the person from the previous step, until it reaches the total age of 76.

So what about the combiner? Is it never called? Let's execute the same stream in parallel:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

Parallel execution yields completely different console output. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separately accumulated values.

In the next chapter we look at parallel streams in more detail.

Parallel streams


Streams can be executed in parallel to improve performance on large numbers of input elements. Parallel streams use a common ForkJoinPool, available via the static method ForkJoinPool.commonPool(). The size of the underlying thread pool can reach five threads; the exact number depends on the number of available physical processor cores.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

On my machine, the common pool is initialized with a parallelism of 3 by default. This value can be decreased or increased by setting the following JVM parameter:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections support the method parallelStream() for creating a parallel stream of elements. Alternatively, you can call the intermediate method parallel() on a given stream to turn a sequential stream into a parallel one.
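
Whether a stream is currently in parallel mode can be queried with isParallel(). The following sketch shows the switch in both directions; the class name ParallelFlag and the method flags are hypothetical names used only here.

```java
import java.util.Arrays;
import java.util.List;

class ParallelFlag {
    // Returns the isParallel() flag for three variations of the same stream.
    static boolean[] flags(List<String> items) {
        return new boolean[] {
            items.stream().isParallel(),                        // sequential by default
            items.stream().parallel().isParallel(),             // switched to parallel
            items.stream().parallel().sequential().isParallel() // switched back again
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(flags(Arrays.asList("a1", "a2"))));
        // [false, true, false]
    }
}
```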

To understand how a parallel stream behaves during execution, the following example prints information about the current thread to System.out:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

Looking at the debug output gives a better understanding of which threads are actually used to execute the stream operations:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

As you can see, a parallel stream uses all available threads of the common ForkJoinPool to execute the stream operations. The output may differ between runs, because it is not deterministic which particular thread is used.
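
If the results have to arrive in the original order even though the stream is parallel, forEachOrdered can be used in place of forEach. This is a sketch under that assumption; the class name OrderedParallel and the method upperInOrder are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderedParallel {
    // map may run on several threads, but forEachOrdered hands the elements
    // to the action one at a time, in the encounter order of the source list.
    static List<String> upperInOrder(List<String> items) {
        List<String> out = new ArrayList<>();
        items.parallelStream()
            .map(String::toUpperCase)
            .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(upperInOrder(Arrays.asList("a1", "a2", "b1", "c2", "c1")));
        // [A1, A2, B1, C2, C1]
    }
}
```

In real code, collect(Collectors.toList()) is usually the simpler way to obtain an ordered list; forEachOrdered is shown here because it is the direct counterpart of forEach in the example above.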

Let's extend the example with an additional stream operation, sorted:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

At first glance, the result may seem strange:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

It seems that sorted is executed sequentially, on the main thread only. In fact, sorted on a parallel stream uses the Java 8 method Arrays.parallelSort() under the hood. As stated in the Javadoc, this method decides based on the length of the array whether sorting is performed sequentially or in parallel:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
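
Arrays.parallelSort() can also be called directly. The sketch below sorts a copy of a small array; the class name ParallelSortExample and the method sortedCopy are hypothetical, and for an array this short the sequential path described in the quote is to be expected.

```java
import java.util.Arrays;

class ParallelSortExample {
    // Sorts a copy so that the caller's array stays untouched.
    static String[] sortedCopy(String[] input) {
        String[] copy = Arrays.copyOf(input, input.length);
        Arrays.parallelSort(copy);
        return copy;
    }

    public static void main(String[] args) {
        String[] sorted = sortedCopy(new String[] {"A1", "A2", "B1", "C2", "C1"});
        System.out.println(Arrays.toString(sorted)); // [A1, A2, B1, C1, C2]
    }
}
```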
Let's return to the reduce example from the previous chapter. We have already found out that the combiner function is only called for parallel streams, not for sequential ones. Let's see which threads are involved:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));
persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

The console output shows that both the accumulator and the combiner functions are executed in parallel on all available threads:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

In summary, parallel streams can bring a significant performance boost when working with a large number of input elements. Keep in mind, however, that some parallel stream methods need additional computations (combine operations) that are not needed for sequential execution.

Furthermore, all parallel stream operations share the same JVM-wide common ForkJoinPool. Slow, blocking stream operations can therefore hurt the performance of the whole program, because they block threads that other tasks rely on.

That's all


My tutorial on Java 8 streams ends here. For a more detailed study of streams, refer to the documentation. If you want to go deeper and learn more about the underlying mechanisms, you may be interested in Martin Fowler's article Collection Pipelines.

If you are also interested in JavaScript, you may want to take a look at Stream.js, a JavaScript implementation of the Java 8 Streams API. You might also want to read my Java 8 Tutorial (Russian translation on Habr) and Java 8 Nashorn Tutorial articles.

I hope this tutorial was useful and interesting, and that you enjoyed reading it. The full code is hosted on GitHub. Feel free to fork the repository.
