Translation of the Stream API manual by Benjamin Winterberg
Hi, Habr! Here is my translation of the article "Java 8 Stream Tutorial".
This example-driven tutorial provides a comprehensive overview of streams in Java 8. When I first encountered the Stream API, I was puzzled by the name, since it sounds so similar to InputStream and OutputStream from the java.io package. However, Java 8 streams are something completely different. Streams are monads, and they play an important role in bringing functional programming to Java.
In functional programming, a monad is a structure that represents a computation as a chain of successive steps. A type with a monad structure defines what it means to chain operations or to nest functions of that type together.
This tutorial teaches you how to work with streams and how to use the various operations available in the Stream API. We will analyze the order in which operations are processed and see how the order of the operations in a chain affects performance. We will get to know the powerful Stream API operations reduce, collect and flatMap. At the end of the tutorial we turn to parallel streams.
If you are not yet comfortable with lambda expressions, functional interfaces and method references, you may want to read my guide to the new features of Java 8 (translated on Habr) first and then come back to streams.
How streams work
A stream represents a sequence of elements and supports various operations for performing computations on those elements:
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
// C1
// C2
Stream operations are either intermediate or terminal. Intermediate operations return a stream, so several of them can be chained one after another. Terminal operations either return nothing (void) or return a result that is not a stream. In the example above, filter, map and sorted are intermediate operations, and forEach is a terminal operation. For a complete list of the available stream operations, refer to the documentation. Such a chain of stream operations is also known as an operation pipeline.
Most Stream API operations accept a lambda expression as a parameter: a functional interface that specifies the exact behavior of the operation. Most of these operations must be both non-interfering and stateless. What does that mean?
A function is non-interfering if it does not modify the underlying data source of the stream. In the example above, no lambda expression modifies myList by adding or removing elements.
A function is stateless if the execution of the operation is deterministic. In the example above, no lambda expression depends on mutable variables or on state from the outer scope that might change during execution.
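As a minimal sketch of these two properties (the class and method names below are made up for illustration and are not part of the original tutorial), the following pipeline only reads its source list and collects the result into a new list. Its lambdas depend on nothing but their own argument:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the original article.
final class NonInterferenceDemo {

    // Non-interfering: the source list is only read, never modified.
    // Stateless: each lambda looks at its argument and nothing else.
    static List<String> upperCaseStartingWithC(List<String> source) {
        return source.stream()
                .filter(s -> s.startsWith("c"))
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> myList =
                new ArrayList<>(Arrays.asList("a1", "a2", "b1", "c2", "c1"));
        List<String> result = upperCaseStartingWithC(myList);
        System.out.println(result); // [C1, C2]
        System.out.println(myList); // [a1, a2, b1, c2, c1]
    }
}
```

The source list is printed unchanged after the pipeline has run, which is exactly what non-interference asks for.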
Different kinds of streams
Streams can be created from various data sources, most commonly collections. Lists and Sets support the new methods stream() and parallelStream() to create either a sequential or a parallel stream. Parallel streams can operate on multiple threads and are covered at the end of this tutorial. For now, we focus on sequential streams:
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
Here, calling stream() on a list returns a regular object stream. However, you do not have to create a collection in order to work with streams:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
Just use Stream.of() to create a stream from a handful of object references. Besides regular object streams, Java 8 ships with special kinds of streams for working with the primitive types int, long and double. As you might have guessed, these are IntStream, LongStream and DoubleStream.
An IntStream can replace the regular for loop by using IntStream.range():
IntStream.range(1, 4)
.forEach(System.out::println);
// 1
// 2
// 3
All of these primitive streams work just like regular object streams, with the following differences:
- Primitive streams use specialized lambda expressions, for example IntFunction instead of Function, or IntPredicate instead of Predicate.
- Primitive streams support the additional terminal aggregate operations sum() and average():
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
Sometimes it is useful to transform an object stream into a primitive stream, or vice versa. For that purpose, object streams support the special mapping operations mapToInt(), mapToLong() and mapToDouble():
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3
Primitive streams can be transformed into object streams via mapToObj():
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
In the following example, a stream of doubles is first mapped to an int stream and then to an object stream of strings:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
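To make the link between a classic loop and a primitive stream concrete, here is a small sketch (the class and method names are hypothetical, chosen only for this illustration). It sums a half-open range once with a for loop and once with IntStream.range(), and it converts the primitive stream back into a List<Integer> with boxed():

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not taken from the original article.
final class RangeLoopDemo {

    // Classic loop over [fromInclusive, toExclusive).
    static int sumWithLoop(int fromInclusive, int toExclusive) {
        int sum = 0;
        for (int i = fromInclusive; i < toExclusive; i++) {
            sum += i;
        }
        return sum;
    }

    // The same range expressed as a primitive stream; sum() is one of
    // the extra terminal operations that only primitive streams offer.
    static int sumWithStream(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive).sum();
    }

    // boxed() turns the IntStream into a Stream<Integer>.
    static List<Integer> boxedRange(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumWithLoop(1, 4));   // 6
        System.out.println(sumWithStream(1, 4)); // 6
        System.out.println(boxedRange(1, 4));    // [1, 2, 3]
    }
}
```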
Execution order
Now that we know how to create and work with the different kinds of streams, let's dive deeper into how stream operations are processed under the hood.
An important characteristic of intermediate operations is laziness. Look at this sample, in which the terminal operation is missing:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
When this code snippet is executed, nothing is printed to the console. That is because intermediate operations are only executed when a terminal operation is present. Let's extend the example with the terminal operation forEach:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
Executing this code snippet results in the following output to the console:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
The order of the results might be surprising. A naive expectation is that the operations are executed "horizontally", one after another over all elements of the stream. Instead, each element moves "vertically" along the chain. The first string, "d2", passes through filter and then forEach, and only then is the second string, "a2", processed. This behavior can reduce the actual number of operations performed, as the next example shows:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
The operation anyMatch returns true as soon as the predicate applies to the given input element. That is the case for the second element passed, "A2". Because of the vertical execution of the stream chain, map is executed only twice in this case. So instead of mapping all elements of the stream, map is called as few times as possible.
Why order matters
The next example consists of two intermediate operations, map and filter, and the terminal operation forEach. Let's take another look at how these operations are executed:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
As you might have guessed, both map and filter are called five times, once for every string in the underlying collection, whereas forEach is called only once, for the single element that passes the filter. We can greatly reduce the number of executions by changing the order of the operations and moving filter to the beginning of the chain:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
Now map is called only once, so the pipeline does considerably less work for large numbers of input elements. Keep this in mind when composing complex method chains.
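The effect of reordering can also be measured instead of read off the console. The following sketch (hypothetical class and method names; the AtomicInteger counter is an assumption added purely for instrumentation) counts how often map runs in each variant of the chain:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical demo class, not taken from the original article.
final class OperationOrderDemo {

    // map first: every element is mapped before the filter sees it.
    static int mapCallsWhenMapFirst() {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream.of("d2", "a2", "b1", "b3", "c")
                .map(s -> {
                    mapCalls.incrementAndGet(); // instrumentation only
                    return s.toUpperCase();
                })
                .filter(s -> s.startsWith("A"))
                .forEach(s -> { });
        return mapCalls.get();
    }

    // filter first: only elements that pass the filter reach map.
    static int mapCallsWhenFilterFirst() {
        AtomicInteger mapCalls = new AtomicInteger();
        Stream.of("d2", "a2", "b1", "b3", "c")
                .filter(s -> s.startsWith("a"))
                .map(s -> {
                    mapCalls.incrementAndGet(); // instrumentation only
                    return s.toUpperCase();
                })
                .forEach(s -> { });
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsWhenMapFirst());    // 5
        System.out.println(mapCallsWhenFilterFirst()); // 1
    }
}
```

Note that a counter in the outer scope technically makes the lambda stateful. That is acceptable for a sequential measurement like this one, but it is not something to carry over into production pipelines.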
Let's extend the above example with an additional operation, sorted:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
Sorting is a special kind of intermediate operation. It is a so-called stateful operation, since in order to sort a collection of elements you have to maintain state during ordering.
Executing this example results in the following console output:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
First, the sort operation is executed on the entire input collection. In other words, sorted is executed horizontally. In this case sorted is called eight times for multiple combinations of the elements in the input collection.
Once again we can optimize performance by reordering the chain:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
In this example sorted is never called, because filter reduces the input collection to a single element. For larger input collections, the performance gain is significant.
Reusing streams
Java 8 streams cannot be reused. As soon as you call any terminal operation, the stream is closed:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
Calling noneMatch after anyMatch on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
To overcome this limitation, we have to create a new stream chain for every terminal operation we want to execute.
For example, we can create a stream supplier that constructs a new stream with all intermediate operations already set up:
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
Each call to get() constructs a new stream on which we can safely call the desired terminal operation.
Advanced operations
Streams support plenty of different operations. We have already covered the most important ones; for the rest, refer to the documentation. Now let's dive deeper into the more complex operations collect, flatMap and reduce.
Most code samples in this section use the following list of persons for demonstration purposes:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
Collect
Collect is an extremely useful terminal operation that transforms the elements of a stream into a different kind of result, for example a List, Set or Map. Collect accepts a Collector, which consists of four different operations: a supplier, an accumulator, a combiner and a finisher. This sounds complicated at first, but Java 8 supports various built-in collectors via the Collectors class, so for the most common operations you do not have to implement a collector yourself.
Let's start with a very common use case:
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered); // [Peter, Pamela]
As you can see, it is very simple to construct a list from the elements of a stream. Need a set instead of a list? Just use Collectors.toSet().
The next example groups all persons by age:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
Collectors are extremely versatile. You can also create aggregations on the elements of the stream, for example to determine the average age of all persons:
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
If you are interested in more comprehensive statistics, the summarizing collectors return a special built-in summary statistics object. With it we can easily determine the minimum, maximum and arithmetic average age of the persons, as well as the sum and the count:
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
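The summary object does not have to be printed as a whole; each figure is available through its own getter. A minimal sketch, using a plain list of ages in place of the Person objects (the class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the original article.
final class AgeSummaryDemo {

    // Collects all ages into a single statistics object in one pass.
    static IntSummaryStatistics summarize(List<Integer> ages) {
        return ages.stream()
                .collect(Collectors.summarizingInt(Integer::intValue));
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(Arrays.asList(18, 23, 23, 12));
        System.out.println(stats.getMin());     // 12
        System.out.println(stats.getMax());     // 23
        System.out.println(stats.getAverage()); // 19.0
        System.out.println(stats.getSum());     // 76
        System.out.println(stats.getCount());   // 4
    }
}
```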
The next example joins all persons into a single string:
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
The joining collector accepts a delimiter as well as an optional prefix and suffix.
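To see the difference between the two forms side by side, here is a short sketch (hypothetical class and method names) that joins the same names once with a delimiter only and once with a prefix and suffix added:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the original article.
final class JoiningDemo {

    // Delimiter only.
    static String joinPlain(List<String> names) {
        return names.stream().collect(Collectors.joining(", "));
    }

    // Delimiter plus prefix and suffix.
    static String joinBracketed(List<String> names) {
        return names.stream().collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Max", "Peter", "Pamela");
        System.out.println(joinPlain(names));     // Max, Peter, Pamela
        System.out.println(joinBracketed(names)); // [Max, Peter, Pamela]
    }
}
```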
To transform the stream elements into a map, we have to specify how both the keys and the values should be mapped. Keep in mind that the mapped keys must be unique, otherwise an IllegalStateException is thrown. You can optionally pass a merge function as an additional parameter to bypass the exception:
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));
System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
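The difference the merge function makes can be checked directly. In the sketch below (hypothetical class and method names; using the string length as the key is an assumption made only to produce a key collision) the two-argument toMap fails on duplicate keys, while the three-argument form merges the colliding values:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the original article.
final class ToMapDemo {

    // Without a merge function, duplicate keys raise IllegalStateException.
    static boolean failsOnDuplicateKeys(List<String> words) {
        try {
            words.stream().collect(Collectors.toMap(String::length, w -> w));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // With a merge function, values that share a key are combined.
    static Map<Integer, String> mergeDuplicates(List<String> words) {
        return words.stream().collect(
                Collectors.toMap(String::length, w -> w, (a, b) -> a + ";" + b));
    }

    public static void main(String[] args) {
        // "Peter" and "David" both have length 5, so the keys collide.
        List<String> words = Arrays.asList("Max", "Peter", "David");
        System.out.println(failsOnDuplicateKeys(words)); // true
        System.out.println(mergeDuplicates(words).get(5)); // Peter;David
    }
}
```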
So far we have seen some of the most powerful built-in collectors. Let's try to build our own. We want to transform all persons of the stream into a single string consisting of all names in upper case, separated by the pipe character |. To achieve this, we create a new collector via Collector.of(). We have to pass the four ingredients of a collector: a supplier, an accumulator, a combiner and a finisher.
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
Since strings in Java are immutable, we need a helper class such as StringJoiner to let the collector construct our string. The supplier initially constructs a StringJoiner with the appropriate delimiter. The accumulator adds each person's upper-cased name to the StringJoiner. The combiner knows how to merge two StringJoiners into one. In the last step the finisher constructs the desired String from the StringJoiner.
FlatMap
We have already learned how to transform the objects of a stream into another type of object with the map operation. Map is somewhat limited, because every object can be mapped to exactly one other object. But what if we want to transform one object into multiple others, or into none at all? This is where flatMap comes to the rescue. FlatMap transforms each element of the stream into a stream of other objects, so each object is transformed into zero, one or multiple other objects backed by streams. The contents of those streams are then placed into the stream returned by flatMap.
Before we see flatMap in action, we need an appropriate type hierarchy:
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
Create several objects:
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
Now we have a list of three foos, each containing three bars.
FlatMap accepts a function that has to return a stream of objects. So in order to resolve the bar objects of each foo, we just pass the appropriate function:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
As you can see, we have successfully transformed the stream of three foo objects into a stream of nine bar objects.
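The same idea works for any nested structure. As a stripped-down sketch without the Foo and Bar classes (the class and method names here are hypothetical), flatMap can flatten a list of lists, and an empty inner list simply contributes zero elements:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not taken from the original article.
final class FlatMapDemo {

    // Each inner list becomes a stream; flatMap concatenates their contents.
    static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(), // maps to zero elements
                Arrays.asList("c"));
        System.out.println(flatten(nested)); // [a, b, c]
    }
}
```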
Finally, all of the above code can be reduced to a simple pipeline of operations:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
FlatMap is also available for the Optional class introduced in Java 8. The flatMap operation of Optional returns an optional object of another type, so it can be used to avoid piles of nasty null checks.
Consider a highly hierarchical structure like this:
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
To resolve the inner string foo of an outer instance, you have to add multiple null checks to prevent a possible NullPointerException:
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}
The same behavior can be obtained with the flatMap operation of Optional:
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
Each call to flatMap returns an Optional wrapping the desired object if it is present, or an empty Optional if it is absent.
Reduce
The reduction operation combines all elements of the stream into a single result. Java 8 supports three different kinds of reduce methods.
The first one reduces a stream of elements to exactly one element of the stream. Let's see how we can use this method to determine the oldest person:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
The reduce method accepts a BinaryOperator accumulator function. That is actually a BiFunction where both operands share the same type, in our case Person. A BiFunction is like a Function but accepts two arguments. The example function compares the ages of two persons and returns the person with the greater age.
The second reduce method accepts both an identity value and a BinaryOperator accumulator. This method can be used to construct a new Person with the aggregated names and ages of all other persons in the stream:
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
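The practical difference between the first two forms shows up on an empty stream. In the following sketch (hypothetical class and method names, with plain integers standing in for persons) the form without an identity value returns an Optional, because there may be nothing to return, while the form with an identity value falls back to that identity:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical demo class, not taken from the original article.
final class ReduceIdentityDemo {

    // No identity value: the result is wrapped in an Optional.
    static Optional<Integer> max(List<Integer> values) {
        return values.stream().reduce(Integer::max);
    }

    // Identity value 0: an empty stream simply yields 0.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(18, 23, 23, 12);
        List<Integer> none = Collections.emptyList();
        System.out.println(max(ages)); // Optional[23]
        System.out.println(sum(ages)); // 76
        System.out.println(max(none)); // Optional.empty
        System.out.println(sum(none)); // 0
    }
}
```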
The third reduce method accepts three parameters: an identity value, a BiFunction accumulator and a combiner function of type BinaryOperator. Since the type of the identity value is not restricted to Person, we can use this reduction to determine the sum of the ages of all persons:
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
As you can see, the result is 76, but what exactly happens under the hood?
Let's extend the above code with some debug output:
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
As you can see, the accumulator function does all the work. It is first called with the identity value 0 and the first person, Max. In the next three steps sum keeps increasing by the age of the person from the previous step, up to a total age of 76.
Wait, what? The combiner is never called? Executing the same stream in parallel lifts the secret:
Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum + p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
Executing the stream in parallel produces an entirely different console output. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separately accumulated values.
In the next chapter we will take a closer look at parallel streams.
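Before moving on, the comparison of both execution modes can be condensed into a small check. The sketch below runs the same three-argument reduce sequentially and in parallel and compares the results. They agree because 0 is an identity value for the combiner and the combiner is compatible with the accumulator, which is what the reduce documentation asks for. The names ParallelReduceSketch and sumAges, as well as the nested Person class, are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ParallelReduceSketch {

    // Hypothetical stand-in for the tutorial's Person class.
    static class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Sums all ages, either on a sequential or on a parallel stream.
    static int sumAges(List<Person> persons, boolean parallel) {
        Stream<Person> stream = parallel
            ? persons.parallelStream()
            : persons.stream();
        // Accumulator adds one age to a partial sum,
        // combiner adds two partial sums.
        return stream.reduce(0, (sum, p) -> sum + p.age, Integer::sum);
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
            new Person("Max", 18),
            new Person("Peter", 23),
            new Person("Pamela", 23),
            new Person("David", 12));

        System.out.println(sumAges(persons, false)); // 76
        System.out.println(sumAges(persons, true));  // 76
    }
}
```

How the elements are split among threads may change from run to run, but the final sum does not.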
Parallel streams
Streams can be executed in parallel to increase performance on large numbers of input elements. Parallel streams use the common ForkJoinPool, available via the static method ForkJoinPool.commonPool(). By default, the parallelism of this pool is one less than the number of processors available to the JVM (but at least 1):
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
On my machine, the common pool is initialized with a parallelism of 3 by default. This value can be decreased or increased by setting the following JVM parameter:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
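To check what a given machine actually uses, the values can be printed side by side. This is a minimal sketch: the class name CommonPoolSketch and the helper commonParallelism are hypothetical, and the printed numbers depend on the hardware and on the JVM parameter shown above.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolSketch {

    // Name of the JVM parameter shown above, without the -D prefix.
    static final String PROPERTY =
        "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Parallelism of the common pool as seen by this JVM.
    static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    "
            + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: "
            + commonParallelism());
        // Prints null unless the parameter was passed on the command line.
        System.out.println("configured property:     "
            + System.getProperty(PROPERTY));
    }
}
```

Starting the program with -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 on the command line should change the second value to 5.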
Collections support the method parallelStream() to create a parallel stream of their elements. Alternatively, you can call the intermediate method parallel() on a given stream to turn a sequential stream into a parallel one. To illustrate the behavior of a parallel stream, the following example prints information about the current thread to System.out:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
The debug output shows which threads actually execute the individual stream operations:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
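The example above uses parallelStream(). The intermediate parallel() method mentioned before it can be sketched in the same spirit. The class ParallelModeSketch and its method upperCaseInParallel are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelModeSketch {

    // Upper-cases the given values on a stream switched to parallel mode.
    static List<String> upperCaseInParallel(String... values) {
        return Stream.of(values)
            .parallel()                    // sequential -> parallel
            .map(String::toUpperCase)
            .collect(Collectors.toList()); // keeps the encounter order
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a1").isParallel());            // false
        System.out.println(Stream.of("a1").parallel().isParallel()); // true
        System.out.println(upperCaseInParallel("a1", "a2", "b1", "c2", "c1"));
        // [A1, A2, B1, C2, C1]
    }
}
```

Although the elements are processed on different threads, collect(Collectors.toList()) still returns them in encounter order. Only side effects, such as the print statements in the debug output above, appear in a varying order.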
As you can see, the parallel stream uses all available threads of the common ForkJoinPool to execute the stream operations. The output may differ between consecutive runs, because it is not deterministic which thread processes which element. Let's extend the example with an additional stream operation, sorted:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
At first glance, the result may seem strange:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
It seems that sorted is executed sequentially, on the main thread only. In fact, sorted on a parallel stream uses the method Arrays.parallelSort(), added in Java 8, under the hood. As stated in the documentation, this method decides based on the length of the array whether the sort is performed sequentially or in parallel:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
Let's return to the reduce example from the previous chapter. We have already found out that the combiner function is called only for parallel streams, not for sequential ones. Let's see which threads are actually involved:
List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));
persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum + p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
The console output shows that both the accumulator and the combiner functions are executed in parallel on all available threads:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
In summary, parallel streams can bring a significant performance gain for streams with a large number of input elements. Keep in mind, however, that some parallel stream operations, such as reduce, require additional computations (combine operations) that are not needed for sequential execution.
Furthermore, all parallel stream operations share the same JVM-wide common ForkJoinPool. You should therefore avoid slow blocking stream operations, since they occupy threads of this pool and can slow down other parts of the application that rely on parallel streams.
That's all
My tutorial on Java 8 streams ends here. For a more detailed study of streams, you can refer to the documentation. If you want to dig deeper into the mechanisms underlying streams, you may be interested in Martin Fowler's article Collection Pipelines.
If you are also interested in JavaScript, you may want to take a look at Stream.js, a JavaScript implementation of the Java 8 Streams API. You might also want to read my Java 8 Tutorial (Russian translation on Habr) and Java 8 Nashorn Tutorial articles.
I hope this tutorial was useful and interesting, and that you enjoyed reading it. The full code is available on GitHub. Feel free to fork the repository.